Asynchronous dataset pipeline map

While making this post, I figured out how I had to define a DatasetPipeline:

import ray

def prepend_a(val):
    """Return the formatted form of ``val`` with the letter "a" in front."""
    prefixed = "a{}".format(val)
    return prefixed


def append_b(val):
    """Return the formatted form of ``val`` followed by the letter "b"."""
    suffixed = "{}b".format(val)
    return suffixed


def append_c(val):
    """Return the formatted form of ``val`` followed by the letter "c"."""
    suffixed = "{}c".format(val)
    return suffixed

# Source dataset: the strings "0" through "9", one item per row.
data = ray.data.from_items(list(map(str, range(10))))


def _append_b_and_c(row):
    """Return the pair (append_b(row), append_c(row)) for a single row."""
    return append_b(row), append_c(row)


# Window the dataset two blocks at a time; the maps below run on the result.
pipe = data.window(blocks_per_window=2)
# Despite its name, a_appended holds the rows after "a" has been prepended.
a_appended = pipe.map(prepend_a)
# Fan each prefixed row out into a ("...b", "...c") pair.
final = a_appended.map(_append_b_and_c)

for bb, cc in final.iter_rows():
    print(bb, cc)
1 Like