My application has a Actor class that process incoming data. In order to do so the class itself holds a large amount of parameters that are supplied via the constructor.
The current approach creates one Actor per data package, to process which creates a large number of actors (>500) and therefore lots of overhead.
So my question is: Is their an easy way to have a pool of N actors and auto-distribute the M data packages (M >> N) between these actors?
Hi lemar, this really depends on how you structured data ingestion. Theoretically it shouldn’t be a problem at all; you can just create N actors and pass the data bundle with index i to the actor with index i % N.
Something like this:
N = 50
actors = [Actor.remote() for i in range(N)
for i, m in enumerate(get_data()):
actors[i % N].process.remote(m)
This is a simple example for load balancing. Does this help already or are you looking for something more specific?
thank you for your answer.
I think this should suffice for my application, as the data packages have approximately the same size and are all availabe as soon as i start the application.
Their is still one question i have with your suggestion:
How would i get back the processed data? Can I iterate over all actors and call ray.get() on each of them and get back the results for all data packages send to the actor?
Based on Kais answer i was able to create a minimum running example that suffices my use case. It seems the solution was much easier than i thought originally. Here is the complete code of my solution:
import ray
@ray.remote
class Actor:
def __init__(self, params):
self.params = params
@ray.method(num_returns= 1)
def calculate(self, input):
return self.params[0] * input
if __name__ == '__main__':
ray.init()
# create parameters
params = (10,)
# create example input values
inputData = [cnt*0.1 for cnt in range(1000)]
# create a list of actors
numberOfWorkers = 10
actorList = [Actor.remote(params) for i in range(numberOfWorkers)]
# balance load between actors
actorResults = list()
for i, data in enumerate(inputData):
actorResults.append(actorList[i % numberOfWorkers].calculate.remote(data))
output = [ray.get(result) for result in actorResults]
Hi @lemar94, thanks for sharing your complete solution!
One thing to add, you might want to call
output = ray.get(actorResults)
instead. This will automatically pull all results and return a list. Generally it’s easier to iterate from this (e.g. using ray.wait()) to process results as they come in eventually.
Hey, looks like you already found a solution, but I just wanted to chime in (if only for posterity) that it sounds like the behavior you’re describing matches Ray’s ActorPool. Using Actors — Ray v2.0.0.dev0