I built a graph with the following architecture:
- Preprocessing
- An ensemble of models running at the same time
- Combining the results from step 2
I have two questions:
1. Why do I receive the following warning in the Ray dashboard?
WARNING api.py:485 – You are retrieving a sync handle inside an asyncio loop. Try getting client.get_handle(…, sync=False) to get better performance. Learn more at https://docs.ray.io/en/master/serve/http-servehandle.html#sync-and-async-handles component=serve deployment=DAGDriver replica=DAGDriver#soHdqH
2. How many DAGDriver replicas (num_replicas in @serve.deployment) do I have to create to reach a throughput of 100 requests per second? Is it one process per request?
Below is the code for my Ray graph:
with InputNode() as dag_input:
    # NOTE(review): indentation was lost in the paste; the statements below are
    # assumed to belong to this `with` block — confirm against the real file.
    # Bind preprocessing
    preprocessing = Preprocessing.bind(self.config)
    # First Preprocessing
    clean_data = preprocessing.preprocess.bind(dag_input)
    # Predict in models
    models = [model.bind() for model in self._get_models()]
    # All bound models are passed positionally; weights and labels by keyword.
    combiner = Combiner.bind(*models, weights=self.models_weights, labels=self.labels)
    #model_response = combiner.run.bind(clean_data)
    # NOTE(review): the active binding below feeds the raw `dag_input` to
    # `combiner.test`, so `clean_data` is computed but not consumed here; the
    # commented-out line above is the variant that uses the preprocessed data.
    # `test` is not among the Combiner methods shown in this snippet — verify.
    model_response = combiner.test.bind(dag_input)
# NOTE(review): the statement below is cut off after the line continuation;
# presumably the DAGDriver is bound to `model_response` — TODO confirm.
serve_dag = DAGDriver\
@serve.deployment(num_replicas=8, ray_actor_options={"num_cpus": 0.2})
class Combiner:
    """Run multiple models at the same time in the graph and merge their outputs.

    Args:
        *models: Handles of the bound model deployments to query.
        weights: Per-model weights for the weighted average, in the same
            order as ``models``.
        labels: Labels returned alongside the averaged predictions.
    """

    def __init__(self, *models, weights, labels):
        self.labels = labels
        self.weights = weights
        self.models = models

    def merge_results(self, results):
        """Combine the per-model outputs into one weighted average.

        Args:
            results: One sequence of predictions per model, in the same
                order as ``self.weights``.

        Returns:
            A dict with the configured ``labels`` and the weighted
            ``predictions``.
        """
        # zip(*results) regroups the outputs position by position, so axis=1
        # runs across the models and is the axis the weights apply to.
        per_position = list(zip(*results))
        return {
            'labels': self.labels,
            'predictions': np.average(per_position, axis=1, weights=self.weights),
        }

    async def _call_model(self, model, X):
        """Query one model handle and resolve its reply to a concrete value."""
        import inspect

        # NOTE(review): the loop body was missing in the original snippet.
        # This assumes every handle is invoked as `model.remote(X)`; switch to
        # `model.<method>.remote(X)` if the models expose a named method.
        reply = model.remote(X)
        # Depending on the handle flavour, `.remote()` may hand back a plain
        # awaitable or an awaitable that resolves to another awaitable, so
        # keep awaiting until a concrete value is left.
        while inspect.isawaitable(reply):
            reply = await reply
        return reply

    async def run(self, X):
        """Send ``X`` to every model concurrently and merge the replies.

        Args:
            X: Input forwarded unchanged to each model.

        Returns:
            The dict produced by :meth:`merge_results`.
        """
        # Create all calls first so they run concurrently under gather(),
        # which also preserves the model order expected by the weights.
        refs = [self._call_model(model, X) for model in self.models]
        results = await asyncio.gather(*refs)
        return self.merge_results(results)
class DAGDriver:
    """Entry point of the graph: forwards request payloads to the DAG handle.

    Args:
        dag_handle: Handle whose ``remote`` method is called with each input.
    """

    def __init__(self, dag_handle):
        self.dag_handle = dag_handle

    async def predict(self, inp):
        """Perform inference directly without HTTP."""
        return await self.dag_handle.remote(inp)

    def check_health(self):
        """Health-check hook; currently performs no checks."""
        # NOTE(review): the body of this method was missing in the original
        # snippet. It is kept as an explicit no-op so the class is valid;
        # presumably a real check should raise when the replica is unhealthy
        # — confirm against the Ray Serve health-check documentation.
        return None

    async def __call__(self, request: starlette.requests.Request):
        """HTTP endpoint of the DAG."""
        # The JSON body of the request is the input passed to the graph.
        input_data = await request.json()
        return await self.predict(input_data)