Following is my Ray Queue code snippet:
import ray
from ray.util.queue import Queue, Full, Empty

ray.init(address='auto')
queue = Queue(maxsize=5)

@ray.remote
def consumer(queue):
    next_item = queue.get()
    print(f"got work {next_item}")
    return next_item * next_item

consumers = [consumer.remote(queue) for _ in range(5)]
for i in range(10):
    try:
        queue.put(i, block=False)
    except Full:
        print("queue is full")
        # Do some logic
        print(ray.get(consumers))
print(ray.get(consumers))
My requirement is similar to the above code snippet. I have a queue of size x, and the data size will be 2x. After writing x items to the queue, I need to execute my business logic using ray.get and retrieve the results. Then I need to perform ray.get again on the other x items.
I have tried to simulate this above, but the behavior is not as expected: after writing 5 elements to the queue, it raises an exception. Does ray.get return a response? After exiting the loop, calling ray.get again prints the same data.
I ran your code and got no error. Here is your code (the only change is ray.init() instead of ray.init(address='auto')):
import ray
from ray.util.queue import Queue, Full, Empty

ray.init()
queue = Queue(maxsize=5)

@ray.remote
def consumer(queue):
    next_item = queue.get()
    print(f"got work {next_item}")
    return next_item * next_item

consumers = [consumer.remote(queue) for _ in range(5)]
for i in range(10):
    try:
        queue.put(i, block=False)
    except Full:
        print("queue is full")
        # Do some logic
        print(ray.get(consumers))
print(ray.get(consumers))
And the output is:
2022-06-21 15:59:07,697 INFO services.py:1462 -- View the Ray dashboard at http://127.0.0.1:8265
[1, 9, 4, 16, 0]
(consumer pid=4110420) got work 4
(consumer pid=4110422) got work 2
(consumer pid=4110416) got work 0
(consumer pid=4110421) got work 3
(consumer pid=4110424) got work 1
I think that is what you want, right?
BTW, initializing actors takes some time. A Ray queue is actually implemented on top of an actor, so you can sleep for a few seconds after creating it, like this:
import time
...
ray.init()
queue = Queue(maxsize=5)
time.sleep(10)  # give the queue's backing actor time to start
...
And it will output:
2022-06-21 16:01:13,296 INFO services.py:1462 -- View the Ray dashboard at http://127.0.0.1:8265
queue is full
(consumer pid=4189474) got work 1
(consumer pid=4189477) got work 0
(consumer pid=4189476) got work 2
[4, 16, 0, 9, 1]
[4, 16, 0, 9, 1]
(consumer pid=4189469) got work 4
(consumer pid=4189470) got work 3