Work-stealing for multiple actors?

Hi all,

I have a set of stateful workers implemented as Ray actors. I’d like to distribute the load evenly across these actors; otherwise, my slower machines become bottlenecks while the faster ones sit idle. Essentially, I need work stealing.

I tried using ray.util.ActorPool, but it maintains its state locally. As a result, it cannot properly track load when there are multiple remote callers, since each caller ends up with its own ActorPool state.

My setup looks roughly like this:

import ray
from ray.util import ActorPool

@ray.remote
class EngineActor:
    def __init__(self, config):
        self.engine = Engine(config)

    def run(self, arg):
        return self.engine.run(arg)

# setup

config = { ... }
num_workers = int(ray.cluster_resources()["CPU"])
actors = ray.util.ActorPool([
    EngineActor.options(max_concurrency=1, num_cpus=1).remote(config)
    for _ in range(num_workers)
])

And later usage is:

@ray.remote
def process_batch(actors: ActorPool, batch: list):
    # map() returns a generator, so materialize it before returning
    return list(actors.map(lambda actor, job: actor.run.remote(job), batch))

Because process_batch itself is a remote function, the ActorPool state is not shared or synchronized, which leads to incorrect load balancing. Unfortunately, my case cannot be restructured to use a single central coordinator.

If this were a single-process asyncio setup, I would simply use an asyncio.Queue, where each job is something like a tuple[asyncio.Future, Any], and workers would resolve the Future with the processed result. However, I haven’t been able to find anything equivalent to a lightweight asyncio.Future in Ray.
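
For reference, this is the single-process pattern I have in mind; a minimal sketch where process is a placeholder for the actual async work:

import asyncio
from typing import Any

async def worker(queue: asyncio.Queue, process) -> None:
    # An idle worker immediately pulls the next (future, job) pair, so the
    # fastest workers naturally take on more jobs.
    while True:
        fut, job = await queue.get()
        try:
            fut.set_result(await process(job))
        except Exception as e:
            fut.set_exception(e)
        finally:
            queue.task_done()

async def submit(queue: asyncio.Queue, job: Any) -> Any:
    # Pair each job with a Future that the worker resolves with the result.
    fut = asyncio.get_running_loop().create_future()
    queue.put_nowait((fut, job))
    return await fut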

What is the proper way to implement work stealing between Ray actors in this scenario?

Thank you.

Ray’s built-in ActorPool does not support distributed work stealing or shared state across multiple remote callers; each ActorPool instance maintains its own local state, so it cannot coordinate load or enable true work stealing in a distributed setting. There is no built-in Ray equivalent to asyncio.Future for lightweight, distributed job tracking and resolution between actors, and Ray Queues are not designed for work stealing or actor load tracking either. According to this discussion, the only way to share an ActorPool is to wrap it in a dedicated manager actor, but this still centralizes coordination and does not provide distributed work stealing.

If you cannot use a single coordinator, you will need to implement a custom solution, such as a shared Ray Queue or a custom manager actor that tracks job assignments and actor load, but this is not natively supported for work stealing. Ray Datasets or Ray Data Pipelines may offer better built-in support for distributed load balancing and backpressure, as suggested in this thread. Would you like more detail on how to implement such a custom manager or alternative patterns?
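
For concreteness, here is a minimal sketch of that manager-actor pattern; the PoolManager name and its run_batch method are illustrative, not part of Ray's API:

import ray
from ray.util import ActorPool

@ray.remote
class PoolManager:
    # One actor owns the ActorPool, so all callers share a single pool state.
    def __init__(self, actors: list):
        self.pool = ActorPool(actors)

    def run_batch(self, batch: list) -> list:
        # map() hands each job to the next free actor and yields results in order.
        return list(self.pool.map(lambda actor, job: actor.run.remote(job), batch))

# usage
manager = PoolManager.remote([
    EngineActor.options(max_concurrency=1, num_cpus=1).remote(config)
    for _ in range(num_workers)
])
results = ray.get(manager.run_batch.remote(batch))

Every caller goes through this single actor, so coordination is still centralized, and with the default max_concurrency of 1 concurrent batches are processed one at a time; what it does give you is one consistent view of which actors are busy.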


For now, I did this:

import asyncio

import ray

@ray.remote
class Dispatcher:
    def __init__(self, actors: list):
        self.actors = actors
        self._free = actors[:]
        self._queue = asyncio.Queue()
        self._cond = asyncio.Condition()
        self._tasks = [
            asyncio.create_task(self._worker())
            for _ in range(len(actors))
        ]

    async def _worker(self) -> None:
        # Pull the next job, wait for a free actor, run the job on it,
        # and resolve the caller's future.
        while True:
            f, task = await self._queue.get()
            async with self._cond:
                while not self._free:
                    await self._cond.wait()
                actor = self._free.pop()

            try:
                result = actor.eval.remote(task)
                f.set_result(await result)
            except Exception as e:
                f.set_exception(e)

            async with self._cond:
                self._free.append(actor)
                self._cond.notify()

    async def eval(self, batch: list) -> list:
        # Enqueue every job paired with its own future, then await them all.
        futures = []
        loop = asyncio.get_event_loop()
        for task in batch:
            f = loop.create_future()
            self._queue.put_nowait((f, task))
            futures.append(f)

        return await asyncio.gather(*futures)

    # must be called manually after the computation is over
    async def stop(self) -> None:
        for t in self._tasks:
            if not t.done():
                t.cancel()
        self._queue.shutdown(True)  # asyncio.Queue.shutdown() requires Python 3.13+


Scheduled as:

num_workers = int(ray.cluster_resources()["CPU"])
dispatcher = Dispatcher.options(
    num_cpus=0,
    max_concurrency=num_workers * 10,
    scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(),
        soft=False,
    ),
).remote([
    # actual actors doing the work
    EvalActor.options(max_concurrency=1, num_cpus=1).remote(ref)
    for _ in range(num_workers)
])

# and later

@ray.remote(num_cpus=0)
def process_batch(dispatcher, batch):
    return ray.get(dispatcher.eval.remote(batch))

Quite a few issues:

  1. Requires manual worker failure handling (currently missing)
  2. Requires manual rescheduling when the cluster changes (nodes leave/join; also missing)
  3. num_cpus=0 hacks
  4. Double serialize/deserialize: there is no async counterpart to ray.wait(fetch_local=False), so each task payload is moved around twice (a partial workaround is sketched below)
  5. Graceful shutdown could be better
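
For point 4, a partial workaround I'm considering (untested sketch, inbound direction only): put each task into the object store in process_batch and push ObjectRefs through the dispatcher. Ray resolves top-level ObjectRef arguments on the receiving actor, so the payload should go straight from the object store to the worker instead of being copied into the dispatcher first; results still come back through the dispatcher, though.

@ray.remote(num_cpus=0)
def process_batch(dispatcher, batch):
    # Store each task once; the dispatcher only ever handles small ObjectRefs,
    # because refs nested inside a list are passed through unresolved.
    refs = [ray.put(task) for task in batch]
    # Dispatcher._worker stays unchanged: when it calls actor.eval.remote(task)
    # and task is an ObjectRef, Ray fetches the value on the worker's node.
    return ray.get(dispatcher.eval.remote(refs))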

It's more of a hack than a solution.