Nested `multiprocessing.Pool` on a distributed Ray cluster

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

I have an existing Python package for a kind of physics simulation that involves resource-intensive postprocessing of a large number of 3x3 orientation matrices. At first I used multiprocessing.Pool to run the calculations on a local shared-memory machine; however, for larger datasets I want to scale up to the distributed-memory (multi-node) PBS cluster (“supercomputer”) available at my university. I saw that Ray offers a “drop-in” multiprocessing.Pool API, and I have already been able to set up the remote Ray cluster on the supercomputer using some shell scripts.
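
For context, my understanding of the “drop-in” claim is that only the import changes; here is a minimal sketch of what I mean (square is just a placeholder function, not part of my package):

from ray.util.multiprocessing import Pool  # instead of: from multiprocessing import Pool

def square(x):
    return x * x

with Pool() as pool:  # starts/connects to Ray automatically if not already initialised
    print(list(pool.map(square, range(8))))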

Inspecting ray.cluster_resources() within my Python code tells me that the remote Ray cluster is started with the correct amount of resources when I launch a PBS job on this supercomputer. However, the PBS job monitoring tools show only 22% total CPU utilisation, and only about a third of the memory I request is being used (500GB of 1500GB).
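
The check itself is just a couple of lines at the start of the PBS job, something like:

import ray

ray.init(address="auto")  # $RAY_ADDRESS is exported by the launch scripts
print(ray.cluster_resources())    # totals across all nodes
print(ray.available_resources())  # what is currently free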

The structure of the Python code is such that I nest pool maps inside a multiprocessing.Pool context block, something like this:

import ray
from ray.util.multiprocessing import Pool
import numpy as np

def foo(a):
    # Postprocessing function.
    return do_big_computation(a)

def bar(seed):  # Seed is a PRNG seed used in the computation.
    # Main processing is a scipy LSODA solve.
    a_stack = do_moderate_computation(seed)
    return a_stack

def foo_bulk(a_stack, pool):
    out = np.empty(len(a_stack))
    for i, o in enumerate(pool.imap(foo, a_stack)):
        out[i] = o
    return out

# Within a pytest test method:
def test_foo(seeds):  # seeds is an array of PRNG seeds
    # Connect to the Ray cluster; $RAY_ADDRESS is exported in the environment.
    ray.init(address="auto")  # Actually done in a session-scoped fixture
    out = []
    with Pool() as pool:
        for i, o in enumerate(pool.imap_unordered(bar, seeds)):
            out.append(foo_bulk(o, pool))  # In Real Life I preallocate the size of out of course.
    np.savez("out.npz", *out)  # np.savez needs a file argument first
    ray.shutdown()

I’m wondering if:

  • passing the Pool object down to foo_bulk like that works the way I expect from the stdlib version
  • distributing the computation requires explicit @ray.remote decorators somewhere, or whether that is all handled automatically by Pool
  • distributing objects requires explicit ray.get and ray.put calls (I tried adding some in various places but got output arrays filled with np.nan; see the sketch after this list)
  • there are any other obvious pitfalls resulting from this approach
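
For the third point, this is the kind of explicit pattern I was experimenting with (hypothetical standalone example, not my actual package code; frob_norm is a stand-in for the real postprocessing):

import ray
import numpy as np

@ray.remote
def frob_norm(mat):  # stand-in postprocessing step
    return np.linalg.norm(mat)

mats = [np.random.default_rng(s).random((3, 3)) for s in range(4)]
refs = [ray.put(m) for m in mats]  # explicit puts into the object store
print(ray.get([frob_norm.remote(r) for r in refs]))  # explicit get on the futures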

Perhaps I can come up with a small reproducer case next week, if this approach is intended to work.

I think the issue with the NaNs was unrelated. I’m currently using a conditional to change the nested map into:

def foo_bulk(a_stack, pool):
    if HAS_RAY:
        # One remote task per matrix; block until all results are back.
        return ray.get([_foo_dstr.remote(ray.put(a)) for a in a_stack])
    else:
        # Stdlib fallback: same pool.imap loop as before.
        out = np.empty(len(a_stack))
        for i, o in enumerate(pool.imap(foo, a_stack)):
            out[i] = o
        return out

where I’ve defined a wrapper for foo that is decorated with @ray.remote:

@ray.remote
def _foo_dstr(*args, **kwargs):
    # Thin wrapper so that foo itself stays usable without Ray installed.
    return foo(*args, **kwargs)
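
For completeness, HAS_RAY is just an import guard in my package, roughly:

try:
    import ray
    HAS_RAY = True
except ImportError:
    HAS_RAY = False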