Pipeline DAG: join/aggregate independent steps

How severely does this issue affect your experience of using Ray?

  • Medium: It makes completing my task significantly more difficult, but I can work around it.

The final stage of my pipeline has two independent steps, which I aggregate with a lambda:

import ray

def prepend_a(val):
    return f"a{val}"


def append_b(val):
    return f"{val}b"


def append_c(val):
    return f"{val}c"

data = ray.data.from_items([str(i) for i in range(10)])

pipe = data.window(blocks_per_window=2)
a_appended = pipe.map(prepend_a)
final = a_appended.map(lambda x: (append_b(x), append_c(x)))

for bb, cc in final.iter_rows():
    print(bb, cc)

This makes append_c wait for append_b. I would like them to be executed independently and then aggregated. Naively, I thought I could do something like:

final_b = a_appended.map(append_b)
final_c = a_appended.map(append_c)

for bb, cc in zip(final_b.iter_rows(), final_c.iter_rows()):
    print(bb, cc)

But this gives me the error:

Traceback (most recent call last):
  File "/home/sean/git/dataset_example.py", line 32, in <module>
    for bb, cc in zip(final_b.iter_rows(), final_c.iter_rows()):
  File "/home/sean/miniconda3/envs/ray_test/lib/python3.9/site-packages/ray/data/dataset_pipeline.py", line 127, in gen_rows
    for ds in self.iter_datasets():
  File "/home/sean/miniconda3/envs/ray_test/lib/python3.9/site-packages/ray/data/dataset_pipeline.py", line 663, in iter_datasets
    raise RuntimeError("Pipeline cannot be read multiple times.")
RuntimeError: Pipeline cannot be read multiple times.

Is it possible to create a fan-out → fan-in structure with Ray Dataset? Alternatively, is there some way to construct a DAG of dependent steps?

@Seanny123 I think there are a few parts to your question:

  • DatasetPipeline’s map() API is a destructive read: it consumes the results instead of reading the data in a read-only way, so the pipeline cannot be read more than once. This is the direct cause of your error.
  • For fan-out: in general, you can fan out with Dataset. For example, you can map the same Dataset with 2 UDFs, which results in 2 new Datasets (note: unlike DatasetPipeline, reading a Dataset is not destructive).
  • For fan-in: we don’t support fan-in (such as join) in general. The only two exceptions (as of writing) are two simple binary operations: union() and zip(). Basically, you can call ds1.union(ds2) or ds1.zip(ds2), which creates a DAG “lineage” (the steps of transformation that lead to the end Dataset).
  • There is no support for explicit pipeline DAG construction in Ray Dataset.