RayTune cluster not distributing load correctly?

Hi Ray Team,

Cluster details

  • head node and single worker node have equivalent specs: 12 cpu, 1 gpu, ray 2.8.0, Python 3.8.18
  • both nodes communicate over LAN
  • ray at head node started via ray start --head
  • ray at worker node started via ray start --address='192.168.20.194:6379'
======== Autoscaler status: 2023-11-13 12:09:02.672622 ========
Node status
---------------------------------------------------------------
Active:
 1 node_71f0b9ad4e328c9ab71c9b8394b595b5344969dfefd4c879860491e2
 1 node_3e3fcf23de6181d9939caa8f78fdd5ead394ebd84a13ed01f91690f8
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 0.0/24.0 CPU
 0.0/2.0 GPU
 0B/74.16GiB memory
 0B/34.09GiB object_store_memory

Demands:
 (no resource demands)

Problem
I’ve been consistently having issues with raytune not adequately distributing the load among the nodes in the cluster to see a decrease in runtime. This problem might just be in my unique scenario where the optimisation problem is an objective equation (see code).

In the simple problem below, tune would run 1000 trails (for each value of alpha).
On single node cluster (just the head node), the problem takes 20ish seconds.
When I extend the cluster to include the worker node, the problem still takes 20ish seconds.
I’ve tinkered with many parameters with no luck so far.
Any ideas ? Thank you

import ray
from ray import tune
from ray.air import session
import numpy as np
 
ray.init(address="auto")
def objective(step, alpha, beta):
   return (0.1 + alpha * step / 100)**(-1) + beta * 0.1
 
def training_function(config):
	alpha, beta = config["alpha"], config["beta"]
	for step in range(10):
		intermediate_score = objective(step, alpha, beta)
		session.report({"mean_loss": intermediate_score})

resource_group = tune.PlacementGroupFactory([{"CPU": 4}])
trainable_with_resources = tune.with_resources(training_function, resource_group)
analysis = tune.run(
   training_function,
   config={
       "alpha": tune.grid_search(list(np.arange(0,1,0.001))),
       "beta": tune.choice([1, 2, 3])
   })
 
print("Best config: ", analysis.get_best_config(metric="mean_loss", mode="min"))
df = analysis.results_df

This is likely happening because the training/objective functions are very small, so scheduling becomes an overhead. Maybe you can add time.sleep(1) to the start of the training_function, and see if the difference is more observable?

1 Like

Thanks for replying Matt.

Yes your thinking looks correct. After adding the time.sleep(1), the scaling effect of adding a node almost halved the runtime as expected.

Is there a workaround in ray tune to cater to my scenario ? The problem seems easily parallelize able.
Like partitioning the search space such that node1 operates on alpha 0 to 0.5 and node2 operates on alpha 0.5 and 1. Each node individually computes their best value of (alpha, beta) for their individual search space and joins at a later period to decide the best value of (alpha, beta) for the overall search space.

It really depends on your workload. You won’t really see the benefits of going distributed if each individual task is tiny.

One naive way you could think about it is you can “increase” the amount of work done in each training_function. For example…

def training_function(config):
	alpha, beta = config["alpha"], config["beta"]
        for alpha1 in list(np.arange(alpha, alpha+0.1,0.001):
             # Compute the best value of alpha1 in this range

analysis = tune.run(
   training_function,
   config={
       "alpha": tune.grid_search(list(np.arange(0,1,0.1))), # Reduce the number of tasks
       "beta": tune.choice([1, 2, 3])
   })

that works perfectly, thank you