Pipeline DAG: join/aggregate independent steps

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

The final stage of my pipeline has two independent steps, which I aggregate with a lambda:

import ray

def prepend_a(val):
    return f"a{val}"


def append_b(val):
    return f"{val}b"


def append_c(val):
    return f"{val}c"

data = ray.data.from_items([str(i) for i in range(10)])

pipe = data.window(blocks_per_window=2)
a_appended = pipe.map(prepend_a)
final = a_appended.map(lambda x: (append_b(x), append_c(x)))

for bb, cc in final.iter_rows():
    print(bb, cc)

This makes append_c wait for append_b. I would like them to execute independently and then be aggregated. Naively, I thought I could do something like:

final_b = a_appended.map(append_b)
final_c = a_appended.map(append_c)

for bb, cc in zip(final_b.iter_rows(), final_c.iter_rows()):
    print(bb, cc)

But this gives me the error:

Traceback (most recent call last):
  File "/home/sean/git/dataset_example.py", line 32, in <module>
    for bb, cc in zip(final_b.iter_rows(), final_c.iter_rows()):
  File "/home/sean/miniconda3/envs/ray_test/lib/python3.9/site-packages/ray/data/dataset_pipeline.py", line 127, in gen_rows
    for ds in self.iter_datasets():
  File "/home/sean/miniconda3/envs/ray_test/lib/python3.9/site-packages/ray/data/dataset_pipeline.py", line 663, in iter_datasets
    raise RuntimeError("Pipeline cannot be read multiple times.")
RuntimeError: Pipeline cannot be read multiple times.

Is it possible to create a fan-out → fan-in structure with Ray Datasets? Alternatively, is there some way to construct a DAG of dependent steps?

@Seanny123 I think there are a few things here to your question:

  • DatasetPipeline’s map() API is a destructive read: it consumes the results rather than reading the data read-only, so the pipeline cannot be read more than once. This is the direct cause of your error.
  • For fan-out: in general you can fan out with Dataset. For example, from the same Dataset you can map it with two UDFs, which results in two new Datasets (note: unlike DatasetPipeline, a Dataset read is not destructive).
  • For fan-in: we don’t support fan-in (such as joins) in general. The only two exceptions (as of writing) are the simple binary operations union() and zip(). You can call ds1.union(ds2) or ds1.zip(ds2), which creates a DAG “lineage” (the chain of transformations that leads to the final Dataset).
  • There is no support for explicit pipeline-DAG construction in Ray Datasets.

The union() and zip() operators are defined on Dataset objects, so I can see how they could be used to perform a fan-in at the beginning of a pipeline. But what about in the middle of a pipeline?

Here is a motivating application. You have two columns img and text representing image-caption pairs. You would like to compute the cosine similarity in some cross-modal representation space (e.g. CLIP).

Ideally this could be done in one pipeline, without fusing the embedding of text and images into the same node in the pipeline. This ideal is depicted in the figure below:

This would presumably require calling union or zip on DatasetPipeline objects (right before cosine_sim()), which it seems is not currently possible.

Is something like this possible with Ray Datasets currently? What would be the recommended way to handle this kind of workload?

union() and zip() are supported on Dataset but not on DatasetPipeline.

I suppose you actually want zip() here: the cosine of the first row of image with the first row of text, and so on. An alternative is to do it like this:

img = ray.data.read_images("...")
txt = ray.data.read_text("...")
ds = img.zip(txt)
embed = ds.map_batches(embedding_func)  # embedding_func computes embeddings for the image and text columns respectively
cosine = embed.add_column("cosine", compute_cosine_func)  # compute_cosine_func takes the image and text vectors, computes their cosine similarity, and writes the result to a new column
...
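One possible shape for compute_cosine_func, assuming embedding_func wrote per-row vectors into hypothetical img_embedding and text_embedding columns (the column names are illustrative, not from the original), operating on the pandas batch that add_column passes to its function:

```python
import numpy as np
import pandas as pd

def compute_cosine_func(batch: pd.DataFrame) -> pd.Series:
    # Stack the per-row embedding vectors into (n_rows, dim) matrices.
    # "img_embedding"/"text_embedding" are assumed column names.
    img = np.stack(batch["img_embedding"].to_numpy())
    txt = np.stack(batch["text_embedding"].to_numpy())
    # Row-wise cosine similarity between paired image/text vectors.
    dots = np.einsum("ij,ij->i", img, txt)
    norms = np.linalg.norm(img, axis=1) * np.linalg.norm(txt, axis=1)
    return pd.Series(dots / norms, index=batch.index)
```

The returned Series becomes the new "cosine" column, aligned with the batch by index.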