Long Running Async Job

How severe does this issue affect your experience of using Ray?

  • None: Just asking a question out of curiosity

I’m putting together a prototype for a model serving framework as a proof of concept, but I’m not 100% sure where to start. The basic requirement is that users need to be able to post a large CSV file to an endpoint using something like curl. The files are on the order of 10k rows. They want the model to process the file in the background, then save the predictions somewhere and send an email when they’re ready.

I don’t expect many concurrent requests, so my initial thought is to create a Ray Serve deployment. The deployment’s __call__ accepts a request containing CSV data, starts an async task to do the processing, save the file, and send the email, and then returns immediately. There probably also needs to be a maximum number of concurrent requests: beyond that limit the caller should block (with a timeout) to prevent overloading the deployment’s compute resources.
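To make the pattern above concrete, here is a minimal sketch in plain asyncio, with no Ray involved. The names JobRunner, submit, and drain, and the sleep that stands in for the model, are all hypothetical. It only illustrates "accept, start a background task, return immediately, and reject after a timeout when all slots are busy".

```python
import asyncio


class JobRunner:
    """Hypothetical helper: bounded background jobs with an admission timeout."""

    def __init__(self, max_jobs: int, timeout_s: float):
        self._slots = asyncio.Semaphore(max_jobs)  # one slot per running job
        self._timeout_s = timeout_s
        self._tasks = set()   # keep references so tasks are not garbage collected
        self.completed = []   # (job_id, row_count) of finished jobs

    async def _process(self, job_id, rows):
        try:
            # Placeholder for: run the model, save predictions, send the email.
            await asyncio.sleep(0.05)
            self.completed.append((job_id, len(rows)))
        finally:
            self._slots.release()  # free the slot even if the job fails

    async def submit(self, job_id, rows):
        # Block the caller until a slot is free, but only up to the timeout.
        try:
            await asyncio.wait_for(self._slots.acquire(), self._timeout_s)
        except asyncio.TimeoutError:
            return {"result": "busy"}
        task = asyncio.create_task(self._process(job_id, rows))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return {"result": "accepted"}  # returned before the job finishes

    async def drain(self):
        # Wait for every job that is still running.
        await asyncio.gather(*self._tasks)
```

A JobRunner should be created inside a running event loop (for example inside the coroutine passed to asyncio.run), since older Python versions bind the semaphore to the loop at construction time. In a real service, submit would be called from the request handler and the "busy" result mapped to an HTTP error.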

Is there an example of this pattern I can use for inspiration? I’ve got a basic deployment working with batching but that isn’t really what I’m looking for.

Ideally, you can do this by chaining multiple Ray tasks and/or Serve deployments together. Use a single deployment as the entry point to accept the CSV file, and kick off an async task in the same asyncio event loop to manage the rest of the tasks. See Serving ML Models — Ray 1.11.0

Thanks, I’m not really 100% sure how to do what you suggest though. What I have got (apparently) working so far is to create a separate actor using a concurrency_group to run the model, and a deployment to queue up jobs. This is based on https://docs.ray.io/en/latest/ray-core/async_api.html

Not sure if this is the best way to do it - as a proof of concept using Flask with Redis queues actually seems a lot simpler. But it does seem to work on the surface - the deployment __call__ method returns immediately, the actor runs the task and even when I do multiple posts only one task runs at a time.

import logging
import time

import ray
from ray import serve


# Actor that owns the model. The "compute" concurrency group is capped at 1,
# so only one predict call runs at a time.
@ray.remote(concurrency_groups={"compute": 1}, num_gpus=1)
class ModelActor:
    def __init__(self, model_path: str):
        logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
        # create_model is defined elsewhere (not shown).
        self.model = create_model(model_path, cuda_device=0)

    @ray.method(concurrency_group="compute")
    async def predict(self, batch):
        logging.info(f"predict: {len(batch)}")
        time.sleep(10)  # placeholder for the real model call
        logging.info("predict: finished")


# Deployment that accepts the request and hands the work to the actor.
@serve.deployment()
class ModelDeployment:
    def __init__(self, model_path: str):
        logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
        self.worker = ModelActor.remote(model_path)

    async def __call__(self, request):
        data = await request.json()
        logging.info(f"__call__: {len(data)}")
        # Fire and forget: the returned ObjectRef is not awaited.
        self.worker.predict.remote(data)
        logging.info("__call__: finished")

        return {"result": "ok"}

Indeed! This workload might be well suited to a regular task queue. Ray Serve is meant for cases where a single request needs to span multiple processes and is latency sensitive.
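For comparison, a "regular task queue" in its simplest form might look like the sketch below, using only the standard library. The class SingleWorkerQueue and its methods are hypothetical, and the worker only records row counts where a real one would run the model. It is not a substitute for Redis-backed queues, just an illustration of one worker draining a bounded backlog.

```python
import queue
import threading


class SingleWorkerQueue:
    """Hypothetical in-process task queue: one worker, bounded backlog."""

    def __init__(self, max_backlog: int):
        self._jobs = queue.Queue(maxsize=max_backlog)
        self.results = []  # (job_id, row_count) in processing order
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        while True:
            job = self._jobs.get()
            try:
                if job is None:  # sentinel: stop the worker
                    return
                job_id, rows = job
                # Placeholder for: predict, save the file, send the email.
                self.results.append((job_id, len(rows)))
            finally:
                self._jobs.task_done()

    def submit(self, job_id, rows, timeout_s: float):
        # Block while the backlog is full, but give up after timeout_s.
        try:
            self._jobs.put((job_id, rows), timeout=timeout_s)
        except queue.Full:
            return {"result": "busy"}
        return {"result": "queued"}

    def wait_idle(self):
        self._jobs.join()  # returns once every queued job has been processed

    def stop(self):
        self._jobs.put(None)
        self._thread.join()
```

Because there is a single worker thread, jobs run one at a time in submission order, which matches the behaviour described earlier in the thread.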

Yeah, I am interested in the pipeline, as I actually need to pass the CSV file to multiple models and then combine the results into a single output CSV, which is the main reason I started looking at Ray Serve. But maybe I’ll hack something together that works using Flask + Redis, then look at migrating it to Ray later if it makes sense.
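As a rough illustration of the "multiple models, one output CSV" step, here is a sketch in plain asyncio with the standard csv module. The two model functions, the MODELS mapping, and the "text" column are made-up stand-ins. In a Ray version each model call would presumably be a remote task or deployment instead.

```python
import asyncio
import csv
import io


async def model_a(rows):
    # Hypothetical model: length of the "text" column.
    return [len(r["text"]) for r in rows]


async def model_b(rows):
    # Hypothetical model: upper-cased "text" column.
    return [r["text"].upper() for r in rows]


# Output column name -> model coroutine (illustrative only).
MODELS = {"pred_a": model_a, "pred_b": model_b}


async def score_csv(csv_text: str) -> str:
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = list(reader)
    names = list(MODELS)

    # Fan out: run every model on the same rows concurrently.
    results = await asyncio.gather(*(MODELS[n](rows) for n in names))

    # Fan in: append one prediction column per model to each input row.
    out = io.StringIO()
    fieldnames = list(reader.fieldnames or []) + names
    writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    for i, row in enumerate(rows):
        merged = dict(row)
        for name, preds in zip(names, results):
            merged[name] = preds[i]
        writer.writerow(merged)
    return out.getvalue()
```

Calling asyncio.run(score_csv(text)) on a CSV with a "text" column returns the same rows with the pred_a and pred_b columns appended.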