How severely does this issue affect your experience of using Ray?
- Medium: It contributes to significant difficulty in completing my task, but I can work around it.
The final step of my pipeline has two independent steps, which I currently aggregate with a lambda:
```python
import ray

def prepend_a(val):
    return f"a{val}"

def append_b(val):
    return f"{val}b"

def append_c(val):
    return f"{val}c"

data = ray.data.from_items([str(i) for i in range(10)])
pipe = data.window(blocks_per_window=2)
a_appended = pipe.map(prepend_a)
# Both appends run inside a single map step, so they execute sequentially.
final = a_appended.map(lambda x: (append_b(x), append_c(x)))

for bb, cc in final.iter_rows():
    print(bb, cc)
```
This makes `append_c` wait for `append_b`. I would like them to be executed independently, then aggregated. Naively, I thought I could do something like:
```python
final_b = a_appended.map(append_b)
final_c = a_appended.map(append_c)

for bb, cc in zip(final_b.iter_rows(), final_c.iter_rows()):
    print(bb, cc)
```
But this gives me the error:
```
Traceback (most recent call last):
  File "/home/sean/git/dataset_example.py", line 32, in <module>
    for bb, cc in zip(final_b.iter_rows(), final_c.iter_rows()):
  File "/home/sean/miniconda3/envs/ray_test/lib/python3.9/site-packages/ray/data/dataset_pipeline.py", line 127, in gen_rows
    for ds in self.iter_datasets():
  File "/home/sean/miniconda3/envs/ray_test/lib/python3.9/site-packages/ray/data/dataset_pipeline.py", line 663, in iter_datasets
    raise RuntimeError("Pipeline cannot be read multiple times.")
RuntimeError: Pipeline cannot be read multiple times.
```
Is it possible to create a fan-out → fan-in structure with Ray Dataset? Alternatively, is there some way to construct a DAG of dependent steps?
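
For concreteness, the shape I am after is easy to express with plain Ray tasks. Below is a minimal sketch (reusing the `append_b`/`append_c` functions from above as remote tasks, not part of my actual pipeline); what I am looking for is the equivalent on a windowed Dataset/DatasetPipeline, where each element flows through both branches and the two results are aggregated back together.

```python
# Minimal sketch of the intended fan-out -> fan-in DAG using plain Ray tasks,
# only to illustrate the structure I am asking about for DatasetPipeline.
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
def append_b(val):
    return f"{val}b"

@ray.remote
def append_c(val):
    return f"{val}c"

for val in (f"a{i}" for i in range(10)):
    # Fan out: both branches run as independent tasks.
    b_ref = append_b.remote(val)
    c_ref = append_c.remote(val)
    # Fan in: aggregate the two branch outputs.
    bb, cc = ray.get([b_ref, c_ref])
    print(bb, cc)
```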