1. Severity of the issue: (select one)
Medium: Significantly affects my productivity but can find a workaround.
2. Environment:
Ray version: 2.44.1
Python version: 3.9.9
OS: Linux 5.10.0-60.18.0.50.r509_2.hce2.aarch6
3. What happened vs. what you expected:
Phenomenon: "Multi-model composition with Ray Serve deployment graphs" (Anyscale).
I tried to follow the article above and write two simple serve.deployment implementations connected as a DAG. It does not work when the decorator is "serve.delpoyment", but it works when the decorator is "ray.remote" and I replace "DAGDriver" with "final_result.execute()".
import ray
from ray import serve
from ray.serve.dag import InputNode

ray.init()

@serve.deployment(num_replicas=1, name="Preprocess", ray_actor_options={"num_cpus": 0.3})
class Preprocess:
    def __init__(self):
        pass

    def __call__(self, input_data):
        return {"processed_value": input_data["raw_value"] * 2}

@serve.deployment(num_replicas=1, name="Inference", ray_actor_options={"num_cpus": 0.3})
class Inference:
    def __init__(self):
        pass

    def __call__(self, preprocessed_data):
        return {"final_result": preprocessed_data["processed_value"] + 10}

preprocess_node = Preprocess.bind()
inference_node = Inference.bind()

with InputNode() as dag_input:
    preprocessed_result = preprocess_node.__call__(dag_input)
    final_result = inference_node.__call__(preprocessed_result)

try:
    from ray.serve.dag import DAGDriver
except ImportError:
    from ray.serve.drivers import DAGDriver

dag_service = DAGDriver.bind(final_result)

serve.run(
    dag_service,
    name="simple_chain_dag",
    route_prefix="/inference",
    port=8000,
)
My confusion: It seems the DAG API has been reworked in newer Ray versions. Which versions support DAGs of serve.deployment, or when will this be supported? Or is it a mistake in my usage that makes it not work?
Your code is almost correct, but there are a few issues:
The decorator should be @serve.deployment, not @serve.delpoyment (typo).
When using Ray Serve’s DAG API, you must use .bind() to connect deployments, not direct calls like preprocess_node.__call__(dag_input). Instead, use preprocess_node.bind(dag_input) and inference_node.bind(preprocessed_result).
import ray
from ray import serve
from ray.serve.dag import InputNode
from ray.serve.drivers import DAGDriver

ray.init()

@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 0.3})
class Preprocess:
    def __call__(self, input_data):
        return {"processed_value": input_data["raw_value"] * 2}

@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 0.3})
class Inference:
    def __call__(self, preprocessed_data):
        return {"final_result": preprocessed_data["processed_value"] + 10}

preprocess_node = Preprocess.bind()
inference_node = Inference.bind()

with InputNode() as dag_input:
    preprocessed_result = preprocess_node.bind(dag_input)
    final_result = inference_node.bind(preprocessed_result)

dag_service = DAGDriver.bind(final_result)

serve.run(
    dag_service,
    name="simple_chain_dag",
    route_prefix="/inference",
    port=8000,
)
The DAG API for Serve deployments is supported in Ray 2.x, and the correct usage is to connect deployments with .bind() as above. If you use ray.remote and .execute(), you are using Ray Core DAGs, not Serve DAGs, which is a different abstraction. For more, see Ray Serve model composition docs.
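For reference, here is a minimal sketch of the Ray Core DAG variant you described, assuming the ray.dag API (InputNode, .bind(), .execute()); the method name and input value are illustrative:

import ray
from ray.dag import InputNode  # Ray Core DAG input node, not ray.serve.dag

ray.init()

@ray.remote
class Preprocess:
    def process(self, input_data):
        return {"processed_value": input_data["raw_value"] * 2}

@ray.remote
class Inference:
    def process(self, preprocessed_data):
        return {"final_result": preprocessed_data["processed_value"] + 10}

# .bind() on an @ray.remote class builds a lazy actor node in the graph.
preprocess_node = Preprocess.bind()
inference_node = Inference.bind()

# Methods are bound (not called) to chain the graph.
with InputNode() as dag_input:
    preprocessed_result = preprocess_node.process.bind(dag_input)
    final_result = inference_node.process.bind(preprocessed_result)

# execute() runs the whole graph and returns an ObjectRef.
print(ray.get(final_result.execute({"raw_value": 1})))  # -> {'final_result': 12}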
Would you like a step-by-step explanation of why .bind() is required and how the Serve DAG API works?
Thanks for your reply. I tried to run your code, but it raises this error:
Traceback (most recent call last):
File ".../dag_test.py", line 4, in <module>
from ray.serve.drivers import DAGDriver
ModuleNotFoundError: No module named 'ray.serve.drivers'
My install command is "pip3 install ray[serve]==2.44.1". Another problem: when I change "__call__" to "process", the following code raises this error:
import ray
from ray import serve
from ray.serve.dag import InputNode
# from ray.serve.drivers import DAGDriver

ray.init()

@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 0.3})
class Preprocess:
    def __call__(self, input_data):
        return {"processed_value": input_data["raw_value"] * 2}

@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 0.3})
class Inference:
    def process(self, preprocessed_data):
        return {"final_result": preprocessed_data["processed_value"] + 10}

preprocess_node = Preprocess.bind()
inference_node = Inference.bind()

with InputNode() as dag_input:
    preprocessed_result = preprocess_node.process.bind(dag_input)
    final_result = inference_node.process.bind(preprocessed_result)

dag_service = DAGDriver.bind(final_result)

serve.run(
    dag_service,
    name="simple_chain_dag",
    route_prefix="/inference",
    port=8000,
)
Traceback (most recent call last):
File "/home/chenzhaojie/ray_code/dag_test.py", line 22, in <module>
preprocessed_result = preprocess_node.process.bind(dag_input)
AttributeError: 'Application' object has no attribute 'process'
I can only find DAGDriverProxyActor in the installed Ray source files. This is my first time using the DAG API, but I am familiar with @serve.deployment. In my project, I have several @serve.deployment classes, like this:
@serve.deployment
class Preprocess:
    def __init__(self):
        pass

    def process_image(self, images):
        # do some preprocessing
        return images

@serve.deployment
class Inference:
    def __init__(self):
        pass

    def process_image(self, images):
        # do some inference
        return images

@serve.deployment
class Postprocess:
    def __init__(self):
        pass

    def process_image(self, images):
        # do some postprocessing
        return images

@serve.deployment
class EventManager:
    def __init__(self):
        pass

    def alert_check(self, images):
        # check whether the target object appears; if so,
        # send a message to a specific webhook
        pass
@serve.deployment
class ModelService:
    def __init__(self, preprocess, infer, postprocess, event_manager):
        self.preprocess = preprocess
        self.infer = infer
        self.postprocess = postprocess
        self.event_manager = event_manager

    async def process(self, images):
        preprocess_ref = self.preprocess.process_image.remote(images)
        infer_ref = self.infer.process_image.remote(preprocess_ref)
        postprocess_ref = self.postprocess.process_image.remote(infer_ref)
        self.event_manager.alert_check.remote(postprocess_ref)

preprocess_app = Preprocess.bind()
inference_app = Inference.bind()
postprocess_app = Postprocess.bind()
eventmanager_app = EventManager.bind()
model_service_app = ModelService.bind(preprocess_app, inference_app, postprocess_app, eventmanager_app)
model_service_handle = serve.run(model_service_app, name="ModelService", route_prefix=None)

# do subsequent event
for images in decode(video):
    model_service_handle.process.remote(images)
The code above is a simplified version of how I call the different actors from ModelService. But when the input is a video sampled at 5 frames per second, it is too slow (I am confident my own code is already optimized). So I am trying to find something that can replace ModelService for calling the different actors, because I suspect the ModelService actor hits a scheduling cap (something like a maximum waiting-queue/processing-queue size; see the sketch below for the kind of limit I mean).
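To illustrate what I mean by a "scheduling cap", here is a sketch of the kind of per-deployment limits I am thinking of. I assume max_ongoing_requests and max_queued_requests are the relevant Ray Serve deployment options; the numbers are placeholders, and I have not verified that this is what limits my throughput:

from ray import serve

# Sketch only: the kind of per-deployment limits I mean.
# max_ongoing_requests: concurrent requests one replica will process.
# max_queued_requests: requests buffered at the caller's handle before
#     new ones are rejected.
@serve.deployment(
    num_replicas=2,            # placeholder value
    max_ongoing_requests=16,   # placeholder value
    max_queued_requests=100,   # placeholder value
)
class ModelService:
    async def process(self, images):
        ...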
Putting the DAG aside, could you give me some advice about this scheduling cap?