Hello everyone,
I am trying to pass an ActorPool as a reference to another ActorPool (or Workflow) so that Actors from the passed pool can be used as a shared resource. The problem I'm facing is that the passed ActorPool is pickled and sent to a remote Python process. This effectively creates copies of the ActorPool, and as a result only one Actor from the pool is ever used.
When I submit to a copy of the ActorPool, I mutate only that copy's state, for example the bookkeeping of which actors are in use and which are free. Since each copy is separate, none of them can see the others' mutations.
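To illustrate what I think is happening, here is a minimal sketch without Ray. ToyPool is a hypothetical stand-in that I made up for this post, not Ray's actual ActorPool implementation. It assumes only that the pool keeps its list of idle actors as ordinary Python state, and it uses a plain pickle round trip to mimic a pool being serialized as a task argument.

```python
import pickle


class ToyPool:
    """Hypothetical stand-in for an actor pool (not Ray's implementation)."""

    def __init__(self, actor_ids):
        self.idle = list(actor_ids)  # actors that are free
        self.busy = []               # actors that have been handed work

    def submit(self):
        # Take an idle actor and record it as busy, in this object only.
        actor = self.idle.pop()
        self.busy.append(actor)
        return actor


original = ToyPool([0, 1])

# Each receiver gets its own deserialized copy of the pool.
copies = [pickle.loads(pickle.dumps(original)) for _ in range(4)]

# Every copy still believes both actors are idle, so all pick the same one.
chosen = [pool_copy.submit() for pool_copy in copies]

print(chosen)         # [1, 1, 1, 1]
print(original.idle)  # [0, 1]
```

Every copy hands out the same actor because the bookkeeping done in one copy never reaches the others or the original, which would match what I see with the real pools.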
Here is a simple example of my use case. All unnecessary logic has been removed, and the blocking DB read/write operation has been replaced by a sleep. The example uses two ActorPools, but the same behavior occurs when I replace simple_actor_pool with a Ray Workflow.
import ray
import time

ray.shutdown()
ray.init()


@ray.remote
class SimpleActor():
    def __init__(self, mark):
        self.mark = mark

    def work(self, connector_actor_pool):
        print("Start work: ", self.mark)
        connector_actor_pool.submit(lambda a, v: a.connect.remote(v), {"mark": self.mark})
        connector_actor_pool.get_next()
        print("Stop work: ", self.mark)


@ray.remote
class ConnectorActor():
    def __init__(self, con_id):
        self.con_id = con_id

    def connect(self, mark):
        print(f"Connected ConnectorActor: {self.con_id} for SimpleActor {mark}")
        time.sleep(10)
        print(f"Disconnected ConnectorActor: {self.con_id} for SimpleActor {mark}")
        return 1


connector_actor_pool = ray.util.ActorPool(
    [
        ConnectorActor.options(num_cpus=1).remote(con_id=_)
        for _ in range(2)
    ]
)
simple_actor_pool = ray.util.ActorPool(
    [
        SimpleActor.options(num_cpus=1).remote(mark=_)
        for _ in range(4)
    ]
)

for i in range(4):
    simple_actor_pool.submit(lambda a, v: a.work.remote(v), connector_actor_pool)
In the code snippet I instantiate two ActorPools, and one of them (connector_actor_pool, 2 Actors) is passed as a parameter to the Actors of the other (simple_actor_pool, 4 Actors).
simple_actor_pool behaves as expected: all 4 Actors from that pool are scheduled with tasks immediately.
I would like connector_actor_pool to schedule tasks on both of its Actors, but it seems that only one Actor from connector_actor_pool is used, while the other Actor from that pool is never scheduled with any task. As I understand it, this is due to connector_actor_pool being pickled when it is passed as an argument to the Actors in simple_actor_pool.
Here are the logs from the previous run:
(SimpleActor pid=26779) Start work: 3
(ConnectorActor pid=26773) Connected ConnectorActor: 1 for SimpleActor {'mark': 3}
(SimpleActor pid=26775) Start work: 1
(SimpleActor pid=26777) Start work: 2
(SimpleActor pid=26774) Start work: 0
(SimpleActor pid=26779) Stop work: 3
(ConnectorActor pid=26773) Disconnected ConnectorActor: 1 for SimpleActor {'mark': 3}
(ConnectorActor pid=26773) Connected ConnectorActor: 1 for SimpleActor {'mark': 1}
(ConnectorActor pid=26773) Disconnected ConnectorActor: 1 for SimpleActor {'mark': 1}
(ConnectorActor pid=26773) Connected ConnectorActor: 1 for SimpleActor {'mark': 0}
(SimpleActor pid=26775) Stop work: 1
(ConnectorActor pid=26773) Disconnected ConnectorActor: 1 for SimpleActor {'mark': 0}
(ConnectorActor pid=26773) Connected ConnectorActor: 1 for SimpleActor {'mark': 2}
(SimpleActor pid=26774) Stop work: 0
(ConnectorActor pid=26773) Disconnected ConnectorActor: 1 for SimpleActor {'mark': 2}
(SimpleActor pid=26777) Stop work: 2
Only ConnectorActor 1 is running, while the pool holds 2 ConnectorActors, both of which I would like to be available to the simple_actor_pool Actors.
Finally, my question: is there a way to pass an ActorPool to another ActorPool or Workflow so that all Actors share the same ActorPool, instead of each getting its own copy of that pool and effectively using only one Actor from the passed pool?