Passing ActorPool to another ActorPool/Workflow

Hello everyone,

I am trying to pass an ActorPool as a reference to another ActorPool (or Workflow), so that the Actors there can use the Actors from the passed pool as a resource. The problem I'm facing is that the passed ActorPool is pickled and sent to a remote Python process. This means that copies of the ActorPool are effectively created, and as a result only one Actor from the pool is ever used.

Submitting to a copy of the ActorPool mutates the state of that copy, for example its bookkeeping of which actors are in use and which are free. But since each caller holds a separate copy, the copies cannot see each other's mutations.

Here is a simple example of my use case. All unnecessary logic has been removed, and the blocking DB read/write operation has been replaced by sleep. The example uses two ActorPools, but the same behavior occurs when I replace simple_actor_pool with a Ray Workflow.

import ray
import time 

ray.shutdown()
ray.init()

@ray.remote
class SimpleActor():
    def __init__(self, mark):
        self.mark = mark
    
    def work(self, connector_actor_pool):
        print ("Start work: ", self.mark)
        connector_actor_pool.submit(lambda a, v: a.connect.remote(v), {"mark":self.mark})
        connector_actor_pool.get_next()
        print ("Stop work: ", self.mark)
 
@ray.remote
class ConnectorActor():
    def __init__(self, con_id):
        self.con_id = con_id
    
    def connect(self, mark):
        print (f"Connected ConnectorActor: {self.con_id} for SimpleActor {mark}")
        time.sleep (10)
        print (f"Disconnected ConnectorActor: {self.con_id} for SimpleActor {mark}")
        return 1
 
connector_actor_pool = ray.util.ActorPool(
                [ 
                    ConnectorActor.options(num_cpus=1).remote(con_id=_)
                    for _ in range(2)
                ]
            )

simple_actor_pool = ray.util.ActorPool(
                [ 
                    SimpleActor.options(num_cpus=1).remote(mark=_)
                    for _ in range(4)
                ]
            )

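for _ in range(4):
    simple_actor_pool.submit(lambda a, v: a.work.remote(v), connector_actor_pool)

# Wait for all work to finish so a script driver does not exit early
while simple_actor_pool.has_next():
    simple_actor_pool.get_next()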

In the code snippet I instantiate two ActorPools: one of them (connector_actor_pool, 2 Actors) is passed as a parameter to the Actors of the other (simple_actor_pool, 4 Actors).
simple_actor_pool behaves as expected: all 4 of its Actors are immediately scheduled with tasks.
I would like connector_actor_pool to schedule tasks on both of its Actors, but it seems that only one of them is used while the other is never scheduled with any task. As I understand it, this is because connector_actor_pool is pickled when it is passed as an argument to the simple_actor_pool Actors.

Here are the logs from running this example:

(SimpleActor pid=26779) Start work:  3
(ConnectorActor pid=26773) Connected ConnectorActor: 1 for SimpleActor {'mark': 3}
(SimpleActor pid=26775) Start work:  1
(SimpleActor pid=26777) Start work:  2
(SimpleActor pid=26774) Start work:  0
(SimpleActor pid=26779) Stop work:  3
(ConnectorActor pid=26773) Disconnected ConnectorActor: 1 for SimpleActor {'mark': 3}
(ConnectorActor pid=26773) Connected ConnectorActor: 1 for SimpleActor {'mark': 1}
(ConnectorActor pid=26773) Disconnected ConnectorActor: 1 for SimpleActor {'mark': 1}
(ConnectorActor pid=26773) Connected ConnectorActor: 1 for SimpleActor {'mark': 0}
(SimpleActor pid=26775) Stop work:  1
(ConnectorActor pid=26773) Disconnected ConnectorActor: 1 for SimpleActor {'mark': 0}
(ConnectorActor pid=26773) Connected ConnectorActor: 1 for SimpleActor {'mark': 2}
(SimpleActor pid=26774) Stop work:  0
(ConnectorActor pid=26773) Disconnected ConnectorActor: 1 for SimpleActor {'mark': 2}
(SimpleActor pid=26777) Stop work:  2

Only ConnectorActor 1 is ever used, even though the pool holds 2 ConnectorActors, both of which I would like to be available to the simple_actor_pool Actors.
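A Ray-free sketch of why the same connector is always chosen, assuming (consistent with the logs) that ActorPool hands out the last idle actor in its list first. `ToyPool` is hypothetical and only mimics that idle-list bookkeeping, and a `pickle` round-trip stands in for Ray serializing the pool into each `work` call.

```python
import pickle


class ToyPool:
    """Hypothetical stand-in for ActorPool's bookkeeping: just an idle list."""

    def __init__(self, actors):
        self.idle = list(actors)

    def submit(self):
        # Assumed to mirror ActorPool: take the idle actor at the end of the list.
        return self.idle.pop()


pool = ToyPool(["connector-0", "connector-1"])

# Passing the pool as an argument is roughly a pickle round-trip per call,
# so each of the four SimpleActors ends up with its own independent copy.
copies = [pickle.loads(pickle.dumps(pool)) for _ in range(4)]
picked = [pool_copy.submit() for pool_copy in copies]

print(picked)     # every copy picks the same actor
print(pool.idle)  # the original pool never sees the copies' mutations
```

Each copy starts with both connectors idle, so each one independently picks the same connector, which matches the logs above where only ConnectorActor 1 ever connects.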

Finally, my question: is there a way to pass an ActorPool to another ActorPool or Workflow so that all Actors share the same pool, instead of each getting its own copy and effectively using only one Actor from the passed pool?

I think you can solve the problem by creating another Actor that wraps and manages the connector ActorPool:

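import ray

@ray.remote
class ConnectorActorPoolManager:
    def __init__(self):
        # The single shared ActorPool lives inside this actor
        self.connector_actor_pool = ray.util.ActorPool(...)

    def submit(self, fn, value):
        # Same arguments as ActorPool.submit(fn, value)
        self.connector_actor_pool.submit(fn, value)

@ray.remote
class SimpleActor:
    def work(self, connector_actor_pool_manager):
        # Methods on an actor handle are invoked with .remote()
        connector_actor_pool_manager.submit.remote(xxxx)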

Passing actor handles is well supported by Ray. In this case, every SimpleActor talks to the same ConnectorActorPoolManager, and therefore to the same ActorPool.


Thank you for your answer.

Wrapping the ActorPool in an Actor works perfectly; I just had to wrap the submit and get_next methods to get the full functionality of ActorPool.

Now I can pass references to the wrapped ActorPool to other Actors in pools or to Ray Workflow tasks :slight_smile:
