Understanding how Ray's Queue works

Here is my Ray Queue code snippet:

import ray
from ray.util.queue import Queue, Full, Empty

ray.init(address='auto')

queue = Queue(maxsize=5)

@ray.remote
def consumer(queue):
    next_item = queue.get()
    print(f"got work {next_item}")
    return next_item * next_item


consumers = [consumer.remote(queue) for _ in range(5)]

for i in range(10):
    try:
        queue.put(i, block=False)
    except Full:
        print("queue is full")
        # Do some logic
        print(ray.get(consumers))

print(ray.get(consumers))

My real requirement is similar to the snippet above. The queue has a maximum size of x, and the data set has 2x items. After writing x items to the queue, I need to run my business logic and retrieve the results with ray.get. Then I need to do the same (ray.get) for the remaining x items.

I tried to simulate this above, but the behavior is not what I expected: after 5 elements have been written to the queue, put raises an exception. Does ray.get return a response at that point? And after exiting the loop, calling ray.get again prints the same data again.

I ran your code locally (with ray.init() instead of ray.init(address='auto')) and got no error. Here is the code I ran:

import ray
from ray.util.queue import Queue, Full, Empty

ray.init()

queue = Queue(maxsize=5)

@ray.remote
def consumer(queue):
    next_item = queue.get()
    print(f"got work {next_item}")
    return next_item * next_item

consumers = [consumer.remote(queue) for _ in range(5)]

for i in range(10):
    try:
        queue.put(i, block=False)

    except ray.util.queue.Full:
        print("queue is full")
        # Do some logic
        print(ray.get(consumers))

print(ray.get(consumers))

And the output is:

2022-06-21 15:59:07,697	INFO services.py:1462 -- View the Ray dashboard at http://127.0.0.1:8265
[1, 9, 4, 16, 0]
(consumer pid=4110420) got work 4
(consumer pid=4110422) got work 2
(consumer pid=4110416) got work 0
(consumer pid=4110421) got work 3
(consumer pid=4110424) got work 1

I think that is what you want, right?

BTW, initializing actors takes some time. Ray's Queue is actually implemented on top of an actor, so you can sleep for a few seconds like this:

...
import time

ray.init()

queue = Queue(maxsize=5)

# Give the Queue's backing actor time to start before the rest of the script runs.
time.sleep(10)
...

And it outputs:

2022-06-21 16:01:13,296	INFO services.py:1462 -- View the Ray dashboard at http://127.0.0.1:8265
queue is full
(consumer pid=4189474) got work 1
(consumer pid=4189477) got work 0
(consumer pid=4189476) got work 2
[4, 16, 0, 9, 1]
[4, 16, 0, 9, 1]
(consumer pid=4189469) got work 4
(consumer pid=4189470) got work 3