Hi all,
I have a set of stateful workers implemented as Ray actors. I’d like to distribute the load evenly across these actors; otherwise, my slower machines become bottlenecks while the faster ones sit idle. Essentially, I need work stealing.
I tried using ray.util.ActorPool, but it keeps its bookkeeping in the caller's local process state. As a result, when there are multiple remote callers, each caller ends up with its own independent copy of the ActorPool, and none of them can see the load the others have placed on the actors.
My setup looks roughly like this:
@ray.remote
class EngineActor:
    def __init__(self, config):
        self.engine = Engine(config)

    def run(self, arg):
        return self.engine.run(arg)

# setup
config = { ... }
num_workers = int(ray.cluster_resources()["CPU"])
actors = ray.util.ActorPool([
    EngineActor.options(max_concurrency=1, num_cpus=1).remote(config)
    for _ in range(num_workers)
])
And later usage is:
@ray.remote
def process_batch(actors: ActorPool, batch: list):
    # list() because ActorPool.map returns a lazy generator
    return list(actors.map(lambda actor, job: actor.run.remote(job), batch))
Because process_batch is itself a remote function, it receives a deserialized copy of the ActorPool, so the pool's internal state is neither shared nor synchronized across callers, which leads to incorrect load balancing. Unfortunately, my case cannot be restructured to use a single central coordinator.
If this were a single-process asyncio setup, I would simply use an asyncio.Queue, where each job is something like a tuple[asyncio.Future, Any], and workers would resolve the Future with the processed result. However, I haven’t been able to find anything equivalent to a lightweight asyncio.Future in Ray.
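For concreteness, here is a minimal single-process sketch of the pattern I mean (the `worker` and `submit` names are just illustrative, and `arg * 2` stands in for the real `self.engine.run(arg)` call): idle workers pull from a shared queue, which gives work stealing for free.

```python
import asyncio
from typing import Any

async def worker(queue: asyncio.Queue) -> None:
    # Each worker pulls the next (Future, payload) pair as soon as it is idle.
    while True:
        fut, arg = await queue.get()
        fut.set_result(arg * 2)  # stand-in for self.engine.run(arg)
        queue.task_done()

async def submit(queue: asyncio.Queue, arg: Any):
    # Pair the payload with a Future and wait for a worker to resolve it.
    fut = asyncio.get_running_loop().create_future()
    await queue.put((fut, arg))
    return await fut

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue)) for _ in range(4)]
    results = await asyncio.gather(*(submit(queue, i) for i in range(10)))
    for w in workers:
        w.cancel()
    return results

results = asyncio.run(main())
```

This is the behavior I'd like to reproduce across Ray actors, but I don't see a Ray primitive that plays the role of the lightweight asyncio.Future here.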
What is the proper way to implement work stealing between Ray actors in this scenario?
Thank you.