Ray normal DAG vs Compiled DAG

@RunLLM yes please can you give me an example of how to structure above pipeline to take advantage of GPU-GPU communication?

To enable direct GPU-GPU communication between the actors in your pipeline, you need to (1) pass torch CUDA tensors between actors (not CPU tensors or numpy arrays), and (2) use Ray Compiled Graph with a tensor transport hint on the actor-to-actor edges. Here’s a simplified example of a 3-phase pipeline (preprocessing → inference → postprocessing), where each phase is a Ray actor operating on GPU tensors and the compiled graph uses NCCL for the transfers between actors:

import ray
import torch
from ray.dag import InputNode

@ray.remote(num_gpus=1)
class PreprocessActor:
    def __init__(self, pt2_path):
        # Load the exported model and move it to this actor's GPU. Adjust the
        # loading call to match your artifact format (torch.jit.load expects
        # TorchScript; .pt2 exports need the torch.export / AOTInductor loaders).
        self.model = torch.jit.load(pt2_path).cuda()
        self.model.eval()
    def process(self, batch):
        with torch.inference_mode():
            # Assume batch is a CUDA tensor
            return self.model(batch)

@ray.remote(num_gpus=1)
class InferenceActor:
    def __init__(self, pt2_path):
        self.model = torch.jit.load(pt2_path).cuda()
        self.model.eval()
    def infer(self, batch):
        with torch.inference_mode():
            return self.model(batch)

@ray.remote(num_gpus=1)
class PostprocessActor:
    def __init__(self, pt2_path):
        self.model = torch.jit.load(pt2_path).cuda()
        self.model.eval()
    def post(self, batch):
        with torch.inference_mode():
            return self.model(batch)

# Instantiate actors
pre = PreprocessActor.remote("preprocess.pt2")
inf = InferenceActor.remote("inference.pt2")
post = PostprocessActor.remote("postprocess.pt2")

# Build compiled graph with GPU-GPU transport between the actors
with InputNode() as inp:
    # The driver -> preprocess hop still goes through the object store;
    # NCCL is used for the actor-to-actor edges annotated below.
    dag = pre.process.bind(inp)
    dag = dag.with_tensor_transport("nccl")  # preprocess -> inference over NCCL
    dag = inf.infer.bind(dag)
    dag = dag.with_tensor_transport("nccl")  # inference -> postprocess over NCCL
    dag = post.post.bind(dag)
    # No transport hint on the last node: its result is returned to the driver,
    # which is not part of the NCCL group.

compiled = dag.experimental_compile()

# Run the pipeline with a CUDA tensor input
input_tensor = torch.randn(32, 10, device="cuda")
result = ray.get(compiled.execute(input_tensor))
print(result)
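
The compiled graph can be reused for every batch, and it can be torn down explicitly when the pipeline is finished, which also releases the NCCL channels it created. A minimal sketch (the batch shape here is just a placeholder):

# Reuse the same compiled graph for subsequent batches
for _ in range(10):
    out = ray.get(compiled.execute(torch.randn(32, 10, device="cuda")))

# Release the compiled graph and the NCCL channels it set up
compiled.teardown()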

Key points:

  • Each actor method receives and returns a CUDA tensor.
  • .with_tensor_transport("nccl") on an intermediate node tells Ray to send that node's output directly from one actor's GPU to the next over NCCL.
  • The compiled graph is statically defined, so the NCCL communicators and channels are set up once at compile time and reused on every execute() call.

This structure keeps intermediate tensors in GPU memory between pipeline stages (only the driver-facing input and output go through the object store), which is what makes the compiled graph efficient in multi-GPU and multi-node setups.
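
For contrast with a normal (non-compiled) Ray DAG, the topic of this thread: the same pipeline can be bound and executed without experimental_compile(), in which case each intermediate result is serialized through the Ray object store instead of being sent over NCCL, so tensors do not stay resident on the GPU between stages. A rough sketch, assuming a second set of actor handles (pre2, inf2, post2, which need their own GPUs) so it does not interfere with the compiled graph above; depending on your Ray version the stages may receive CPU tensors and need to move them back onto the GPU themselves:

# Normal (non-compiled) DAG for comparison: intermediate results travel
# through the object store rather than directly between GPUs.
pre2 = PreprocessActor.remote("preprocess.pt2")
inf2 = InferenceActor.remote("inference.pt2")
post2 = PostprocessActor.remote("postprocess.pt2")

with InputNode() as inp:
    normal_dag = post2.post.bind(inf2.infer.bind(pre2.process.bind(inp)))

# execute() on a non-compiled DAG returns an ObjectRef; no NCCL channels are created.
ref = normal_dag.execute(input_tensor)
print(ray.get(ref))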
