[Serve] Graph: how many requests per second?

I built a graph with the architecture below:

  1. Preprocessing
  2. Ensemble models running at the same time
  3. Combining the results from step 2

I have two questions:
1. Why did I receive this warning in the Ray dashboard?

WARNING api.py:485 – You are retrieving a sync handle inside an asyncio loop. Try getting client.get_handle(…, sync=False) to get better performance. Learn more at https://docs.ray.io/en/master/serve/http-servehandle.html#sync-and-async-handles component=serve deployment=DAGDriver replica=DAGDriver#soHdqH

2. How many DAGDriver num_replicas do I need to set in @serve.deployment to reach a throughput of 100 requests per second? Is it one process per request?

Below is the code for my Ray graph:

    # Built inside the class that assembles the graph (hence self.config, self._get_models(), ...)
    with InputNode() as dag_input:
        # Bind preprocessing and clean the raw input first
        preprocessing = Preprocessing.bind(self.config)
        clean_data = preprocessing.preprocess.bind(dag_input)

        # Bind the ensemble models that run at the same time
        models = [model.bind() for model in self._get_models()]

        # Combine the model outputs with the configured weights
        combiner = Combiner.bind(*models, weights=self.models_weights, labels=self.labels)
        model_response = combiner.run.bind(clean_data)

        # Driver deployment that exposes the graph over HTTP
        serve_dag = DAGDriver \
            .options(route_prefix=self.config['endpoint']) \
            .bind(model_response)
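
I deploy and query the graph roughly like this (a sketch only; serve.run, the endpoint value, and the payload are examples, not my exact code):

    from ray import serve
    import requests

    # Start the graph behind HTTP; the route comes from self.config['endpoint']
    serve.run(serve_dag)

    # Example client call, assuming the endpoint is "/ensemble"
    response = requests.post(
        "http://127.0.0.1:8000/ensemble",
        json={"text": "example input"},   # placeholder payload
    )
    print(response.json())

The Combiner and DAGDriver deployments used in the graph: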

    import asyncio

    import numpy as np
    from ray import serve


    @serve.deployment(num_replicas=8, ray_actor_options={"num_cpus": 0.2})
    class Combiner:
        '''
        Run multiple models at the same time in the graph.
        '''
        def __init__(self, *models, weights, labels):
            self.labels = labels
            self.weights = weights
            self.models = models

        def merge_results(self, results):
            # zip(*results) groups each label's scores across the models;
            # np.average then takes a weighted average over the models.
            return {'labels': self.labels,
                    'predictions': np.average(list(zip(*results)), axis=1, weights=self.weights)}

        async def run(self, X):
            # Fan the same input out to every model handle without blocking,
            # then wait for all of the predictions to come back.
            refs = [model.remote(X) for model in self.models]
            results = await asyncio.gather(*refs)
            return self.merge_results(results)
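
merge_results is just a weighted average across the models for each label; a tiny standalone check of that expression (example numbers only):

    import numpy as np

    # Two models, two labels: each inner list is one model's scores.
    results = [[0.9, 0.1], [0.7, 0.3]]
    weights = [2, 1]

    # zip(*results) -> [(0.9, 0.7), (0.1, 0.3)]; axis=1 averages over the models.
    print(np.average(list(zip(*results)), axis=1, weights=weights))
    # -> [0.83333333 0.16666667]
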
    import starlette.requests
    from ray import serve


    # The decorator is required so DAGDriver.options(...).bind(...) in the graph works.
    @serve.deployment
    class DAGDriver:
        def __init__(self, dag_handle):
            self.dag_handle = dag_handle

        async def predict(self, inp):
            """Perform inference directly without HTTP."""
            return await self.dag_handle.remote(inp)

        def check_health(self):
            pass

        async def __call__(self, request: starlette.requests.Request):
            """HTTP endpoint of the DAG."""
            input_data = await request.json()
            return await self.predict(input_data)

For (2), the number of replicas for the preprocessor, the ensemble models, and the combiner will most likely affect throughput more than the number of replicas for the driver. The DAGDriver should be able to handle 100 requests per second without becoming a bottleneck. The num_replicas each deployment needs for your target depends on its workload, so you may need to do some manual experimentation.
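
For example (a sketch only, reusing the names from your code; the replica counts are placeholders you would tune by benchmarking):

    # Rough rule of thumb for a serial deployment: one replica sustains roughly
    # 1 / latency requests per second, so ~100 RPS at 50 ms per call needs on
    # the order of ceil(100 * 0.05) = 5 replicas for that node.
    preprocessing = Preprocessing.options(num_replicas=4).bind(self.config)
    models = [model.options(num_replicas=4).bind() for model in self._get_models()]
    combiner = Combiner.options(num_replicas=8).bind(
        *models, weights=self.models_weights, labels=self.labels
    )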

Hi @Pitos, thanks for using the deployment graph feature!

For (1), this is an issue that will be fixed soon. For now, the deployment graph always generates a sync serve handle by default, which leads to warnings like this in asyncio code and a missed opportunity to optimize performance. We will soon return an async handle by default.
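
For reference, outside the graph API the two handle types look roughly like this (a sketch against the get_handle API the warning points to; "Preprocessing" is just an example deployment name):

    import ray
    from ray import serve

    # Sync handle: .remote() returns an ObjectRef but blocks internally,
    # which is what triggers the warning when called inside an asyncio loop.
    sync_handle = serve.get_deployment("Preprocessing").get_handle(sync=True)
    result = ray.get(sync_handle.remote({"text": "example"}))

    # Async handle: meant to be awaited from an async context (e.g. another
    # deployment), so the event loop is never blocked.
    async def call_async():
        async_handle = serve.get_deployment("Preprocessing").get_handle(sync=False)
        ref = await async_handle.remote({"text": "example"})
        return await ref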

For (2), each Serve replica on a CPU can easily handle a few hundred no-op requests per second; what you actually get depends on your application and the compute / IO resources each node needs. The DAGDriver might be the bottleneck, but only if your raw input is demanding. I would suggest finding the most expensive compute / IO node in your deployment graph, increasing num_replicas for that one first, and trying it out to see which node is limiting throughput.
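
If it helps, a quick way to measure requests per second against the HTTP endpoint (a sketch; the URL, payload, and concurrency are placeholders):

    import time
    from concurrent.futures import ThreadPoolExecutor

    import requests

    URL = "http://127.0.0.1:8000/ensemble"   # example route_prefix
    PAYLOAD = {"text": "example input"}      # example payload
    TOTAL, CONCURRENCY = 1000, 32

    def one_request(_):
        return requests.post(URL, json=PAYLOAD).status_code

    start = time.time()
    with ThreadPoolExecutor(max_workers=CONCURRENCY) as pool:
        codes = list(pool.map(one_request, range(TOTAL)))
    elapsed = time.time() - start
    print(f"{TOTAL / elapsed:.1f} req/s, {codes.count(200)}/{TOTAL} OK")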

Meanwhile, if you have any feedback or suggestions for deployment-graph-level auto-scaling, we’re more than happy to listen as well :slight_smile: