While making this post, I figured out how I had to define a DatasetPipeline:
import ray
def prepend_a(val: object) -> str:
    """Return the string form of *val* with the letter ``a`` put in front."""
    return f"a{val}"
def append_b(val: object) -> str:
    """Return the string form of *val* with the letter ``b`` added at the end."""
    return f"{val}b"
def append_c(val: object) -> str:
    """Return the string form of *val* with the letter ``c`` added at the end."""
    return f"{val}c"
# Build a dataset from the strings "0" through "9".
data = ray.data.from_items([str(i) for i in range(10)])

# Window the dataset into a pipeline, two blocks per window.
pipe = data.window(blocks_per_window=2)

# First stage: prefix every item with "a".
a_appended = pipe.map(prepend_a)

# Second stage: fan each prefixed item out into a (…b, …c) pair so both
# suffixed variants travel together through the pipeline.
final = a_appended.map(lambda x: (append_b(x), append_c(x)))

# Consume the pipeline row by row, unpacking each pair.
for bb, cc in final.iter_rows():
    print(bb, cc)