How to make a multi-input node in DAG?

Data transmission in a recommender system is usually N producers to M consumers.
How can I build this N-to-M pipe with a DAG or Workflow? For example, there are hundreds of CPUs for the dataset reader and thousands of CPUs for data preprocessing, and they feed 8 GPUs for model training.
Also, the batches from the preprocessing nodes should be delivered in equal numbers to each GPU worker. If fault recovery were also possible, even better.

I think a basic way is to pass in a tuple, partition it inside the DAG, and process the partitions.

As a side note, for Ray Compiled Graphs, we support passing in multiple args and kwargs, and they can be accessed with input.arg1 or input.kw in the DAG.
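
For illustration, here is a minimal sketch of that multi-arg/kwarg pattern. The Worker actor and the exact index/attribute access syntax are my assumptions based on the description above, so treat it as a sketch rather than the definitive API:

    import ray
    from ray.dag import InputNode

    # Hypothetical actor, used only to illustrate multi-arg/kwarg input access.
    @ray.remote
    class Worker:
        def combine(self, a, b, scale):
            return (a + b) * scale

    worker = Worker.remote()
    with InputNode() as inp:
        # Assumed syntax: positional args accessed by index, kwargs by attribute.
        dag = worker.combine.bind(inp[0], inp[1], inp.scale)

    compiled_dag = dag.experimental_compile()
    ref = compiled_dag.execute(1, 2, scale=10)
    assert ray.get(ref) == 30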

Sorry for the confusion. I don't mean function arguments; what I meant was different compute workers sending their results to one or more downstream compute workers.

I may have misunderstood you… I guess when you refer to multi-input, you mean multi-input for a node, not for the DAG.

I don’t think there is built-in support for N->M mapping. User logic will need to handle that.

Does the following work for you?

    from ray.dag import InputNode, MultiOutputNode

    # producers: N producer actor handles; consumers: M consumer actor handles.
    with InputNode() as inp:
        data = [producer.load.bind(inp) for producer in producers]
        output = [consumer.process.bind(data) for consumer in consumers]
        dag = MultiOutputNode(output)

data is a list of size N, and output is a list of size M. If you need some “repartitioning” and would like to avoid passing all N partitions to consumer.process, I think user logic will need to handle that, e.g. by only passing in the relevant partitions.
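
For example, a rough sketch of that user-side repartitioning, assuming consumer.process can take a variable number of partitions; here each consumer receives a round-robin subset of the producer outputs instead of all N:

    with InputNode() as inp:
        data = [producer.load.bind(inp) for producer in producers]
        output = [
            # Consumer j only receives every len(consumers)-th partition.
            consumer.process.bind(*data[j::len(consumers)])
            for j, consumer in enumerate(consumers)
        ]
        dag = MultiOutputNode(output)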

Well, there are many producers and many consumers in the N->M mapping. If objects are transferred by requesting the centralized Raylet, performance will be terrible. We have tested Redpanda (a Kafka-compatible system) and Ray object get() with the same resource limits, and Ray's throughput was a quarter of Redpanda's.

Ray Compiled Graphs is the latest effort to reduce the overhead by statically allocating resources (e.g., shared memory buffers) and reusing them, rather than reallocating them for each execution. It also supports GPU-GPU communication via NCCL. Sounds like it targets your needs?

Not all the features we need: static resource allocation, NCCL, and other features were already implemented in Alpa two years ago.
The problem with Alpa is that it only supports one-to-one data transfer between device meshes. For example, if mesh A has 8 GPUs and mesh B has 4 GPUs, only one piece of data can be transferred, from the A mesh master to the B mesh master.
But now we need a feature where one piece of data can be transferred from two of the GPUs in mesh A to one of the GPUs in mesh B.

There are other drawbacks to Alpa. For example, there is no state-recording mechanism for fault recovery.

In addition, Alpa needs to call the long-running actors of the different device meshes through the Ray driver and Raylet on every run loop, which is a bottleneck for latency-sensitive or high-throughput services.
We obtained performance test results similar to the Google Pathways paper: Ray shows more obvious performance degradation for simple instruction execution across a huge number of nodes.
It would be great if Ray Compiled Graphs could run the graph loops without going through a centralized node.

Note: the picture shows what Alpa can do, and it uses the CUDA Python interface for buffer reuse and NCCL transfers:

Alpa presentation: High-Performance LLM Training at 1000 GPU Scale With Alpa & Ray

Btw, to answer the original question, you can use Ray Data + Ray Train.

You can use Ray Data for reading/preprocessing (Quickstart — Ray 3.0.0.dev0); it supports autoscaling. After that, Ray Train supports splitting the dataset across N workers and ingesting it (Data Loading and Preprocessing — Ray 3.0.0.dev0).
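
If it helps, here is a rough sketch of that pattern. The dataset path, preprocess function, and batch size are placeholders, not from the thread; the 8 GPU workers match the original question:

    import ray
    from ray.train import ScalingConfig
    from ray.train.torch import TorchTrainer

    # Placeholder preprocessing function; in practice this is your feature pipeline.
    def preprocess(batch):
        return batch

    # Read + preprocess on CPU workers; Ray Data autoscales these tasks.
    ds = ray.data.read_parquet("s3://bucket/dataset")  # placeholder path
    ds = ds.map_batches(preprocess)

    def train_loop_per_worker(config):
        # Each of the 8 GPU workers gets an equal split of the dataset.
        shard = ray.train.get_dataset_shard("train")
        for batch in shard.iter_batches(batch_size=1024):
            ...  # one training step per batch

    trainer = TorchTrainer(
        train_loop_per_worker,
        scaling_config=ScalingConfig(num_workers=8, use_gpu=True),
        datasets={"train": ds},
    )
    trainer.fit()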

Yes, for M CPU <> N GPU patterns, where M is elastic and/or you want the data pipeline to be asynchronous with the GPU tasks, you should use Ray Data. Ray Data may eventually be integrated with Compiled Graphs but this is not on the roadmap yet. Ray Data will handle task scheduling, fault tolerance, etc.

For cases where M is static and the CPU tasks execute synchronously with the GPU tasks (e.g., offloading), but M != N, you can shuffle the DAG nodes between downstream tasks. Like this:

    import ray
    from ray.dag import InputNode, MultiOutputNode

    @ray.remote
    class CPUActor:
      def get_data_shard(self, i):
        return ...

    @ray.remote
    class GPUActor:
      # Use the * syntax to take in a variable number of shards.
      def consume_data_shards(self, *shards):
        return ...

    M = 8
    N = 4
    cpu_actors = [CPUActor.remote() for _ in range(M)]
    gpu_actors = [GPUActor.remote() for _ in range(N)]
    with InputNode() as inp:
      data_shards = [cpu_actor.get_data_shard.bind(inp) for cpu_actor in cpu_actors]
      # Each GPU actor consumes a contiguous slice of M // N shards.
      dag = MultiOutputNode([gpu_actor.consume_data_shards.bind(
          *data_shards[i * M // N:(i + 1) * M // N]
        ) for i, gpu_actor in enumerate(gpu_actors)])

If M < N, you can use multiple task returns to produce multiple shards per CPU task, e.g., cpu_actor.get_data_shard.options(num_returns=N // M).bind(...).

Ray Data may be good, but I'm not sure whether there is any unnecessary overhead with it. The get_data_shard function looks like a pull function; I mean, does consume_data_shards need to wait for the distributed data to be transferred from cpu_actor every time it is triggered?

By the way, Ray Data has a fatal flaw: it has no way to checkpoint the dataset's reading progress. It assumes that one training run can read all samples normally, without considering fault recovery. In practice, when we do a lot of training, especially LLM training, we hit a breakdown almost every week on a large-scale GPU cluster.

I think dataset checkpointing and reproducibility are on the roadmap. I don't know when they are planned to be released.

For consumption, Ray Data is a streaming-based solution, and it supports prefetching (fetching the next batches while the training loop is still running on the current ones).
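
For example, inside the Ray Train training function it can look roughly like this (the batch size and prefetch depth are just illustrative values):

    from ray import train

    def train_loop_per_worker(config):
        shard = train.get_dataset_shard("train")
        # prefetch_batches controls how many batches are fetched ahead of the GPU work.
        for batch in shard.iter_batches(batch_size=1024, prefetch_batches=4):
            ...  # the next batches are being fetched while this training step runs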