Autoscaling not working with ray.util.multiprocessing

I have a Ray cluster with a minimum of 2 workers and a maximum of 16 workers, one CPU each. I am submitting a job using ray.util.multiprocessing and it utilizes the 3 CPUs but is not autoscaling any further. I thought it would scale up to the 16 worker nodes to complete the job as quickly as possible.

How is the autoscaler supposed to work? Am I supposed to specify what resources I need when calling Ray, or will it autoscale as it needs more resources?

Here’s the log while the job is running:

Resources
---------------------------------------------------------------
Usage:
 3.0/3.0 CPU
 0.00/2.100 GiB memory
 0.01/1.125 GiB object_store_memory
Demands:
 (no resource demands)
ray-cluster,dietl:2021-06-14 17:16:15,631	DEBUG legacy_info_string.py:24 -- Cluster status: 2 nodes
 - MostDelayedHeartbeats: {'192.168.248.4': 0.41873884201049805, '192.168.184.4': 0.41870808601379395, '192.168.248.5': 0.41868042945861816}
 - NodeIdleSeconds: Min=0 Mean=0 Max=0
 - ResourceUsage: 3.0/3.0 CPU, 0.0 GiB/2.1 GiB memory, 0.01 GiB/1.13 GiB object_store_memory
 - TimeSinceLastHeartbeat: Min=0 Mean=0 Max=0
Worker node types:
 - rayWorkerType: 2
ray-cluster,dietl:2021-06-14 17:16:20,904	DEBUG resource_demand_scheduler.py:160 -- Cluster resources: [{'object_store_memory': 606513176.0, 'memory': 1503238553.0, 'node:192.168.248.4': 1.0, 'CPU': 0.0}, {'node:192.168.248.5': 1.0, 'object_store_memory': 302398422.0, 'memory': 375809638.0, 'CPU': 0.0}, {'memory': 375809638.0, 'node:192.168.184.4': 1.0, 'object_store_memory': 290426625.0, 'CPU': 0.0}]
ray-cluster,dietl:2021-06-14 17:16:20,904	DEBUG resource_demand_scheduler.py:161 -- Node counts: defaultdict(<class 'int'>, {'rayHeadType': 1, 'rayWorkerType': 2})
ray-cluster,dietl:2021-06-14 17:16:20,904	DEBUG resource_demand_scheduler.py:172 -- Placement group demands: []
ray-cluster,dietl:2021-06-14 17:16:20,904	DEBUG resource_demand_scheduler.py:218 -- Resource demands: []
ray-cluster,dietl:2021-06-14 17:16:20,904	DEBUG resource_demand_scheduler.py:219 -- Unfulfilled demands: []
ray-cluster,dietl:2021-06-14 17:16:20,972	DEBUG resource_demand_scheduler.py:241 -- Node requests: {}
ray-cluster,dietl:2021-06-14 17:16:20,979	DEBUG autoscaler.py:755 -- ray-cluster-ray-worker-type-qxtwm is not being updated and passes config check (can_update=True).
ray-cluster,dietl:2021-06-14 17:16:20,999	DEBUG autoscaler.py:755 -- ray-cluster-ray-worker-type-v9tgj is not being updated and passes config check (can_update=True).
ray-cluster,dietl:2021-06-14 17:16:21,018	DEBUG autoscaler.py:755 -- ray-cluster-ray-worker-type-qxtwm is not being updated and passes config check (can_update=True).
ray-cluster,dietl:2021-06-14 17:16:21,031	DEBUG autoscaler.py:755 -- ray-cluster-ray-worker-type-v9tgj is not being updated and passes config check (can_update=True).
ray-cluster,dietl:2021-06-14 17:16:21,120	INFO autoscaler.py:354 -- 
======== Autoscaler status: 2021-06-14 17:16:21.120617 ========
Node status
---------------------------------------------------------------
Healthy:
 1 rayHeadType
 2 rayWorkerType
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

We’ve also tried setting the processes argument in ray.util.multiprocessing.Pool, but we get the following error on an existing cluster:

ValueError: Tried to start a pool with 6 processes on an existing ray cluster, but there are only 3 CPUs in the ray cluster.

We have 24 CPUs on the underlying cluster and set the Ray max workers to 16 with 1 CPU each.
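A quick way to see the number the pool is being checked against (a minimal sketch, assuming the script is already connected to the cluster):

import ray

# Pool(processes=N) is validated against the CPUs currently registered
# in the cluster, not against the autoscaler's max_workers setting.
current_cpus = int(ray.cluster_resources().get("CPU", 0))
print(current_cpus)  # 3 in this cluster, even though max workers allows 16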

How can we get Ray to autoscale?

@eoakes does ray.util.multiprocessing have the ability to trigger cluster autoscaling, or are pools always limited by the current cluster size?

@blu could you post a snippet of the code used to submit the job?

@Dmitri here’s a snippet of the code using multiprocessing:

import os

import ray
from ray.util.multiprocessing import Pool

from etl.exporter import Exporter

def export(args):
    e = Exporter(*args)
    e.run()

if __name__ == '__main__':
    ray.util.connect(os.environ.get("RAY_ADDRESS"))

    work = [i for i in some_list]  # some_list is defined elsewhere

    with Pool() as pool:
        results = pool.map(export, work)

Do you have a recommendation for something else that does trigger cluster autoscaling?

Yeah, so I think the pool is capped to the current cluster size. It could be interesting to enable autoscaling so it automatically scales in response to a pool that’s bigger than the current cluster size allows.

For your application, I think there are two options:

  • Option 1 (preferred)
    Use Ray tasks directly:
import os

import ray

from etl.exporter import Exporter

@ray.remote
def export(args):
    e = Exporter(*args)
    e.run()

if __name__ == '__main__':
    ray.util.connect(os.environ.get("RAY_ADDRESS"))

    futures = [export.remote(i) for i in some_list]
    results = ray.get(futures)
Ray tasks use 1 CPU by default, so you would get equivalent behavior with @ray.remote(num_cpus=1). Each queued task registers a 1-CPU resource demand, which is what the autoscaler scales on; the empty “Resource demands: []” in your log is why the pool never triggered a scale-up.
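Explicitly, that would look like this (a trivial sketch, same body as the task above):

@ray.remote(num_cpus=1)  # same as the bare @ray.remote default
def export(args):
    e = Exporter(*args)
    e.run()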

  • Option 2
    (1) Trigger autoscaling manually with autoscaler.sdk.request_resources(num_cpus=…)
    (2) Verify the scale-up, e.g. with ray.cluster_resources()
    (3) Execute the script as-is
    (4) Remove the resource request with autoscaler.sdk.request_resources(num_cpus=0) in order to allow scale-down to the min workers (see the sketch after this list)
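Concretely, here’s a minimal sketch of those four steps wrapped around the original script. The 16-CPU target matches your max workers, and the 5-second poll interval is just an illustrative choice; some_list and Exporter are from the snippet earlier:

import os
import time

import ray
from ray.autoscaler.sdk import request_resources
from ray.util.multiprocessing import Pool

from etl.exporter import Exporter

def export(args):
    e = Exporter(*args)
    e.run()

if __name__ == '__main__':
    ray.util.connect(os.environ.get("RAY_ADDRESS"))

    # (1) Ask the autoscaler to scale to 16 CPUs (the configured max);
    # this acts as a floor until the request is cleared in step (4).
    request_resources(num_cpus=16)

    # (2) Block until the requested CPUs have actually joined the cluster.
    while ray.cluster_resources().get("CPU", 0) < 16:
        time.sleep(5)

    # (3) Run the original pool workload against the scaled-up cluster.
    work = [i for i in some_list]  # some_list defined elsewhere, as above
    with Pool() as pool:
        results = pool.map(export, work)

    # (4) Clear the request so idle workers can scale back down to min workers.
    request_resources(num_cpus=0)

The key point is that request_resources sets a target independent of task demand, so by the time the pool’s CPU check runs in step (3), the nodes are already up.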

Thanks, this got the autoscaling working!
