Multi-stage fanning pipeline using Ray: Queues + Actors vs. Workflows

I have built a multi-stage pipeline using Ray. The first stage takes a single input from the calling process and generates N inputs. The next stage consumes those N inputs and returns their results to the original calling process.

I have implemented this using Ray Actors and Queues. Here is a toy example with trivial computation:

import ray
from ray.util.queue import Queue

num_queue = Queue(maxsize=100)
str_queue = Queue(maxsize=100)
out_queue = Queue(maxsize=100)


@ray.remote
def append_a(get_queue: Queue, put_queue: Queue):
    # stage 1: consume one item, produce one item
    while num := get_queue.get(block=True):
        print(f"got work {num}")
        put_queue.put(f"{num}a")


@ray.remote
def append_b(get_queue: Queue, put_queue: Queue):
    # stage 2: consume one item, fan it out into three
    while num_str := get_queue.get(block=True):
        print(f"got work {num_str}")
        for i in range(3):
            put_queue.put(f"{num_str}{i}b")


# create two workers for each stage (each is a long-running Ray task)
for _ in range(2):
    append_a.remote(num_queue, str_queue)
    append_b.remote(str_queue, out_queue)


# submit to queue
for i in range(10):
    num_queue.put(str(i))


# retrieve results
for _ in range(10 * 3):  # 10 inputs, each fanned out into 3 results by append_b
    print(out_queue.get())

Using Queues and Actors does work, but it feels fragile and awkward:

  1. Queue capacity and actor state are not easily monitorable via the Ray dashboard.
  2. Manually creating actors prevents Ray from auto-scaling in response to queue pressure.
  3. If an actor fails, it’s difficult for the original calling process to detect that and react; instead, the caller can get stuck waiting forever at out_queue.get() (see the sketch after this list).
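
One partial mitigation for point 3 is to poll with a timeout instead of blocking forever. A minimal sketch using the timeout argument of Queue.get (the 5-second value is arbitrary):

from ray.util.queue import Empty

results = []
while len(results) < 10 * 3:
    try:
        results.append(out_queue.get(block=True, timeout=5.0))
    except Empty:
        # no result within 5s: a worker may have died upstream
        raise RuntimeError("pipeline stalled; check worker health")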

Is there some other way to get a persistent actor/consumer setup that scales automatically? Is this the use case for Ray Workflows, which is still in alpha? Should I be using Ray in combination with Airflow instead?

cc @yic can you answer this question?

For this kind of streaming topology, you can try Dataset Pipelines (see the Ray v1.9.2 docs).

It would look something like this (with append_a and append_b rewritten as plain per-row functions):

import ray
from ray.data.dataset_pipeline import DatasetPipeline

def append_a(val):
    return f"{val}a"

def append_b(val):
    return f"{val}b"

def source():
    # from_iterable expects dataset-producing callables, one per window
    for _ in range(100):
        yield lambda: ray.data.from_items(["input", "items", "for", "batch"])

pipe = DatasetPipeline.from_iterable(source()) \
    .map(append_a) \
    .map(append_b)

for output in pipe.iter_rows():
    print(output)
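
Since your append_b stage fans each input out into three results, flat_map is probably a better fit than map for that step. A minimal sketch, assuming plain Python functions for the stages (fan_out_b is a name I made up):

import ray

def append_a(val):
    return f"{val}a"

def fan_out_b(val):
    # flat_map lets one input row become several output rows
    return [f"{val}{i}b" for i in range(3)]

ds = ray.data.from_items([str(i) for i in range(10)])
for row in ds.map(append_a).flat_map(fan_out_b).iter_rows():
    print(row)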

It even handles more complicated pipelines!

import ray

def prepend_a(val):
    # executed only once per item, even though two downstream
    # datasets consume the result (see the note below)
    print("a", val)
    return f"a{val}"

def append_b(val):
    return f"{val}b"

def append_c(val):
    return f"{val}c"


data = ray.data.from_items([str(i) for i in range(10)])

a_prepended = data.map(prepend_a)
final_b = a_prepended.map(append_b)
final_c = a_prepended.map(append_c)

for bb, cc in zip(final_b.iter_rows(), final_c.iter_rows()):
    print(bb, cc)
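
The "only called once" behavior falls out of how Datasets execute (as of Ray 1.x): each transform runs eagerly when invoked, so data.map(prepend_a) materializes its output blocks once, and both final_b and final_c read from those shared blocks rather than recomputing the common stage.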