@hilanzy What is it you're trying to accomplish here? Write your own distributed queuing data structure?
Why do you want this to loop forever? If this actor method is called once, it will loop forever, so how can another call reach the same actor's method? Since it never completes, any subsequent call will block until the method finishes executing.
cc: @Ruiyang_Wang Do you see anything here that I’m missing?
As far as I know, multiprocessing.Queue is process-safe. When I replace the Ray tasks with Processes, it works fine,
just like this:
```python
import time
from multiprocessing import Process, Queue


def put(q):
    print(f"put: {q.get()}, {id(q)}")
    i = 0
    while True:
        time.sleep(0.05)
        q.put(i)
        i += 1


def get(q):
    print(f"get: {q.get()}, {id(q)}")
    while True:
        data = [q.get() for _ in range(10)]
        print("get:", data)


if __name__ == "__main__":
    q = Queue(128)
    for i in range(22):
        q.put(1024)

    p_put = Process(target=put, args=(q,))
    p_get = Process(target=get, args=(q,))
    p_put.start()
    p_get.start()
```
Thanks for your reply!
I want to use the remote Queue as my replay buffer: some rollout processes put data into it, while a Learner process gets data from it.
As you said, “any subsequent call will block until the method is executed”
I think I must have some misunderstanding about Ray actors. I expected it to work the same way as the multiprocessing code I wrote above, but it doesn't, so I want to know how it works.
I think I might know where I went wrong: for each member of an actor, Ray ensures safe access by executing remote calls one at a time, so within a Ray actor the same variable can only be used by one remote call at a time. Therefore, when q.get or q.put blocks, any subsequent call that needs q will also block. Is my understanding correct?
Ray actor methods on the same actor instance are executed in the order they are called. In other words, they execute serially:

```python
a = Actor.remote()
a.method_1.remote(...)
a.method_2.remote(...)
```

These will be executed in the worker process on the node where the actor is scheduled.
If you want them to execute asynchronously for the same instance, without one method blocking until the previous one finishes and without following the serial order, you can define the methods as async. By default, actor instance methods are executed serially in the order they are invoked.
" By default, a Ray actor runs in a single thread and actor method calls are executed sequentially. This means that a long running method call blocks all the following ones."
Thanks for your reply. I think the code below is really what I want to do:
```python
import ray
import time
import threading
# from ray.util.queue import Queue
from multiprocessing import Queue

ray.init()


@ray.remote
class MyQueue(object):
    def __init__(self, maxsize=128):
        self._q = Queue(maxsize)

    def my_put(self, item):
        self._q.put(item)

    def my_get(self):
        data = [self._q.get() for _ in range(10)]
        print(data)


@ray.remote
def put_submitter(q):
    for i in range(100000):
        q.my_put.remote(i)


@ray.remote
def get_submitter(q):
    time.sleep(0.5)
    while True:
        q.my_get.remote()


q = MyQueue.remote()
put_submitter.remote(q)  # some RL Actor puts data into the replay buffer
get_submitter.remote(q)  # some RL Learner gets data from the replay buffer
threading.Event().wait()
```
What I expect is that put_submitter keeps putting data into the replay buffer (the Queue) and get_submitter keeps getting data from the same replay buffer. Since they come from two different submitters, they should be executed in two different processes (or threads?) and run in parallel, and as we know multiprocessing.Queue is process-safe, so the queue shouldn't deadlock. But a deadlock actually happened, so I think I must have some misunderstanding about Ray actors.
I'm really sorry for my poor English; I was trying my best. Thanks.