Pool in a Ray cluster sends the same number of jobs to every node even though the nodes have different numbers of CPUs

I am using Pool in a Ray cluster. I want to scale the number of jobs sent to each node proportionally to its compute capability (e.g., its number of CPUs). Unfortunately, the pool I set up sends the same number of jobs to every node even though the nodes have different numbers of CPUs.

For example, I have four nodes in the cluster:

Node 1: 64 CPUs [Head]
Node 2: 64 CPUs
Node 3: 48 CPUs
Node 4: 16 CPUs

While initializing the cluster and adding each node, I specified to Ray the number of CPUs that each node has. However, when I run highly parallelizable, completely independent processes, say 192 of them, Ray sends 192/4 = 48 to each node. This is not desirable: the processes on Node 4 finish last, taking about three times as long as those on the other nodes, while Nodes 1 to 3 finish at almost the same time. So the entire job is held up by Node 4.
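
As a sanity check, the CPU count each node actually registered can be read back from Ray itself. The following is a minimal sketch (my addition, assuming the cluster is already running):

import ray

ray.init(address='auto')

# ray.nodes() returns one dict per node; the "Resources" entry holds
# the CPU count that was registered when the node was started.
for node in ray.nodes():
    if node["Alive"]:
        print(node["NodeManagerAddress"], node["Resources"].get("CPU"))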

What I intend to achieve is a 64:64:48:16 split, as follows:

Node 1: 64 processes
Node 2: 64 processes
Node 3: 48 processes
Node 4: 16 processes

What Ray currently does is 48:48:48:48, as follows:

Node 1: 48 processes
Node 2: 48 processes
Node 3: 48 processes
Node 4: 48 processes

I would appreciate any suggestions that can help me achieve something close to 64:64:48:16 (rather than 48:48:48:48).

The following is the sample code I am using to test/debug the cluster.

import ray
from ray.util.multiprocessing import Pool

ray.init(address='auto')

pool_1 = Pool()

# A slow enough CPU-intensive function, just to test the idea
def fibonacci(n):
    if n <= 0:
        raise ValueError("n must be positive")
    elif n == 1:
        return 0
    elif n == 2:
        return 1
    else:
        return fibonacci(n - 1) + fibonacci(n - 2)

results = pool_1.map(fibonacci, [38]*192)
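
To see how the tasks are actually spread, one option (a hedged sketch of my own, not part of the original test) is to have each task report which node it ran on and count the results:

import ray
from collections import Counter
from ray.util.multiprocessing import Pool

ray.init(address='auto')
pool = Pool()

def node_of_task(_):
    # Report the IP of the node this task executed on.
    return ray.util.get_node_ip_address()

# Roughly 48 hits per node IP would confirm the even 48:48:48:48 split.
print(Counter(pool.map(node_of_task, range(192))))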

Hey @Emmanuel_Salawu, let me know if my answer on Stack Overflow works for you.

Thank you for looking into this, @Alex. I tried it, but Pool does not accept that argument. Could I be missing something? The version of Ray I am using is 1.12.1.

For example, I get the error message shown below if I do the following.

import ray
from ray.util.multiprocessing import Pool as PoolFromRay
ray_context = ray.init(address='auto')
pool = PoolFromRay(192, ray_remote_args={"num_cpus": 1})

Error:

TypeError: __init__() got an unexpected keyword argument 'ray_remote_args'

Ah my bad, this argument is being introduced in 1.13, which should be out soon™️

@Yard1 any chance you know of a mitigation for older versions?

I think you would need to monkey patch it. In the meanwhile, perhaps using the nightly build would suffice?
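
If monkey patching feels too invasive, one workaround on 1.12 (a sketch of my own, not the patch @Yard1 mentions) is to skip Pool and submit plain Ray tasks that each reserve one CPU; the scheduler then packs work proportionally to each node's CPU count:

import ray

ray.init(address='auto')

def fibonacci(n):
    # Plain recursive Fibonacci, the same slow test function as above.
    if n <= 2:
        return n - 1
    return fibonacci(n - 1) + fibonacci(n - 2)

# Each task reserves exactly one CPU, so a node can run only as many
# tasks at a time as it has CPUs: 64, 64, 48, and 16 respectively.
fib_task = ray.remote(num_cpus=1)(fibonacci)
results = ray.get([fib_task.remote(38) for _ in range(192)])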

Thank you, @Alex and @Yard1! Your suggestions solved the problem. After upgrading to the nightly build, I am able to use ray_remote_args={"num_cpus": 1}.

pip uninstall -y ray
pip install -U 'ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp39-cp39-manylinux2014_x86_64.whl'
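
A quick sanity check (my addition) that the nightly wheel is the one actually being imported:

import ray
print(ray.__version__)  # should print a dev version such as 2.0.0.dev0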

After that, I was able to do the following, and the actors/jobs were distributed as intended: with num_cpus=1, each of the 192 pool actors reserves one CPU, so Ray can place only as many actors on a node as that node has CPUs.

import ray
from ray.util.multiprocessing import Pool as PoolFromRay
ray_context = ray.init(address='auto')
pool_1 = PoolFromRay(192, ray_remote_args={"num_cpus": 1})

def fibonacci(n):
    # The same slow, CPU-intensive Fibonacci as above
    ...

results = pool_1.map(fibonacci, [38]*192)

@Emmanuel_Salawu would you mind marking this topic as “Solved” for the Ray maintainers’ tracking purposes?
Never mind, didn’t realize I could do that myself.