About the Ray DAG API for serve.deployment in Ray 2.44.1

1. Severity of the issue: (select one)
Medium: Significantly affects my productivity but can find a workaround.

2. Environment:

  • Ray version: 2.44.1
  • Python version: 3.9.9
  • OS: Linux 5.10.0-60.18.0.50.r509_2.hce2.aarch6

3. What happened vs. what you expected:

  • Phenomenon: Multi-model composition with Ray Serve deployment graphs | Anyscale.
    I tried to follow the article above and write two simple serve.deployment classes connected in a DAG. It does not work when the decorator is “serve.delpoyment”, but it works when the decorator is “ray.remote” and “DAGDriver” is replaced with “final_result.execute()”.
import ray
from ray import serve
from ray.serve.dag import InputNode

ray.init()

@serve.deployment(num_replicas=1, name="Preprocess", ray_actor_options={"num_cpus": 0.3})
class Preprocess:
    def __init__(self):
        pass
    def __call__(self, input_data):
        return {"processed_value": input_data["raw_value"] * 2}

@serve.deployment(num_replicas=1, name="Inference", ray_actor_options={"num_cpus": 0.3})
class Inference:
    def __init__(self):
        pass
    def __call__(self, preprocessed_data):
        return {"final_result": preprocessed_data["processed_value"] + 10}


preprocess_node = Preprocess.bind()  
inference_node = Inference.bind()  

with InputNode() as dag_input:
    preprocessed_result = preprocess_node.__call__(dag_input)
    final_result = inference_node.__call__(preprocessed_result)

try:
    from ray.serve.dag import DAGDriver
except ImportError:
    from ray.serve.drivers import DAGDriver

dag_service = DAGDriver.bind(final_result)


serve.run(
    dag_service,
    name="simple_chain_dag",  
    route_prefix="/inference",  
    port=8000  
)

  • My confusion: It seems the DAG API has been reworked in newer Ray versions. When will DAGs of serve.deployment be supported, or is there a mistake in my usage that makes this fail?

Your code is almost correct, but there are a few issues:

  1. The decorator should be @serve.deployment, not @serve.delpoyment (typo).
  2. When using Ray Serve’s DAG API, you must use .bind() to connect deployments, not direct calls like preprocess_node.__call__(dag_input). Instead, use preprocess_node.bind(dag_input) and inference_node.bind(preprocessed_result).
  3. The DAG API for Serve deployments is supported and works as shown in the official documentation and blog post.

Here is a corrected minimal example:

import ray
from ray import serve
from ray.serve.dag import InputNode
from ray.serve.drivers import DAGDriver

ray.init()

@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 0.3})
class Preprocess:
    def __call__(self, input_data):
        return {"processed_value": input_data["raw_value"] * 2}

@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 0.3})
class Inference:
    def __call__(self, preprocessed_data):
        return {"final_result": preprocessed_data["processed_value"] + 10}

preprocess_node = Preprocess.bind()
inference_node = Inference.bind()

with InputNode() as dag_input:
    preprocessed_result = preprocess_node.bind(dag_input)
    final_result = inference_node.bind(preprocessed_result)

dag_service = DAGDriver.bind(final_result)

serve.run(
    dag_service,
    name="simple_chain_dag",
    route_prefix="/inference",
    port=8000
)

The DAG API for Serve deployments is supported in Ray 2.x, and the correct usage is to connect deployments with .bind() as above. If you use ray.remote and .execute(), you are using Ray Core DAGs, not Serve DAGs, which is a different abstraction. For more, see Ray Serve model composition docs.

Would you like a step-by-step explanation of why .bind() is required and how the Serve DAG API works?


Thanks for your reply. I tried to run your code, but it raises this error:

Traceback (most recent call last):
  File ".../dag_test.py", line 4, in <module>
      from ray.serve.drivers import DAGDriver
ModuleNotFoundError: No module named 'ray.serve.drivers'

My install command was “pip3 install ray[serve]==2.44.1”. Another problem: when I change “__call__” to “process”, this error occurs:

import ray
from ray import serve
from ray.serve.dag import InputNode
# from ray.serve.drivers import DAGDriver

ray.init()

@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 0.3})
class Preprocess:
    def __call__(self, input_data):
        return {"processed_value": input_data["raw_value"] * 2}

@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 0.3})
class Inference:
    def process(self, preprocessed_data):
        return {"final_result": preprocessed_data["processed_value"] + 10}

preprocess_node = Preprocess.bind()
inference_node = Inference.bind()

with InputNode() as dag_input:
    preprocessed_result = preprocess_node.process.bind(dag_input)
    final_result = inference_node.process.bind(preprocessed_result)

dag_service = DAGDriver.bind(final_result)

serve.run(
    dag_service,
    name="simple_chain_dag",
    route_prefix="/inference",
    port=8000
)
Traceback (most recent call last):
  File "/home/chenzhaojie/ray_code/dag_test.py", line 22, in <module>
    preprocessed_result = preprocess_node.process.bind(dag_input)
AttributeError: 'Application' object has no attribute 'process'

Hello! For the import error, can you try importing DAGDriver from ray.serve.dag, not from ray.serve.drivers?

For calling a custom method (like process), you need to do that inside another deployment using a deployment handle, not in the DAG graph itself.

If you take a peek at this example in the ray serve github repo: Ray Serve DAG API example (basic_dag.py), the relevant lines are:

  • lines 1–19: Define deployments using @serve.deployment and implement the callable interface (__call__)

  • lines 22–25: Use .bind() on the deployment itself (e.g., FNode = f.bind()), not on a method.

  • line 26: Compose the DAG by binding the output of one deployment to another (e.g., DagNode = BasicDriver.bind(FNode))

Can you try using .bind() on the deployment class, not on its methods? That may make the second error go away.

Thanks for your reply. I searched under /usr/local/lib64 and found that there is no DAGDriver:

/usr/local/lib64/python3.9/site-packages/ray/_private/ray_experimental_perf.py
/usr/local/lib64/python3.9/site-packages/ray/dag/compiled_dag_node.py
/usr/local/lib64/python3.9/site-packages/ray/experimental/channel/shared_memory_channel.py
/usr/local/lib64/python3.9/site-packages/ray/experimental/channel/torch_tensor_nccl_channel.py

I only found DAGDriverProxyActor in the files above. This is my first time using the DAG API, but I do know @serve.deployment. In my project I have several @serve.deployment classes, like this:

@serve.deployment
class Preprocess:
    def __init__(self):
        pass
    def process_image(self, images):
        # do something preprocess
        return images

@serve.deployment
class Inference:
    def __init__(self):
        pass
    def process_image(self, images):
        # do something inference
        return images

@serve.deployment
class Postprocess:
    def __init__(self):
        pass
    def process_image(self, images):
        # do something postprocess
        return images

@serve.deployment
class EventManager:
    def __init__(self):
        pass
    def alert_check(self, images):
        # check whether the target object occurs; if so,
        # send a message to a specific webhook
        pass

@serve.deployment
class ModelService:
    def __init__(self, preprocess, infer, postprocess, event_manager):
        self.preprocess = preprocess
        self.infer = infer
        self.postprocess = postprocess
        self.event_manager = event_manager
    async def process(self, images):
        preprocess_ref = self.preprocess.process_image.remote(images)
        infer_ref = self.infer.process_image.remote(preprocess_ref)
        postprocess_ref = self.postprocess.process_image.remote(infer_ref)
        self.event_manager.alert_check.remote(postprocess_ref)


preprocess_app = Preprocess.bind()
inference_app = Inference.bind()
postprocess_app = Postprocess.bind()
eventmanager_app = EventManager.bind()

model_service_app = ModelService.bind(preprocess_app, inference_app, postprocess_app, eventmanager_app)
model_service_handle = serve.run(model_service_app, name="ModelService", route_prefix=None)

# do subsequent event
for images in decode(video):
    model_service_handle.process.remote(images)

The code above is my simplified way of calling the different actors from ModelService. But when the input is video sampled at 5 frames per second, it is too slow (I am sure my code has been optimized). So I am trying to find something to replace ModelService for calling the different actors, as I suspect the ModelService actor hits a scheduling cap (such as a maximum waiting-queue/processing-queue size).
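One client-side way to bound work in flight in a loop like the one above is a semaphore around the handle calls. A sketch with a hypothetical run_pipeline helper, assuming the handle's .remote() returns an awaitable response as in Ray 2.x:

```python
import asyncio

async def run_pipeline(model_service_handle, frames, max_in_flight=8):
    # Hypothetical helper: bounds how many frames are in flight at once,
    # instead of fire-and-forget .remote() calls in a plain loop.
    sem = asyncio.Semaphore(max_in_flight)

    async def one(images):
        async with sem:
            # Awaiting the response applies backpressure to the loop
            return await model_service_handle.process.remote(images)

    return await asyncio.gather(*(one(f) for f in frames))
```

With a real Serve handle, the awaited object would be the DeploymentResponse returned by .remote(); the semaphore keeps the caller from flooding the deployment's queue.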

Putting DAG aside, could you give me some advice about scheduling caps?