How severely does this issue affect your experience of using Ray?
- Medium: It contributes to significant difficulty in completing my task, but I can work around it.
How can I map a DeploymentHandle.remote() call to a Dataset? I’ve tried several different things. Here is a toy example of what I am trying to do:
import ray
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponse
import numpy as np
import asyncio
import requests


@serve.deployment
class ToyModel:
    def __init__(self):
        pass

    async def __call__(self, request):
        await asyncio.sleep(3)
        # np.random.random takes the shape as a single tuple
        return np.random.random((256, 256, 3))


@serve.deployment()
class ToyProducer:
    def __init__(self, handle: DeploymentHandle):
        self.handle = handle

    async def __call__(self, request):
        ds = ray.data.from_numpy(np.random.random((3, 256, 256, 3)))

        def call_toy_model(block):
            # This stores the unresolved response from remote(), not the prediction
            block["pred"] = self.handle.remote(block["data"])
            return block

        ds = ds.map(call_toy_model)
        return ds.materialize()


handle = ToyModel.bind()
producer = ToyProducer.bind(handle)
ray.init()
serve.run(producer)
# Call the producer to trigger the pipeline
ret = requests.get("http://localhost:8000/")
print(ret.content.decode("utf-8"))
I’m not too familiar with Ray Datasets, but does this work if you get the result of the self.handle.remote(block["data"]) remote call before passing it into block? Or is the expected behavior that you can return an ObjectRef and Ray Data will resolve it for you?
If that’s the case, then you may need to convert the result of self.handle.remote(block["data"]) to a Ray ObjectRef. You can use the _to_object_ref developer API to do so.
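A minimal sketch of the first suggestion (resolving the response before storing it), not a confirmed fix. It assumes the map function runs in a Ray Data worker outside any deployment, where the blocking DeploymentResponse.result() call can be used, and that the application runs under Serve's default application name "default". The function name call_toy_model_blocking and its keyword arguments are hypothetical; looking the handle up by name instead of capturing self.handle is also an assumption.

```python
def call_toy_model_blocking(row, deployment_name="ToyModel", app_name="default"):
    """Hypothetical map function: resolve the Serve response before storing it."""
    # Imported lazily so the lookup happens in the worker process
    # that actually executes the map function.
    from ray import serve

    handle = serve.get_deployment_handle(deployment_name, app_name=app_name)
    # .result() blocks until the deployment replies, so the row ends up
    # holding a concrete value rather than an unresolved response object.
    row["pred"] = handle.remote(row["data"]).result()
    return row
```

Inside the producer this would be used as ds = ds.map(call_toy_model_blocking).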
I’m not familiar with the _to_object_ref API. This looks like it might help though. Thank you! I will let you know if this solves the issue.
I was able to get it working using the following code. Is this the correct way to tackle it? Mainly, there seems to be no way to call an async function from dataset.map.
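For reference, the general pattern of driving a coroutine from a synchronous map-style callback can be sketched with the standard library alone. Here fake_remote is a hypothetical stand-in for the awaitable model call, and a plain list comprehension stands in for the dataset map; neither is a Ray API. The sketch assumes no event loop is already running in the calling thread, since asyncio.run raises a RuntimeError in that case.

```python
import asyncio


async def fake_remote(data):
    """Hypothetical stand-in for an awaitable model call."""
    await asyncio.sleep(0.01)
    return [x * 2 for x in data]


def call_model_sync(row):
    """Synchronous wrapper that a map-style callback can call directly."""
    # asyncio.run creates a fresh event loop, runs the coroutine to
    # completion, and closes the loop before returning the result.
    row["pred"] = asyncio.run(fake_remote(row["data"]))
    return row


rows = [{"data": [1, 2]}, {"data": [3]}]
# Copy each row so the input list is left unchanged.
results = [call_model_sync(dict(r)) for r in rows]
```

Each call blocks until its coroutine finishes, so any concurrency has to come from running several such callbacks in parallel rather than from the event loop.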
import ray
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponse
import numpy as np
import asyncio
import requests


@serve.deployment(num_replicas=5, ray_actor_options={"num_cpus": 1})
class ToyModel:
    def __init__(self):
        pass

    async def __call__(self, data):
        await asyncio.sleep(3)
        return np.ones_like(data)


def call_toy_model(block, name="ToyModel"):
    loop = asyncio.get_event_loop()
    # Look up the handle by name inside the worker that runs the map function
    handle = ray.serve.get_deployment_handle(name, app_name="DatasetToy")
    # _to_object_ref() is a coroutine, so run it to completion on the event loop
    future = handle.remote(block["data"])._to_object_ref()
    obj_ref = loop.run_until_complete(future)
    block["pred"] = ray.get(obj_ref)
    return block


@serve.deployment()
class ToyProducer:
    def __init__(self, handle):
        pass

    async def __call__(self, request):
        ds = ray.data.from_numpy(np.random.random((5, 1, 1, 3)))
        ds = ds.map(call_toy_model)
        # iter_rows() yields one row at a time
        for row in ds.iter_rows():
            print(row)
        return


handle = ToyModel.bind()
producer = ToyProducer.bind(handle)
ray.init()
serve.run(producer, name="DatasetToy")
# Call the producer to trigger the pipeline
ret = requests.get("http://localhost:8000/")
print(ret.content.decode("utf-8"))