Task distribution is not happening with new nodes


I’m new to Ray and still learning. I started working through some example code to check task distribution.
I tested this in 2 ways.

  1. Started the Ray head node manually.
    The dashboard showed 2 cores and no usage. After this I launched my example Python work script and saw 100% CPU utilization.
    Then, from another local machine, I connected to the Ray head node. The second node showed up on the dashboard immediately, but for some reason no work is assigned to it. I tried connecting a few more nodes; they all show up as available, but no work gets assigned to them. Their CPU load stays at 2-4% while the head node is at 100%.

What am I doing wrong?

  2. Started a local Ray cluster (not planning to let Ray handle scaling).
    Exact same situation as above.
    Why are tasks not distributed to new nodes that connect to the Ray cluster/head node?

If the nodes are connected first and then I run the work script, the tasks are distributed to all nodes equally.
But my requirement is to bring up nodes if/when required.

Isn’t that how it’s supposed to work? If I connect a node to the master (head node), it should start using the new node for whatever work is left (and there is plenty left).

Sorry, I’m new to Ray and couldn’t find online what I’m doing wrong.


Can you make sure all ports are open? Configuring Ray — Ray v2.0.0.dev0

All ports are open for sure and my second test proves it.

The second test is connecting all nodes BEFORE I run the work script.
In this case all CPUs on all nodes start doing the work; if I had a firewall issue, this shouldn’t work.

The issue only happens with nodes connected after I have started the work script.
It looks like Ray only considers the nodes available at the moment it receives new work; any nodes connected after that are not used.

I’m using Ray 1.8.
I can provide logs if they would give a clearer picture.

I don’t think that is the case. If it were, Ray wouldn’t work with our autoscaler at all. Is it possible to test this with a simple workload like this?

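import time
import ray

ray.init(address="auto")  # assumes a cluster is already running

@ray.remote
def f():
    time.sleep(30)  # assumed duration, matching the 30-second window below
    return None

num_cpus = int(ray.cluster_resources()["CPU"])  # the resource count is a float
ray.get([f.remote() for _ in range(num_cpus * 2)])
# Within 30 seconds add a new node and see if tasks are scheduled there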
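
To see which nodes actually pick up tasks in a test like the one above, each task can return the hostname it ran on. This is only a sketch and not part of the original suggestion: the names `tally_hosts`, `run_placement_check`, and `where_am_i` are hypothetical, the sleep duration is arbitrary, and it assumes a cluster is already running and reachable with `address="auto"`.

```python
import socket
import time
from collections import Counter


def tally_hosts(hostnames):
    """Count how many tasks ran on each host."""
    return dict(Counter(hostnames))


def run_placement_check(num_tasks, sleep_s=5.0):
    """Run num_tasks sleeping tasks and return a {hostname: task count} dict."""
    # Imported here so tally_hosts stays usable without Ray installed.
    import ray

    if not ray.is_initialized():
        ray.init(address="auto")  # assumption: a cluster is already running

    @ray.remote
    def where_am_i():
        # Sleep so that queued tasks are still waiting when a new node joins.
        time.sleep(sleep_s)
        return socket.gethostname()

    hostnames = ray.get([where_am_i.remote() for _ in range(num_tasks)])
    return tally_hosts(hostnames)
```

Calling `run_placement_check(20)` and joining a node while it runs should show the new hostname in the returned dict if queued tasks were scheduled there.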

For your use case, why not just use the Ray autoscaler? This seems to be the perfect use case.

I tried Ray’s autoscaler too, but I do not want Ray to scale up and down.
With the local provider and head_ip set, I started the autoscaler and launched the work, then manually connected other nodes as worker nodes, but the tasks were not distributed.

This is about adding extra nodes at will for any processing.
Our plan is to keep multiple Ray head nodes running and, as needed and based on task priority, manually connect extra nodes to those head nodes to speed up processing.

I’m using multiprocessing.pool in my example, which creates around 100 such pools to do all the work on the head node.
When we add other nodes to the head node as workers, I can see on the dashboard that they are connected, but they don’t get used; their CPU stays close to 2 or 4%, while the head node is at more than 90%.

This is how I’m using it currently. Do you think multiprocessing.pool is the problem here?

    print("Connected to Cluster Successfully")
    from ray.util.multiprocessing import Pool
    pool = Pool()

    start = time.time()
    num_inside = 0
    for result in pool.map(sample, [SAMPLE_BATCH_SIZE for _ in range(num_samples//SAMPLE_BATCH_SIZE)]):
        num_inside += result

sample is the main task, which does some calculations and returns results.

When I converted my function to a remote function, it worked (it crashed after a short time due to a memory overrun), but it looks like the tasks were distributed to the other nodes.

However, it doesn’t work when creating a multiprocessing pool.

Please see how I’m using the pool in my post above.
With the pool, the status command shows the following:

======== Autoscaler status: 2021-11-24 11:19:53.673637 ========
Node status
 1 node_f5fe5fb26ed86d871b559f1fea0c8e4b467275beff5c4c10c3d3723e
 1 node_4ed8ad15c21e15ea2267e40db54ca514e589fd5cd86694f10471de4f
 1 node_2473323b720a7814cb85a9d2471f2f2886092927dda78ce85a791d28
 (no pending nodes)
Recent failures:
 (no failures)

 2.0/6.0 CPU
 0.00/14.376 GiB memory
 0.18/6.466 GiB object_store_memory

 (no resource demands)

Here there is 1 head node (2 CPUs) and 2 additional nodes (2 CPUs each). Pool processes never get assigned to run on the new nodes.

Thank you.

The Ray multiprocessing pool uses actors. When you initialize it, it creates as many actors as you have CPUs. It doesn’t automatically scale up or anything.

Please refer to the API specification.

I’m not asking the multiprocessing pool to scale up.

I’m only hoping that it would use all the CPU resources available to it, even those that became available later, which is not happening.

Here’s what I’m doing

  1. Starting head node
  2. Starting python script which uses multiprocessing pool to start over 100 such processes.
  3. I can see on dashboard that head node is 100% cpu
  4. Connecting other nodes to head node manually

What I’m expecting to see is that the new worker node starts getting utilized as well and shows 100% CPU.
Instead, what I’m seeing is that the new node stays idle: those processes are not distributed to it and the head node alone keeps processing them.

If I connect all worker nodes before I start my Python work script, then the multiprocessing pools get distributed to all nodes.

The issue is that this does not happen when a node connects AFTER the work script has started (while it’s still running).

To me it looks like a bug.

I don’t think it is a bug. The pool size is static once you create a pool, and by default it is set to the total number of CPUs in your cluster at that moment (as written in the spec). What you observe is that your pool was created with pool size == head node CPUs, and since the pool already exists, its size is not re-adjusted when you add a new node. If you start the pool on the bigger cluster, the initial pool size == num_cpus of the whole cluster, which is why you can then use the CPUs of all nodes. Afaik, this is the same behavior as the regular multiprocessing library. See: Multiprocessing Python - change Pool size - Stack Overflow

If you’d like a dynamically growing number of processes, please use regular Ray tasks, or file a feature request on the Ray GitHub for dynamic pool size changes in the multiprocessing pool!
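
A minimal sketch of the regular-task approach follows. It is an illustration under assumptions, not the original script: the body of `sample` is a stand-in Monte Carlo counter, `run_with_tasks` and `max_in_flight` are hypothetical names, and a running cluster reachable with `address="auto"` is assumed. Capping the number of in-flight tasks with `ray.wait` may also help if a memory overrun comes from submitting everything at once, though that cause is a guess.

```python
import random


def sample(batch_size):
    """Stand-in workload: count random points that fall inside the unit circle."""
    inside = 0
    for _ in range(batch_size):
        x, y = random.random(), random.random()
        if x * x + y * y <= 1.0:
            inside += 1
    return inside


def run_with_tasks(num_batches, batch_size, max_in_flight=100):
    """Submit one Ray task per batch, keeping at most max_in_flight pending."""
    # Imported here so sample() stays usable without Ray installed.
    import ray

    if not ray.is_initialized():
        ray.init(address="auto")  # assumption: a cluster is already running

    remote_sample = ray.remote(sample)
    pending, total = [], 0
    for _ in range(num_batches):
        if len(pending) >= max_in_flight:
            # Wait for one task to finish before submitting another.
            done, pending = ray.wait(pending, num_returns=1)
            total += sum(ray.get(done))
        pending.append(remote_sample.remote(batch_size))
    total += sum(ray.get(pending))  # collect whatever is still running
    return total
```

Because each task is scheduled individually, tasks that are still queued when a node joins can be placed on that node, which a pool whose size was fixed at creation cannot do.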

So even with the above, the pool size is fixed at 2 if I have 2 CPUs on the head node?

Let me try playing with pool = Pool().

If I create more than 2 pools, then hopefully it should work.

The pool size is decided when you create a Pool instance, as specified here: ray/pool.py at 55cb88f0858c30264e6f458f8173bbabeb870aac · ray-project/ray · GitHub

And it seems the API doesn’t support changing the pool size dynamically.

Okay, got it.
Thank you so much for your help.

No problem! It is a little unfortunate that this feature is not supported right now… If you use Ray tasks, you can easily support your use case. Otherwise, you might need to create a feature request or come up with a clever way to achieve it…
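
One possible workaround, offered only as a sketch and not something proposed in this thread, is to process the work in rounds and create a fresh pool for each round, sized to the CPUs the cluster reports at that moment. The names `chunked`, `map_in_rounds`, and `round_size` are hypothetical, a running cluster reachable with `address="auto"` is assumed, and recreating the pool restarts its actors each round, so it may only pay off when rounds are long.

```python
def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [items[i:i + size] for i in range(0, len(items), size)]


def map_in_rounds(func, items, round_size):
    """Map func over items in rounds, re-sizing the pool before each round."""
    # Imported here so chunked() stays usable without Ray installed.
    import ray
    from ray.util.multiprocessing import Pool

    if not ray.is_initialized():
        ray.init(address="auto")  # assumption: a cluster is already running

    results = []
    for chunk in chunked(list(items), round_size):
        # Nodes that joined since the last round are counted here.
        cpus = max(1, int(ray.cluster_resources().get("CPU", 1)))
        pool = Pool(processes=cpus)
        try:
            results.extend(pool.map(func, chunk))
        finally:
            pool.terminate()  # release the pool's actors before the next round
    return results
```

With the script above this might be called as `map_in_rounds(sample, [SAMPLE_BATCH_SIZE] * n, round_size=50)`, where `n` is the number of batches; nodes added mid-round would only be used from the next round on.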