As discussed previously, I have a multi-stage forking pipeline:
```python
import ray
import time

def prepend_a(val):
    # somehow only called once!
    print("a", val)
    return f"a{val}"

def append_b(val):
    time.sleep(1)
    return f"{val}b"

def append_c(val):
    time.sleep(1)
    return f"{val}c"

data = ray.data.from_items([str(i) for i in range(10)])
a_appended = data.map(prepend_a)
final_b = a_appended.map(append_b)
final_c = a_appended.map(append_c)

for bb, cc in zip(final_b.iter_rows(), final_c.iter_rows()):
    print(bb, cc)
```
Although this runs successfully, I would like `final_b` and `final_c` to be evaluated concurrently (asynchronously) instead of each statement blocking until it has finished. How can I separate setting up the pipeline from executing it?
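To make the goal concrete, here is a minimal plain-Python sketch of the behavior I'm after. It uses the standard library's `concurrent.futures`, not Ray, so it only illustrates the execution model and is not a proposed Ray solution. The shared `prepend_a` stage runs once per item, every `append_b` and `append_c` task is submitted before any result is awaited (the "setup"), and results are collected afterwards (the "execution"). `run_forked_pipeline` and its `max_workers` parameter are hypothetical names for this illustration, and the sleeps are shortened from 1 s to 0.1 s.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Plain-Python illustration using concurrent.futures, NOT Ray's API.

def prepend_a(val):
    return f"a{val}"

def append_b(val):
    time.sleep(0.1)  # shortened from the 1 s used in the Ray example
    return f"{val}b"

def append_c(val):
    time.sleep(0.1)
    return f"{val}c"

def run_forked_pipeline(items, max_workers=20):
    """Hypothetical helper: run the shared stage once, then both branches concurrently."""
    # Shared upstream stage: computed once per item and reused by both branches.
    a_values = [prepend_a(item) for item in items]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # "Setup": submit() returns a Future immediately, so all b- and c-tasks
        # are queued before any of them has finished.
        b_futures = [pool.submit(append_b, v) for v in a_values]
        c_futures = [pool.submit(append_c, v) for v in a_values]
        # "Execution": result() blocks until each task is done; the b and c
        # branches overlap instead of running one after the other.
        return [(fb.result(), fc.result()) for fb, fc in zip(b_futures, c_futures)]

if __name__ == "__main__":
    start = time.perf_counter()
    for bb, cc in run_forked_pipeline([str(i) for i in range(10)]):
        print(bb, cc)
    print(f"elapsed: {time.perf_counter() - start:.2f} s")
```

With enough workers, the total time is close to one branch's latency rather than the sum of both, which is the kind of overlap I would like from `final_b` and `final_c`.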
How severely does this issue affect your experience of using Ray?
- Medium: It makes completing my task significantly more difficult, but I can work around it.