The task is always in "Waiting for scheduling"; Queue.put blocks Queue.get

import ray
import time
import threading
# from ray.util import queue
import queue

ray.init()

@ray.remote
class MyQueue(object):

  def __init__(self, maxsize=128):
    self._q = queue.Queue(maxsize)
  
  def my_put(self):
    for i in range(100000):
      time.sleep(0.1)
      self._q.put(i)
  
  def my_get(self):
    while True:
      data = [self._q.get() for _ in range(10)]
      print(data)


@ray.remote
def put_submitter(q):
  q.my_put.remote()


@ray.remote
def get_submitter(q):
  q.my_get.remote()

q = MyQueue.remote()
put_submitter.remote(q)
get_submitter.remote(q)

threading.Event().wait()

Dashboard: (screenshot: the tasks stay in "Waiting for scheduling")

What is the problem? Why does queue.get block queue.put?

This is still the case even when using ray.util.queue.Queue or multiprocessing.Queue.

@hilanzy When does the function’s while condition become false:

def my_get(self):
    while True:
      data = [self._q.get() for _ in range(10)]
      print(data)

Looks like it could be looping forever, unless I’ve grossly missed something here.

Yes, it is looping forever, but another process keeps calling put.

@hilanzy What is it you're trying to accomplish here? Writing your own distributed queuing data structure?

Why do you want this to loop forever? Once this actor method is called, it loops forever and never completes, so no other call to the same actor's methods can run: any subsequent call will block until the method finishes.
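The serial behavior can be sketched without Ray at all. Below is a toy model (my own illustration, not Ray's actual implementation) of a non-async actor: a single worker thread drains a mailbox of queued method calls and runs each one to completion, so a call that never returns starves every later call.

```python
import queue
import threading

# Toy model of a non-async actor: one worker thread pulls queued
# method calls off a mailbox and runs each one to completion.
mailbox = queue.Queue()
state = []
results = []

def actor_loop():
    while True:
        call = mailbox.get()
        if call is None:
            break
        call()  # the next call cannot start until this one returns

worker = threading.Thread(target=actor_loop)
worker.start()

# "Remote calls": they only enqueue work; execution is strictly serial.
mailbox.put(lambda: state.append(1))
mailbox.put(lambda: state.append(2))
mailbox.put(lambda: results.append(list(state)))
mailbox.put(None)  # shut the worker down
worker.join()

print(results)  # [[1, 2]]
```

If the first queued call were a `while True:` loop, the worker would never reach the later ones; that is exactly why `my_get` and `my_put` on the same actor block each other.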

cc: @Ruiyang_Wang Do you see anything here that I’m missing?

As far as I know, multiprocessing.Queue is process-safe, and when I replace the Ray tasks with Processes, it works fine, just like this:

import time

from multiprocessing import Process, Queue


q = Queue(128)
for i in range(22):
  q.put(1024)

def put(q):
  print(f"put: {q.get()}, {id(q)}")
  i = 0
  while True:
    time.sleep(0.05)
    q.put(i)
    i += 1

def get(q):
  print(f"get: {q.get()}, {id(q)}")
  while True:
    data = [q.get() for _ in range(10)]
    print("get:", data)

p_put = Process(target=put, args=(q,))
p_get = Process(target=get, args=(q,))

p_put.start()
p_get.start()

Thanks for your reply!
I want to use the remote Queue as my replay buffer. Some rollout processes put data into it, while a Learner process gets data from it.

As you said, "any subsequent call will block until the method is executed."
I think I must have some misunderstanding about Ray actors. I expected it to work the same as the multiprocessing example I wrote above, but it actually doesn't. So, I want to know how it works.

Why do subsequent calls block? put and get are called by two different submitters. Wouldn't they run in parallel under multiprocessing? I'm really confused.

I think I might know where I went wrong. For each actor, Ray serializes remote task access to the actor's state, so the same actor can only run one remote task at a time. So when q.get or q.put blocks, any subsequent call that needs q will block. Is my understanding correct?

Ray actor methods on the same actor instance are executed in the order they are called. In other words, they execute serially:

a = Actor.remote()
a.method_1.remote(args)
a.method_2.remote(args)

These will be executed on the worker process on the node where the actor is scheduled.

If you want methods on the same instance to execute asynchronously, without one blocked call holding up the others and without following the serial order, you can define the methods as async. By default, actor instance methods are executed serially in the order they are invoked.

This is what you want, as described in the docs:

"By default, a Ray actor runs in a single thread and actor method calls are executed sequentially. This means that a long-running method call blocks all the following ones."

Also, consider using Ray's ray.util.queue.Queue, and see the docs on actor task execution order.

@hilanzy

q = MyQueue.remote()
put_submitter.remote(q)
get_submitter.remote(q)

They are different submitters, so they should work, albeit their execution order is not guaranteed.

cc: @Ruiyang_Wang any insight into this behaviour?

Thanks for your reply. I think the code below is really what I want to do:

import ray
import time
import threading
# from ray.util.queue import Queue
from multiprocessing import Queue

ray.init()

@ray.remote
class MyQueue(object):

  def __init__(self, maxsize=128):
    self._q = Queue(maxsize)

  def my_put(self, item):
    self._q.put(item)

  def my_get(self):
    data = [self._q.get() for _ in range(10)]
    print(data)


@ray.remote
def put_submitter(q):
  for i in range(100000):
    q.my_put.remote(i)


@ray.remote
def get_submitter(q):
  time.sleep(0.5)
  while True:
    q.my_get.remote()

q = MyQueue.remote()
put_submitter.remote(q)  # some RL Actor put the data into the replay buffer
get_submitter.remote(q)  # some RL Learner get the data from the replay buffer

threading.Event().wait()

What I expect is that put_submitter keeps putting data into the replay buffer (the Queue) and get_submitter keeps getting data from the same buffer. They are two different submitters, so I expected them to execute in two different processes (or threads?) and run in parallel, and since multiprocessing.Queue is process-safe, the queue shouldn't deadlock. But a deadlock actually happened, so I must have some misunderstanding about Ray actors.

I'm really sorry for my poor English; I was trying my best. Thanks.