Problems using multiprocessing.Pool and Queue together

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

When I try to use ray.util.multiprocessing.Pool and ray.util.queue.Queue together, I get errors. The skeleton of my use case looks like this:

with Pool(processes=num_workers) as pool:
    for i in range(num_workers):
        queue = Queue()
        pool.apply_async(worker_fn,
                         args=(queue,),  # trailing comma makes this a one-element tuple
                         error_callback=custom_error_callback)

And

def worker_fn(queue):
    while True:
        try:
            item = queue.get(block=False)
        except Empty:
            time.sleep(0.0001)
            continue

        if item is None:
            break
        # Do some stuff

But it emits an error like:

Got error: The actor died unexpectedly before finishing this task.
        class_name: PoolActor
        actor_id: c472300a1e76e2c7bece3c7001000000
        pid: 60456
        namespace: a0dd3b86-d513-4d7b-903d-8a903cf2c893
        ip: 127.0.0.1
The actor is dead because it was killed by `ray.kill`.

Someone suggested this may be caused by Queue, since it is not thread/process safe. So my question is: is there another way to implement the same idea without using Pool and Queue together?

@lelandfy

I think your script looks fine and I don’t see an issue with it, or the Queue. What I am seeing is

  • there is a long-running task running in the pool
  • when the pool exits it kills all its processes
  • the long-running task that is still running gets killed, hence the error
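
As a rough illustration of the sequence above, one option for a task that finishes on its own is to keep the handle returned by apply_async and wait on it before the with block ends, so the pool is not torn down mid-task. The sketch below uses the standard library's multiprocessing.pool.ThreadPool as a stand-in so that it runs without a Ray cluster. That swap, and the names slow_task and handle, are assumptions made for illustration and are not part of the original script. The same apply_async / get calls should apply to ray.util.multiprocessing.Pool, but that is not verified here.

```python
import time
from multiprocessing.pool import ThreadPool  # assumption: stand-in for ray.util.multiprocessing.Pool


def slow_task(seconds):
    # Hypothetical long-running task: sleeps, then reports how long it slept.
    time.sleep(seconds)
    return seconds


with ThreadPool(processes=1) as pool:
    handle = pool.apply_async(slow_task, args=(0.2,))
    # Block until the task has finished (raises TimeoutError after 5 s).
    # Without this wait, leaving the with block would terminate the pool
    # while slow_task may still be running.
    finished_after = handle.get(timeout=5)

print(f"task finished after {finished_after} s")
```

This only helps when the task returns by itself. A worker that loops forever, like worker_fn in the question, still needs a signal telling it to stop.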

To avoid the error, I made the long-running task shut down gracefully when it reads a specific sentinel value from the queue (I used None), and I call pool.close() and pool.join() so the pool waits for the task to finish before the with block exits.

Script:

import time
from ray.util.multiprocessing import Pool
from ray.util.queue import Queue, Empty

def worker_fn(q):
    while True:
        try:
            item = q.get(block=False)
            print(f'got {item}')
        except Empty:
            time.sleep(0.0001)
            continue

        if item is None:
            return

def cb(ex):
    print(ex)
    
with Pool(processes=1) as pool:
    queue = Queue()

    pool.apply_async(worker_fn,
                    args=(queue,),
                    error_callback=cb)

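    items = list(range(10))
    for item in items:
        queue.put(item)
    queue.put(None)  # sentinel: tells worker_fn to return

    # Wait for the task to finish before the with block tears down the pool.
    pool.close()
    pool.join()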

Output:

(PoolActor pid=23248) got 0
(PoolActor pid=23248) got 1
(PoolActor pid=23248) got 2
(PoolActor pid=23248) got 3
(PoolActor pid=23248) got 4
(PoolActor pid=23248) got 5
(PoolActor pid=23248) got 6
(PoolActor pid=23248) got 7
(PoolActor pid=23248) got 8
(PoolActor pid=23248) got 9
(PoolActor pid=23248) got None
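
The original question used one queue per worker across num_workers workers. A minimal sketch of how the sentinel approach might extend to that case follows. To keep it runnable without a Ray cluster it uses standard-library stand-ins (multiprocessing.pool.ThreadPool and queue.Queue) in place of the Ray classes. That substitution, the NUM_WORKERS value, the round-robin distribution, and the per-worker return count are all assumptions added for illustration.

```python
import time
from multiprocessing.pool import ThreadPool  # assumption: stand-in for ray.util.multiprocessing.Pool
from queue import Queue, Empty               # assumption: stand-in for ray.util.queue

NUM_WORKERS = 3  # hypothetical value


def worker_fn(q):
    """Consume items until the None sentinel arrives; return how many were seen."""
    seen = 0
    while True:
        try:
            item = q.get(block=False)
        except Empty:
            time.sleep(0.0001)
            continue
        if item is None:
            return seen
        seen += 1


with ThreadPool(processes=NUM_WORKERS) as pool:
    queues = [Queue() for _ in range(NUM_WORKERS)]
    handles = [pool.apply_async(worker_fn, args=(q,)) for q in queues]

    # Spread ten items over the queues round-robin.
    for item in range(10):
        queues[item % NUM_WORKERS].put(item)

    # One sentinel per queue, so every worker leaves its loop.
    for q in queues:
        q.put(None)

    # Wait for all workers before the with block tears down the pool.
    pool.close()
    pool.join()
    counts = [h.get() for h in handles]

print(counts)
```

The important detail is that every worker needs its own sentinel. With a single shared queue, one None per worker would have to be enqueued instead.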