Map DeploymentResponse to Dataset

How severely does this issue affect your experience of using Ray?

  • Medium: It causes significant difficulty in completing my task, but I can work around it.

How can I map a DeploymentHandle.remote() call over a Dataset? I’ve tried several different approaches. Here is a toy example of what I am trying to do:

import ray
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponse

import numpy as np
import asyncio
import requests


@serve.deployment
class ToyModel:
    def __init__(self):
        pass

    async def __call__(self, request):
        await asyncio.sleep(3)
        return np.random.random((256, 256, 3))


@serve.deployment()
class ToyProducer:
    def __init__(self, handle: DeploymentHandle):
        self.handle = handle

    async def __call__(self, request):
        ds = ray.data.from_numpy(np.random.random((3, 256, 256, 3)))

        def call_toy_model(block):
            block["pred"] = self.handle.remote(block["data"])
            return block

        ds = ds.map(call_toy_model)

        return ds.materialize()


handle = ToyModel.bind()
producer = ToyProducer.bind(handle)

ray.init()
serve.run(producer)

# Call the producer to trigger the pipeline
ret = requests.get("http://localhost:8000/")

print(ret.content.decode("utf-8"))

I’m not too familiar with Ray Datasets, but does this work if you get the result of the self.handle.remote(block["data"]) call before assigning it into block? Or is the expected behavior that you can return an ObjectRef and have Ray Data resolve it for you?

If that’s the case, then you may need to convert the DeploymentResponse returned by self.handle.remote(block["data"]) into a Ray ObjectRef. You can use the _to_object_ref developer API to do so.
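A possibly simpler route, sketched here under the assumption that your Ray version’s DeploymentResponse has the blocking result() method (present in recent Ray Serve releases), is to resolve the response inside the map function so the row only ever holds a plain value. As far as I know, result() refuses to run inside an already-running asyncio event loop, so this is meant for a plain synchronous Dataset.map function, not for the async ToyProducer.__call__ itself. make_predict_fn is a hypothetical helper name.

```python
from typing import Any, Callable, Dict


def make_predict_fn(handle: Any) -> Callable[[Dict[str, Any]], Dict[str, Any]]:
    """Build a row-wise function for Dataset.map that calls `handle`.

    Assumption: `handle` behaves like a Ray Serve DeploymentHandle, i.e.
    handle.remote(x) returns a response object with a blocking .result().
    """

    def predict(row: Dict[str, Any]) -> Dict[str, Any]:
        # Send the request, then block until the replica returns its output,
        # so the row stores the actual value instead of a DeploymentResponse.
        row["pred"] = handle.remote(row["data"]).result()
        return row

    return predict
```

Usage would look something like ds.map(make_predict_fn(serve.get_deployment_handle("ToyModel", app_name="DatasetToy"))). Note that Dataset.map hands the function one row (a dict) at a time, not a block.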

I’m not familiar with the _to_object_ref API. This looks like it might help, though. Thank you! I will let you know if this solves the issue.


I was able to get it working using the following code. Is this the correct way to tackle it? My main concern is that there seems to be no way to call an async function from Dataset.map.
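One way to call an async function from a synchronous map function is to drive the coroutine on an event loop you own, which is what the code here does with run_until_complete. A minimal sketch of the same idea packaged as a callable class, so each worker reuses one private loop instead of looking one up per row. fake_model is a hypothetical stand-in for an async call such as awaiting a handle; AsyncRowMapper is likewise an illustrative name.

```python
import asyncio
from typing import Any, Dict


async def fake_model(x: Any) -> Any:
    # Hypothetical stand-in for an async call like `await handle.remote(x)`.
    await asyncio.sleep(0.01)
    return x * 2


class AsyncRowMapper:
    """Callable that runs an async function per row on a private event loop."""

    def __init__(self) -> None:
        # One loop per instance; created once and reused for every row.
        self._loop = asyncio.new_event_loop()

    def __call__(self, row: Dict[str, Any]) -> Dict[str, Any]:
        # Block this (synchronous) call until the coroutine finishes.
        row["pred"] = self._loop.run_until_complete(fake_model(row["data"]))
        return row
```

Ray Data runs callable classes on actors, so depending on your Ray version you would pass something like ds.map(AsyncRowMapper, concurrency=2) or ds.map(AsyncRowMapper, compute=ray.data.ActorPoolStrategy(size=2)). For a one-off call, asyncio.run(coro) also works, at the cost of creating a new loop on every row.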

import ray
from ray import serve
from ray.serve.handle import DeploymentHandle, DeploymentResponse

import numpy as np
import asyncio
import requests


@serve.deployment(num_replicas=5, ray_actor_options={"num_cpus": 1})
class ToyModel:
    def __init__(self):
        pass

    async def __call__(self, data):
        await asyncio.sleep(3)
        return np.ones_like(data)


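def call_toy_model(block, name="ToyModel"):
    # Dataset.map passes one row (a dict) at a time, despite the name "block".
    loop = asyncio.get_event_loop()

    # Look up the ToyModel deployment inside the "DatasetToy" application.
    handle = ray.serve.get_deployment_handle(name, app_name="DatasetToy")

    # _to_object_ref() returns an awaitable, so drive it to completion on the loop.
    future = handle.remote(block["data"])._to_object_ref()

    obj_ref = loop.run_until_complete(future)

    # Block until the model's output is ready and store it in the row.
    block["pred"] = ray.get(obj_ref)

    return block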


@serve.deployment()
class ToyProducer:
    def __init__(self, handle):
        # The handle is unused; binding ToyModel here deploys it in the same app.
        pass

    async def __call__(self, request):
        ds = ray.data.from_numpy(np.random.random((5, 1, 1, 3)))

        ds = ds.map(call_toy_model)

        for block in ds.iter_rows():
            print(block)

        return


handle = ToyModel.bind()
producer = ToyProducer.bind(handle)

ray.init()
serve.run(producer, name="DatasetToy")

# Call the producer to trigger the pipeline
ret = requests.get("http://localhost:8000/")

print(ret.content.decode("utf-8"))
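Since map blocks on one row before starting the next, the requests above go out one after another. A minimal sketch of a batched variant for map_batches that sends every row of a batch before waiting on any of them, so several requests can be in flight at once. It assumes the same result() method on DeploymentResponse as before, batch_format="numpy" (each batch is a dict of NumPy arrays), and that every prediction has the same shape so they can be stacked; make_batch_predict_fn is a hypothetical name.

```python
from typing import Any, Callable, Dict

import numpy as np


def make_batch_predict_fn(
    handle: Any,
) -> Callable[[Dict[str, np.ndarray]], Dict[str, np.ndarray]]:
    """Build a map_batches function that sends every row in a batch to `handle`.

    Assumption: handle.remote(x) returns a response with a blocking .result(),
    as a Ray Serve DeploymentHandle does in recent releases.
    """

    def predict_batch(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # Issue all requests first so the replicas can work on them concurrently...
        responses = [handle.remote(x) for x in batch["data"]]
        # ...then wait for each result and stack them into one array column.
        batch["pred"] = np.stack([r.result() for r in responses])
        return batch

    return predict_batch
```

Inside ToyProducer this would be used roughly as ds.map_batches(make_batch_predict_fn(handle), batch_size=5, batch_format="numpy"), with handle obtained from serve.get_deployment_handle as in call_toy_model.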