Workflow calling Deployment.remote()?

I have the following use case:

  • I want to build a server that receives several items at a time and runs inference on them with a large model.
  • Inference might take a very long time (anywhere from minutes to hours), so I don’t want to block the client.
  • The server should receive the request, immediately send a reply, and queue the work.
  • The inference results will be stored in a DB afterwards.

After discovering Ray Workflows, it seemed like they should solve my use case.

First solution

from typing import Dict

import ray
from ray import serve, workflow
from starlette.requests import Request


@ray.remote
def run_inference(items) -> Dict:
    # Takes a long time!
    model = load_model()
    predictions = model.predict(items)
    return predictions


@serve.deployment
class IngressDeployment:
    async def __call__(self, http_request: Request) -> Dict:
        items: Dict = await http_request.json()
        # Fire-and-forget: start the workflow and return without waiting for the result.
        workflow.run_async(run_inference.options(num_gpus=1).bind(items))
        return {"success": True}


ray.init(storage="/tmp/data")
app = IngressDeployment.bind()
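
(For reference, this is roughly how I deploy and call it, assuming the default Serve HTTP proxy; the port, route, and payload are just examples:)

serve.run(app)

# From the client side, something like:
import requests

response = requests.post("http://localhost:8000/", json={"items": [1, 2, 3]})
print(response.json())  # {"success": True}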

This works, but I feel it’s a bit suboptimal for a couple of reasons:

  1. The model is quite large, so load_model() takes a long time, and not all requests will be very large. Since the task loads the model on every run, sometimes the inference itself takes less time than loading the model…
  2. It’s not clear to me how to control concurrency. Right now the workflow only has this one task, but soon I’ll add more, some light and some heavy, which should run in parallel. To avoid running out of resources, I’d like per-task concurrency limits (which I believe is not possible at the moment): for example, at most 3 tasks of type X and 4 tasks of type Y running at the same time (see the sketch right below).
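
As a rough illustration of what I mean (not necessarily the right way to do it), custom resources would give this kind of cap; the "slot_*" resource names are made up:

# Declare "slots" on the cluster; each task type holds one slot while it runs,
# so at most 3 tasks of type X and 4 tasks of type Y execute concurrently.
ray.init(resources={"slot_x": 3, "slot_y": 4}, storage="/tmp/data")


@ray.remote(resources={"slot_x": 1})
def task_x(items):
    ...


@ray.remote(resources={"slot_y": 1})
def task_y(items):
    ...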

Second solution: interact with a Ray Serve Deployment

But both of these issues should be solvable with Deployments! I can control the number of replicas of each deployment, and load the model when the replica is initializing. So my second version looks something like this:

@ray.remote
def run_inference(inference_handle, items) -> Dict:
    predictions = inference_handle.remote(items)
    print(type(predictions))
    return predictions


@serve.deployment(ray_actor_options={"num_gpus": 1})
class InferenceDeployment:
    def __init__(self):
        # Load the model once, when the replica starts.
        self.model = load_model()

    async def __call__(self, items) -> Dict:
        predictions = self.model.predict(items)
        return predictions


@serve.deployment
class IngressDeployment:
    def __init__(self, inference_handle):
        self.inference_handle = inference_handle.options(use_new_handle_api=True)

    async def __call__(self, http_request: Request) -> Dict:
        items: Dict = await http_request.json()
        workflow.run_async(run_inference.bind(self.inference_handle, items))
        return {"success": True}


ray.init(storage="/tmp/data")
inference = InferenceDeployment.bind()
app = IngressDeployment.bind(inference)

But this doesn’t work… For example, I get errors like

TypeError: cannot pickle '_asyncio.Task' object

I tried many variations of the code above, but every time I run into some error. Probably I’m not using the API correctly, but at this point I’m stuck.

  • Is it possible to do what I’m trying here?
  • Otherwise, how could I improve solution 1?

Many thanks!