I have built a multi-stage pipeline using Ray. The first stage takes a single input from the calling process and generates N inputs. The next stage consumes those N inputs and returns their results to the original calling process.
I have implemented this using Ray Actors and Queues. Here is a toy example with trivial computation:
import ray
from ray.util.queue import Queue

ray.init()

num_queue = Queue(maxsize=100)
str_queue = Queue(maxsize=100)
out_queue = Queue(maxsize=100)

@ray.remote
def append_a(get_queue: Queue, put_queue: Queue):
    # first stage: append "a" to each number pulled from the queue
    while num := get_queue.get(block=True):
        print(f"got work {num}")
        put_queue.put(f"{num}a")

@ray.remote
def append_b(get_queue: Queue, put_queue: Queue):
    # second stage: fan each string out into three results
    while num_str := get_queue.get(block=True):
        print(f"got work {num_str}")
        for i in range(3):
            put_queue.put(f"{num_str}{i}b")

# create two workers for each stage
for _ in range(2):
    append_a.remote(num_queue, str_queue)
    append_b.remote(str_queue, out_queue)

# submit to queue
for i in range(10):
    num_queue.put(str(i))

# retrieve results (each of the 10 inputs fans out to 3 outputs)
for i in range(10 * 3):
    print(out_queue.get())
Using Queues and Actors does work, but it feels fragile/awkward:
- Queue capacity and actor state are not easily monitorable via the Ray dashboard.
- Manually creating actors prevents Ray from auto-scaling in response to queue pressure.
- If an actor fails, it's difficult for the original calling process to respond accordingly; instead, the calling process can get stuck waiting forever at out_queue.get() (see the workaround sketched after this list).
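The closest I have come to handling that last point is a timeout-and-poll loop like the sketch below. The 30-second timeout is arbitrary, I'm assuming ray.util.queue's Empty exception is what a timed-out get() raises, and the worker_refs bookkeeping is a hypothetical variation on the worker-creation loop above. Even then it only detects that the pipeline stalled rather than reacting to the specific dead worker:

from ray.util.queue import Empty

# hypothetical: keep the ObjectRefs returned by .remote() so the caller
# can at least notice when a supposedly long-running worker has exited
worker_refs = []
for _ in range(2):
    worker_refs.append(append_a.remote(num_queue, str_queue))
    worker_refs.append(append_b.remote(str_queue, out_queue))

results = []
while len(results) < 10 * 3:
    try:
        # give up after 30 seconds instead of blocking forever
        results.append(out_queue.get(block=True, timeout=30))
    except Empty:
        # a worker task that shows up as "ready" has crashed or returned
        dead, _ = ray.wait(worker_refs, num_returns=len(worker_refs), timeout=0)
        raise RuntimeError(f"pipeline stalled; {len(dead)} worker task(s) exited")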
Is there some other way to achieve persistent actors/consumers that scale automatically? Is this the use case for Ray Workflows, which is still in alpha? Or should I be using Ray in combination with Airflow instead?
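To make the question more concrete: ray.util.ActorPool is the closest built-in I have found, sketched below for a single stage (the class and method names are mine). It gives me persistent workers without hand-rolled queues, but as far as I can tell the pool size is still fixed up front and it doesn't naturally express the multi-stage fan-out, which is why I'm wondering about Workflows or an external orchestrator:

import ray
from ray.util import ActorPool

@ray.remote
class AppendA:
    # persistent worker; no hand-rolled queue needed
    def run(self, num: str) -> str:
        return f"{num}a"

pool = ActorPool([AppendA.remote() for _ in range(2)])

# map_unordered keeps the actors busy, but the pool never grows or shrinks
results = list(pool.map_unordered(lambda actor, v: actor.run.remote(v),
                                  [str(i) for i in range(10)]))
print(results)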