How severe does this issue affect your experience of using Ray?
- Medium: It contributes to significant difficulty to complete my task, but I can work around it.
I have an existing Python package for a kind of physics simulation that involves resource-intensive postprocessing of a large number of 3x3 orientation matrices. At first I used `multiprocessing.Pool` to run the calculations on a local shared-memory machine, but for larger datasets I want to scale up to the distributed-memory (multi-node) PBS cluster (“supercomputer”) available at my university. I have seen that Ray offers a “drop-in” `multiprocessing.Pool` API, and I have been able to set up the remote Ray cluster on the supercomputer using some shell scripts.
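For context, my setup scripts follow the usual head-node/worker pattern; this is a rough sketch, not my actual script (the resource requests, hostname handling, and sleeps are placeholders):

```shell
#!/bin/bash
#PBS -l select=4:ncpus=48:mem=375gb
#PBS -l walltime=02:00:00

cd "$PBS_O_WORKDIR"

# First unique host becomes the Ray head node; the rest become workers.
nodes=($(sort -u "$PBS_NODEFILE"))
head_node=${nodes[0]}
port=6379
export RAY_ADDRESS="$head_node:$port"

ssh "$head_node" "ray start --head --port=$port" &
sleep 10  # give the head node time to come up
for node in "${nodes[@]:1}"; do
    ssh "$node" "ray start --address=$RAY_ADDRESS" &
done
sleep 10

pytest tests/test_foo.py  # placeholder for the actual test invocation
```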
Inspecting `ray.cluster_resources()` from within my Python code confirms that the remote Ray cluster starts with the correct amount of resources when I launch a PBS job on this supercomputer. However, the PBS job monitoring tools show only 22% total CPU utilisation, and only about a third of the memory I request is being used (500 GB of 1500 GB).
The structure of the Python code is such that I use nested context blocks for `multiprocessing.Pool`, something like this:
```python
import ray
from ray.util.multiprocessing import Pool
import numpy as np


def foo(a):
    # Postprocessing function.
    return do_big_computation(a)


def bar(seed):  # seed is a PRNG seed used in the computation.
    # Main processing is a scipy LSODA solve.
    a_stack = do_moderate_computation(seed)
    return a_stack


def foo_bulk(a_stack, pool):
    out = np.empty(len(a_stack))
    for i, o in enumerate(pool.imap(foo, a_stack)):
        out[i] = o
    return out


# Within a pytest test method:
def test_foo(seeds):  # seeds is an array of PRNG seeds
    # Connect to the Ray cluster; I have exported $RAY_ADDRESS in the environment.
    ray.init(address="auto")  # Actually done in a session-scoped fixture.
    out = []
    with Pool() as pool:
        for i, o in enumerate(pool.imap_unordered(bar, seeds)):
            out.append(foo_bulk(o, pool))  # In real life I preallocate out, of course.
    np.savez("out.npz", *out)
    ray.shutdown()
```
I’m wondering if:
- passing the `Pool` object to `foo_bulk` like that works as I expect from the stdlib version
- distributing the computation requires explicit `@ray.remote` decorators somewhere, or that is all handled automatically by `Pool`
- distributing objects requires explicit `ray.get` and `ray.put` (I tried adding some in various places, but I get output arrays filled with `np.nan`)
- there are any other obvious pitfalls resulting from this approach
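For what it’s worth, the control flow itself behaves as I expect with the stdlib-style API: a quick stand-in using `multiprocessing.pool.ThreadPool` (which shares the `Pool` interface) and dummy versions of my compute functions produces NaN-free output, so my question is mostly about whether Ray’s pool tolerates the same nesting:

```python
import numpy as np
from multiprocessing.pool import ThreadPool  # same interface as multiprocessing.Pool


def foo(a):
    # Stand-in for the expensive postprocessing: reduce a 3x3 matrix to a scalar.
    return float(np.trace(a))


def bar(seed):
    # Stand-in for the LSODA solve: produce a stack of 3x3 matrices.
    rng = np.random.default_rng(seed)
    return rng.standard_normal((4, 3, 3))


def foo_bulk(a_stack, pool):
    out = np.empty(len(a_stack))
    for i, o in enumerate(pool.imap(foo, a_stack)):
        out[i] = o
    return out


def run(seeds):
    out = []
    with ThreadPool() as pool:
        # Submit new work to the pool from inside iteration over its own results,
        # mirroring the structure of my real code.
        for a_stack in pool.imap_unordered(bar, seeds):
            out.append(foo_bulk(a_stack, pool))
    return out


results = run([0, 1, 2])
assert not any(np.isnan(r).any() for r in results)  # no NaNs with the stdlib-style pool
```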
Perhaps I can come up with a small reproducer next week, if this approach is intended to work at all.