Divide Work between Actors

My application has an Actor class that processes incoming data. To do so, the class holds a large number of parameters that are supplied via the constructor.
The current approach creates one Actor per data package, which results in a large number of actors (>500) and therefore a lot of overhead.

So my question is: Is there an easy way to have a pool of N actors and automatically distribute the M data packages (M >> N) between these actors?

Thank you in advance for your answers.

Hi lemar, this really depends on how you structured data ingestion. Theoretically it shouldn’t be a problem at all; you can just create N actors and pass the data bundle with index i to the actor with index i % N.

Something like this:

N = 50
actors = [Actor.remote() for i in range(N)]

for i, m in enumerate(get_data()):
    actors[i % N].process.remote(m)

This is a simple example for load balancing. Does this help already or are you looking for something more specific?
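To make the i % N assignment concrete, here is a small plain-Python sketch (no Ray needed) that only counts how many packages each actor index would receive. The helper name assign_round_robin and the numbers 1000 and 50 are illustrative assumptions, not part of the thread's code.

```python
from collections import Counter


def assign_round_robin(num_packages, num_actors):
    """Return, for each package index i, the actor index i % num_actors."""
    return [i % num_actors for i in range(num_packages)]


# Hypothetical sizes: M = 1000 data packages, N = 50 actors.
assignment = assign_round_robin(num_packages=1000, num_actors=50)

# Count how many packages land on each actor index.
load = Counter(assignment)
print(min(load.values()), max(load.values()))
```

When M is a multiple of N, every actor gets exactly M / N packages; otherwise the counts differ by at most one. This only balances well if the packages take roughly the same time to process.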

Hi kai,

Thank you for your answer.
I think this should suffice for my application, as the data packages are approximately the same size and are all available as soon as I start the application.
There is still one question I have about your suggestion:
How would I get the processed data back? Can I iterate over all actors and call ray.get() on each of them to get back the results for all data packages sent to that actor?

Based on Kai's answer I was able to create a minimal running example that covers my use case. It seems the solution was much easier than I originally thought. Here is the complete code of my solution:

import ray

@ray.remote
class Actor:
    def __init__(self, params):
        self.params = params

    @ray.method(num_returns=1)
    def calculate(self, value):
        # 'value' instead of 'input' to avoid shadowing the Python builtin
        return self.params[0] * value

if __name__ == '__main__':
    ray.init()
    # create parameters
    params = (10,)
    # create example input values
    inputData = [cnt*0.1 for cnt in range(1000)]

    # create a list of actors
    numberOfWorkers = 10
    actorList = [Actor.remote(params) for i in range(numberOfWorkers)]

    # balance load between actors
    actorResults = list()
    for i, data in enumerate(inputData):
        actorResults.append(actorList[i % numberOfWorkers].calculate.remote(data))
    output = [ray.get(result) for result in actorResults]

Hi @lemar94, thanks for sharing your complete solution!

One thing to add, you might want to call

output = ray.get(actorResults)

instead. This fetches all results in one call and returns them as a list, in the same order as the object refs. Keeping the refs in a list also makes it easy to switch to ray.wait() later if you want to process results as they come in.

Hey, looks like you already found a solution, but I just wanted to chime in (if only for posterity) that the behavior you’re describing sounds like it matches Ray’s ActorPool; see "Using Actors" in the Ray v2.0.0.dev0 documentation.
