Ray normal DAG vs Compiled DAG

Hello Community,

I am from the offline inference platform team, where we run the standard Ray batch inference solution as suggested. Typically what we do is:

  1. We create a Ray Dataset with our custom datasource.
  2. We invoke map_batches with an actor class that takes a bunch of init params.
  3. We write the results back to our custom datasource.

I want to understand what the normal Ray DAG would look like in the above flow vs. what it would look like with Ray Compiled Graph. What is the main difference in the compiled DAG?

In your offline inference flow, a normal Ray DAG (using Ray Data and map_batches with an actor) builds a dynamic execution graph at runtime: each map_batches call creates Ray tasks/actors for each data block, and dependencies are managed dynamically. The DAG is constructed and scheduled as the data flows, with each batch processed independently and overhead incurred per task/actor invocation. This is flexible but can introduce higher system overhead, especially for fine-grained or latency-sensitive workloads. See Ray Data map_batches and Ray DAG API.
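
For reference, here is a minimal sketch of that dynamic flow. The actor class, model path, and S3 paths below are illustrative placeholders rather than your platform's actual datasource:

import ray
import numpy as np

class InferenceActor:
    # Stand-in for your actor class with init params.
    def __init__(self, model_path: str):
        self.model_path = model_path  # expensive setup happens once per actor

    def __call__(self, batch):
        # Called once per Ray Data block; each call is a dynamically
        # scheduled task on the actor pool.
        batch["prediction"] = np.array([len(t) for t in batch["text"]])
        return batch

ds = ray.data.read_parquet("s3://bucket/input/")        # ReadTasks, one per block
ds = ds.map_batches(
    InferenceActor,
    fn_constructor_kwargs={"model_path": "/models/demo"},
    concurrency=4,                                      # actor pool size
    num_gpus=1,                                         # per-actor resource request
)
ds.write_parquet("s3://bucket/output/")                 # write tasks per block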

With Ray Compiled Graph, the entire computation graph (DAG) is defined statically before execution. The graph is “compiled”—Ray pre-allocates resources, communication channels, and schedules all operations ahead of time. This reduces per-task overhead, enables optimizations like shared memory/NCCL for GPU communication, and can provide sub-millisecond system overhead for each operation. The main difference is that the compiled DAG is static and optimized for repeated, high-throughput, or low-latency execution, while the normal Ray DAG is dynamic and more flexible but less efficient for these use cases. See Compiled Graph documentation PR and example code.
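
For contrast, here is a minimal Compiled Graph sketch (API names as in the experimental Compiled Graph docs; the EchoActor and its doubling logic are purely illustrative):

import ray
from ray.dag import InputNode

@ray.remote
class EchoActor:
    def process(self, x):
        return x * 2

actor = EchoActor.remote()

# The DAG structure is declared once, before any data is processed.
with InputNode() as inp:
    dag = actor.process.bind(inp)

# Compilation pre-allocates channels and a schedule for the whole graph.
compiled_dag = dag.experimental_compile()

for i in range(3):
    # Repeated executions reuse the compiled plan with very low overhead.
    print(ray.get(compiled_dag.execute(i)))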

Would you like a step-by-step breakdown or code example of how these DAGs are constructed?

@RunLLM can you give me detail on what information becomes available in the dynamic execution graph at runtime, upon receiving an input block, vs. what information becomes available when we do static compilation? And how does Ray use the information available at static compilation?

In a dynamic execution graph (normal Ray DAG), information about task dependencies, resource requirements, and data flow is only known at runtime—when each input block is processed, Ray schedules tasks/actors on the fly, and dependencies are resolved as tasks are submitted. The system cannot optimize resource allocation or communication ahead of time, so each task incurs scheduling and communication overhead. Lineage and dependency info are kept in memory for fault tolerance, but not used for global optimization or pre-allocation. See Ray Data key concepts and Ray DAG discussion.

With static compilation (Ray Compiled Graph), the entire DAG—including all task dependencies, resource needs, and communication patterns—is known before execution. Ray uses this information to pre-allocate resources, set up communication channels (e.g., shared memory, NCCL for GPU), and optimize scheduling to minimize overhead and avoid deadlocks. This enables optimizations like overlapping compute and communication, and ensures sub-millisecond orchestration. The static knowledge allows Ray to avoid per-task scheduling overhead and to optimize for throughput and latency. See Compiled Graph docs and Compiled Graph quickstart.

Would you like a step-by-step example of how this information is used in practice?

@RunLLM so is this task graph at the pipeline level, or within a run of a particular batch inference? I have two users, each giving me different actors and different datasets, and as the platform I trigger offline inference for both. Will the task graph be the same or different in the two executions?

When you say that in dynamic graph execution the information about task dependencies, resource requirements, and data flow is only known at runtime, do you mean within one particular run of the batch inference, or across all runs?

It seems likely that the compiled task graph in Ray Compiled Graph is defined at the pipeline level—meaning, it is constructed based on the specific actors, methods, and data flow you define before execution. If two users provide different actors or datasets, the compiled graph for each user will be different, as the static graph is built from the actual pipeline structure and resources specified for each run. The graph is not shared or reused across different user pipelines unless their definitions are identical. See Compiled Graph docs and quickstart.

When discussing dynamic graph execution, the information about dependencies, resources, and data flow is determined at runtime for each individual run of batch inference. This means that for every execution, Ray dynamically constructs and schedules the task graph based on the current input, actors, and data, rather than reusing a precompiled structure across runs. So, the runtime graph is specific to each batch inference execution, not shared globally. See Ray DAG discussion.

Would you like a more detailed breakdown or a code example?

@RunLLM so I am doing batch inference with 1) dataset creation, 2) map_batches, 3) writing data. Suppose my dataset has 1000 rows. Internally, Ray creates multiple tasks for reading blocks. Suppose each block has 5 rows, making it 200 ReadTasks. Then for each block it runs map_batches, which I assume creates one more task for inference, and finally the write creates some more tasks. So do all these 200 ReadTasks + map_batches tasks + write tasks build the dynamic Ray DAG? I am trying to understand what the typical task graph of a batch inference run looks like.

In a static compiled graph, are these tasks pre-calculated along with their resource requirements and interdependencies?

Yes, in your batch inference flow, Ray dynamically builds a task graph (DAG) at runtime: 200 ReadTasks (one per block), followed by 200 map_batches tasks (one per block for inference), and then writing tasks. Each stage’s tasks are connected by data dependencies, forming a dynamic execution DAG specific to that run. This graph is constructed as the pipeline executes, and Ray schedules tasks as resources become available. See Ray Data key concepts.
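
If it helps to see that fan-out concretely, here is a hedged toy version (1000 rows repartitioned into roughly 200 blocks; fake_infer stands in for your inference logic, and the output path is arbitrary):

import ray

ds = ray.data.range(1000).repartition(200)   # roughly 200 blocks of 5 rows each

def fake_infer(batch):
    # Stand-in for the per-block inference work.
    batch["pred"] = batch["id"] * 2
    return batch

ds = ds.map_batches(fake_infer)
ds.write_parquet("/tmp/toy_output")          # the write stage adds its own per-block tasks
print(ds.stats())                            # per-operator task and block statistics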

In a static compiled graph, all these tasks, their resource requirements, and dependencies are pre-calculated before execution. Ray uses this static information to optimize scheduling, resource allocation, and communication, reducing overhead and enabling more efficient execution. The entire pipeline’s structure is known in advance, so Ray can pre-allocate resources and set up dependencies for the whole run. See Compiled Graph quickstart.

Would you like a visual or code example of such a task graph?

@RunLLM thank you for the insights. I want to learn more about how the static graph improves things. First, let's take scheduling. As per my understanding (correct me if I am wrong), this is the typical scheduling flow:

  1. The caller of the task, which is a worker process, chooses a raylet for scheduling. Raylet selection typically happens based on task argument (data) locality: the node holding the most argument data is the preferred node, and the raylet of that node is consulted for task execution.
  2. Upon receiving the resource requirements, the raylet chooses the “best node” for the task. Best-node selection is again based on the resource requirements and data locality.
  3. If resources are available, the raylet grants a resource lease and task execution is initiated on that node.

So if my understanding is correct, in the above scenario the best-node selection always happens at runtime, when the task is ready for execution. Compared to this, how does a static compiled graph optimize scheduling?

Your understanding of dynamic scheduling is correct: in normal Ray, the best node for each task is selected at runtime based on resource requirements and data locality, and resource leases are granted dynamically as tasks are ready to execute. This means scheduling decisions and resource allocations are made on-the-fly for each task, which adds overhead and can lead to suboptimal resource utilization, especially for fine-grained or repetitive workloads. See Ray scheduling docs and Raylet scheduling policy.
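
A hedged illustration of that dynamic path with plain Ray tasks (the per-block task below is a toy stand-in): resource requirements travel with each task, and node selection happens only when that task is submitted.

import ray

@ray.remote(num_cpus=1)
def infer_block(rows):
    # Scheduled wherever a raylet finds a free CPU slot at submission time.
    return len(rows)

refs = [infer_block.remote(list(range(5))) for _ in range(200)]  # 200 per-block tasks
print(sum(ray.get(refs)))  # 1000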

With Ray Compiled Graph, the entire task graph—including all dependencies and resource requirements—is known before execution. Ray can pre-calculate the optimal scheduling plan for the whole graph, pre-allocate resources, and set up communication channels in advance. This eliminates per-task scheduling overhead, enables deadlock-free scheduling (especially for GPU/NCCL workloads), and allows for overlapping compute and communication. The static plan ensures that tasks are launched with minimal latency and resources are used efficiently throughout the pipeline. See Compiled Graph docs.

Would you like a step-by-step breakdown of how static scheduling is performed?

@RunLLM yes please, can you walk me through the step-by-step static scheduling flow?

Certainly! Here’s a step-by-step breakdown of static scheduling in Ray Compiled Graph:

  1. Graph Definition: The user defines the entire computation as a static DAG using the Compiled Graph API (e.g., with InputNode, .bind(), and .experimental_compile()). All tasks, dependencies, and resource requirements are specified up front.
  2. Graph Compilation: When experimental_compile() is called, Ray analyzes the full DAG, determines all task dependencies, and collects resource requirements for each node (e.g., CPU, GPU, memory).
  3. Resource Pre-allocation: Ray pre-allocates resources (CPUs, GPUs, memory) for all tasks in the graph, ensuring that resource contention and scheduling delays are minimized during execution.
  4. Communication Setup: For GPU workloads, Ray sets up communication channels (e.g., NCCL communicators) between actors in advance, enabling direct GPU-to-GPU communication and avoiding runtime setup overhead.
  5. Deadlock-free Scheduling: Ray computes a deadlock-free execution schedule for the entire graph, ordering tasks to avoid resource or communication deadlocks (especially important for multi-GPU workloads).
  6. Optimized Execution: When graph.execute() is called, tasks are launched according to the precomputed schedule, with minimal system overhead (<50us per task), and resources/communication are already in place.

This static approach enables Ray to optimize for throughput, latency, and resource utilization, especially for repeated or high-performance workloads. See Compiled Graph docs and quickstart.
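
As a hedged illustration of steps 1, 2, and 6, here is a small compiled graph that scatters one input across several actors; the Worker actor and its doubling logic are placeholders:

import ray
from ray.dag import InputNode, MultiOutputNode

@ray.remote
class Worker:
    def __init__(self, rank: int):
        self.rank = rank

    def process(self, x):
        return (self.rank, x * 2)

workers = [Worker.remote(i) for i in range(4)]

# Step 1: the full fan-out structure is declared before execution.
with InputNode() as inp:
    dag = MultiOutputNode([w.process.bind(inp) for w in workers])

# Steps 2-5: dependencies, resources, and channels are fixed at compile time.
compiled = dag.experimental_compile()

# Step 6: each execute() call reuses the precomputed schedule.
print(ray.get(compiled.execute(21)))   # [(0, 42), (1, 42), (2, 42), (3, 42)]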

@RunLLM can you explain step by step how the communication setup you mentioned above happens in the current dynamic execution graph vs. how it happens in the static compiled graph? You said it enables direct GPU-GPU communication in the compiled graph. Does the current dynamic execution graph setup not do direct GPU-GPU communication?

In current dynamic execution graphs (normal Ray), communication setup for GPU tensors is not optimized for direct GPU-GPU transfer. When a task returns a torch CUDA tensor, Ray serializes it to CPU memory and stores it in the object store; when another task/actor needs it, the tensor is deserialized and copied from CPU to GPU, even if both actors are on GPUs. Direct GPU-GPU communication (e.g., via NCCL) is not natively supported in this flow—users must implement custom logic if they want to use NCCL or CUDA IPC themselves. See Ray discuss: ray.get on torch cuda tensors.

In static compiled graphs, Ray analyzes the entire DAG at compile time, identifies which tensors will be communicated between GPU actors, and sets up NCCL communicators or other accelerator-specific channels in advance. This enables native, zero-copy, direct GPU-to-GPU communication for torch CUDA tensors, bypassing the CPU and object store. The communication schedule is precomputed, and Ray can overlap communication and computation for further efficiency. See Compiled Graph docs and quickstart.
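
Here is a hedged sketch of that compile-time communication setup, adapted from the Compiled Graph examples. Note that the type-hint API has shifted across Ray versions: older releases use with_type_hint(TorchTensorType(...)) as shown here, while newer ones expose a with_tensor_transport helper, so treat the exact call as version-dependent.

import torch
import ray
from ray.dag import InputNode
from ray.experimental.channel.torch_tensor_type import TorchTensorType

@ray.remote(num_gpus=1)
class Sender:
    def send(self, shape):
        return torch.zeros(shape, device="cuda")

@ray.remote(num_gpus=1)
class Receiver:
    def recv(self, tensor):
        return tensor.shape

sender, receiver = Sender.remote(), Receiver.remote()

with InputNode() as inp:
    t = sender.send.bind(inp)
    # This hint tells the compiler to set up an NCCL channel for this edge;
    # without it, the tensor would take the default object-store path
    # through host memory.
    t = t.with_type_hint(TorchTensorType(transport="nccl"))
    dag = receiver.recv.bind(t)

compiled = dag.experimental_compile()    # NCCL communicators are created here, once
print(ray.get(compiled.execute((10,))))  # torch.Size([10])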

Would you like a code example or a deeper technical breakdown?

@RunLLM got it. So like I said above, I am doing offline inference where I make an invocation like the one below:

dataset.map_batches(actor_class, num_gpus=5). The actor class is a simple Python class that builds a vLLM object and runs inference on it in its __call__ method. So is there actor-to-actor object passing happening in this case? When does tensor communication happen between actors during the inference workflow?

In your offline inference workflow using dataset.map_batches(actor_class, num_gpus=5), each actor processes its assigned data partition independently—there is no actor-to-actor object passing by default. Each actor receives a block of data, runs inference (e.g., with vLLM), and returns results to Ray, which collects outputs for further processing or writing. Tensor communication between actors does not occur unless your code explicitly passes outputs from one actor to another (e.g., as arguments to remote calls).

Tensor communication (and thus any potential for direct GPU-GPU transfer) only happens if an object (such as a torch.Tensor) produced by one actor is sent as an argument to another actor’s method. In standard batch inference with map_batches, this is not the case—each actor is isolated, and data is only passed between Ray and the actors, not between actors themselves. See Ray Data map_batches and Ray Data key concepts.
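
For contrast, actor-to-actor passing only appears when you wire it up explicitly, for example by handing one actor's ObjectRef to another. The toy actors below are illustrative:

import ray

@ray.remote
class Producer:
    def generate(self):
        return [1.0, 2.0, 3.0]   # in a real pipeline this could be a tensor

@ray.remote
class Consumer:
    def consume(self, data):
        return sum(data)

producer, consumer = Producer.remote(), Consumer.remote()

# Passing the ObjectRef creates a producer -> consumer data dependency;
# plain map_batches never does this on your behalf.
ref = consumer.consume.remote(producer.generate.remote())
print(ray.get(ref))   # 6.0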

Would you like more detail on how to enable actor-to-actor communication or optimize GPU data movement?

@RunLLM so let's consider a scenario where a model cannot be loaded onto a single GPU with all of its weights. How does Ray offline inference happen in this case? I am only aware of map_batches, which takes a function; inside the function I am loading the model, and map_batches takes num_gpus as input. I am trying to understand how this setup works when the model cannot fit on a single GPU.

If your model cannot fit on a single GPU, you need model parallelism to shard the model across multiple GPUs. The num_gpus argument to Ray Data’s map_batches only reserves GPUs for each actor; simply increasing num_gpus will not automatically shard the model. Instead, you must use a model parallelism framework (like DeepSpeed, FSDP, or vLLM with tensor/pipeline parallelism) inside your actor or function to split the model across the GPUs assigned to it. See Ray Data batch inference docs and Ray/vLLM parallelism.

For example, with vLLM, you can specify tensor_parallel_size and pipeline_parallel_size to shard the model across multiple GPUs within an actor. For DeepSpeed or FSDP, you would initialize the distributed process group and shard the model accordingly. Ray will schedule the actor on a node with enough GPUs, but you must handle the intra-actor model sharding logic. See vLLM parallelism and Ray Train FSDP2 template.

Would you like a step-by-step example of how to set this up?

@RunLLM yes please, let's go through a step-by-step example of this kind of setup.

Certainly! Here’s a step-by-step example of running offline batch inference with Ray Data and vLLM, where the model is too large for a single GPU and must be sharded across multiple GPUs using tensor and/or pipeline parallelism:

1. Install dependencies:

pip install -U "ray[data, llm]>=2.49.1" vllm==0.7.2

2. Configure vLLM for model parallelism:
You must specify tensor_parallel_size and/or pipeline_parallel_size in the vLLM config. For example, to split across 4 GPUs (2x2):

from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "max_model_len": 16384,
        "tensor_parallel_size": 2,      # 2 GPUs for tensor parallelism
        "pipeline_parallel_size": 2,    # 2 GPUs for pipeline parallelism (total 4 GPUs)
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 2048,
    },
    concurrency=1,
    batch_size=32,
    accelerator_type="L4",  # or your GPU type
)

3. Build the processor and run inference:

import ray

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a bot that responds with haikus."},
            {"role": "user", "content": row["item"]},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=250,
        ),
    ),
    postprocess=lambda row: dict(
        answer=row["generated_text"],
        **row,
    ),
)

ds = ray.data.from_items(["Start of the haiku is: Complete this for me..."])
ds = processor(ds)
ds.show(limit=1)

4. What happens under the hood:

  • Ray Data launches a pool of actors to process batches, and vLLM inside each actor uses the specified parallelism to shard the model across multiple GPUs on the node.
  • The actor is scheduled on a node with enough GPUs (here, 4 GPUs per actor).
  • vLLM handles the intra-actor communication and model sharding; Ray manages the actor lifecycle and data flow.

Would you like a more advanced example (e.g., multi-node or with DeepSpeed/FSDP)?
