Asynchronous dataset pipeline map

As discussed previously, I have a multi-stage forking pipeline:

import ray
import time

def prepend_a(val):
    # somehow only called once!
    print("a", val)
    return f"a{val}"

def append_b(val):
    time.sleep(1)
    return f"{val}b"

def append_c(val):
    time.sleep(1)
    return f"{val}c"


data = ray.data.from_items([str(i) for i in range(10)])

a_appended = data.map(prepend_a)
final_b = a_appended.map(append_b)
final_c = a_appended.map(append_c)

for bb, cc in zip(final_b.iter_rows(), final_c.iter_rows()):
    print(bb, cc)

Although this runs successfully, I would like final_b and final_c to be evaluated simultaneously/asynchronously instead of each iteration blocking until both maps have finished. How can I separate the setup of the pipeline from its execution?
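
For context on the behavior I want: driving both branches from separate threads would let the two 1-second maps overlap instead of running back-to-back. This is a plain-Python analogy, not Ray's API (the helpers and timings below are made up for illustration):

import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-ins for the two branch maps; the sleeps model the
# per-row cost of append_b / append_c.
def append_b(val):
    time.sleep(0.1)
    return f"{val}b"

def append_c(val):
    time.sleep(0.1)
    return f"{val}c"

items = [f"a{i}" for i in range(5)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # Each branch is submitted as one unit of work, so the two
    # sleep-bound loops run concurrently (sleep releases the GIL).
    fut_b = pool.submit(lambda: [append_b(v) for v in items])
    fut_c = pool.submit(lambda: [append_c(v) for v in items])
    bs, cs = fut_b.result(), fut_c.result()
elapsed = time.perf_counter() - start

for bb, cc in zip(bs, cs):
    print(bb, cc)

With both branches in flight the wall time is roughly 0.5 s here, rather than the ~1 s of evaluating them one after the other.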

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

While writing this post, I figured out that I had to define a DatasetPipeline:

import ray

def prepend_a(val):
    return f"a{val}"


def append_b(val):
    return f"{val}b"


def append_c(val):
    return f"{val}c"

data = ray.data.from_items([str(i) for i in range(10)])

pipe = data.window(blocks_per_window=2)
a_appended = pipe.map(prepend_a)
final = a_appended.map(lambda x: (append_b(x), append_c(x)))

for bb, cc in final.iter_rows():
    print(bb, cc)
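
The key change is that the two branches are fused into a single map that returns a (b, c) tuple, so each row flows through the pipeline once. Stripped of Ray entirely, the same structure can be sketched with plain generators (an illustration only, not Ray's execution model):

def prepend_a(val):
    return f"a{val}"

def append_b(val):
    return f"{val}b"

def append_c(val):
    return f"{val}c"

# Lazy pipeline: nothing runs until the for loop pulls rows through.
rows = (str(i) for i in range(10))
a_appended = (prepend_a(v) for v in rows)
# One fused stage produces both branch results per row, so prepend_a
# is called exactly once per input item.
final = ((append_b(x), append_c(x)) for x in a_appended)

for bb, cc in final:
    print(bb, cc)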