Unable to saturate cluster with ASHA trials (CPU-bound)

How severe does this issue affect your experience of using Ray?

  • Low: It annoys or frustrates me for a moment.

I’m using ASHA to tune an Evolution Strategy (CMA-ES) that works in generations. One hyperparam is the population size (7…300), which is also the number of tasks the trainable will submit to Ray for each generation.

I’m using the autoscaler. Here is my resources config:
(IIRC it doesn’t work without a PlacementGroup.)

resources_per_trial=tune.PlacementGroupFactory(
        [{'CPU': 0.0}] + [{'CPU': 1.0}] * 16
)

I think this means that Ray will keep 16 CPUs (possibly on different nodes) reserved per trial, whether they are used or not. (Is my understanding correct?)

This means that if a trial has population size 7, Ray will keep 9 of its reserved CPUs idle. And with population size 129 the last task will probably run alone, while concurrent trials are forbidden from using the 15 idling CPU reservations.

Solutions I’ve considered:

  1. Reserve fewer CPUs per trial. This works great in the beginning. But when ASHA has stopped the bad trials, the last one will take forever, unable to utilize the now-idle cluster because of the placement group.

  2. Reserve as many CPUs as the population size, using a lambda in the placement group spec (roughly the untested sketch after this list). I have not tried it, but I think it would prevent trials with population size 300 from ever getting scheduled on a cluster of 256 CPUs. And if two trials with population size 200 are left, only one would run, leaving 56 cores idle.

  3. Over-subscribe the cluster. This is what I’m currently doing. It’s clearly a hack: I’m configuring each node to have 2x the CPUs it actually has, so that the OS scheduler is more likely to have a task to switch to.
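
Roughly what I meant in option 2 by “a lambda in the placement group spec” (untested sketch; 'population_size' is the hyperparam key in my search space):

resources = lambda config: tune.PlacementGroupFactory(
    # one near-zero bundle for the trial actor + one 1-CPU bundle per population member
    [{'CPU': 0.0}] + [{'CPU': 1.0}] * config['population_size']
)
# would be passed via tune.with_resources(train, resources=resources)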

Are there any other options to avoid those trade-offs?

(On a different note: I occasionally see trials fail when the autoscaler is scaling down. I think it’s because of the {'CPU': 0.0} reservation for the trial actor (I’m not using checkpoints). This is probably how it’s intended to work, given that spec…?)

Hi @maxy,

there are a couple of other options. I don’t think any of them are perfect, but let’s see.

First, this script demonstrates how Ray Tune schedules actors and what happens within the placement groups. Note that the first example here will hang forever.

import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy


ray.init(num_cpus=4)


pg = ray.util.placement_group([{"CPU": 2}])


@ray.remote(num_cpus=2)
def inner(a):
    return a


@ray.remote(num_cpus=2)
class Actor:
    def run(self, a):
        # This will hang forever because there are no available resources left in the PG
        return ray.get(inner.remote(a))

# This is how Tune trainables are scheduled by default
actor = Actor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_bundle_index=0,
        placement_group_capture_child_tasks=True,
    )
).remote()

print(ray.get(actor.run.remote(5)))

The cluster here has 4 CPUs available, and the “trial” placement group reserves 2 of them. However, the “inner” task can’t be scheduled: by default it tries to run in the same placement group, whose resources are already fully occupied by the actor. Hence it hangs.

This is what you solve by specifying a second bundle in the PlacementGroupFactory. Your population tasks will run in that second bundle. So yes, one solution is to add a second bundle that reserves more resources for the trial.

As you correctly observed, this will prevent the trial from using other available cluster resources. It can also mean that some trials (with low population sizes) unnecessarily block resources that they don’t need.

There are some ways to mitigate this.

1. You can “break out” of the placement groups

In Ray Tune trials, child tasks are “captured” in the placement group by default. We can explicitly disable this behavior like this:


@ray.remote(num_cpus=2)
class Actor:
    def run(self, a):
        # This will "break out" of the placement group and use other available cluster
        # resources
        return ray.get(
            inner.options(
                scheduling_strategy=PlacementGroupSchedulingStrategy(
                    placement_group=None
                )
            ).remote(a)
        )

With this change, Ray will not try to use the same placement group, but explicitly try to use remaining cluster resources that have not been reserved in placement groups.

This means you could leave a global pool of resources free that every trial can access. The trials will compete for these resources, but on average they will be uniformly distributed to the trials. Notably, when only one trial remains, it can access all remaining resources.

The main caveat here is that you need to ensure that some resources remain free, e.g. using a ConcurrencyLimiter.
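
For example, a minimal sketch (the numbers and the trainable name are placeholders; I’m using max_concurrent_trials here, but a searcher-level ConcurrencyLimiter works too):

from ray import tune


def train_fn(config):
    ...  # submit the population tasks with placement_group=None as shown above


# With 1 reserved CPU per trial and at most 8 concurrent trials, only 8 CPUs
# are held in placement groups - the rest of the cluster stays free for the
# "broken out" population tasks.
tuner = tune.Tuner(
    tune.with_resources(train_fn, resources={"cpu": 1}),
    tune_config=tune.TuneConfig(max_concurrent_trials=8),
)
tuner.fit()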

2. Variant: You can use a specific placement group for the population

If you don’t want to use a concurrency limiter, you can also explicitly reserve resources in a “breakout” placement group like this:

breakout = ray.util.placement_group([{"CPU": 2}])


@ray.remote(num_cpus=2)
class Actor:
    def run(self, a):
        # This will "break out" of the placement group by using a specific placement
        # group that is reserved for the population processing
        return ray.get(
            inner.options(
                scheduling_strategy=PlacementGroupSchedulingStrategy(
                    placement_group=breakout
                )
            ).remote(a)
        )

Like above, this will reserve resources for all trials to share.
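
If the breakout placement group handle isn’t easily available inside the trainable (e.g. because the trainable lives in another module), you can also look it up by name. A minimal sketch - the name "population_pool" and the bundle sizes are arbitrary here, and the pool is created once in the driver before the trials start:

import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# Created once in the driver, before Tuner(...).fit()
ray.util.placement_group([{"CPU": 1}] * 2, name="population_pool")


@ray.remote(num_cpus=1)
def evaluate_member(x):
    return x * x


def train_fn(config):
    # Look up the shared pool by name from inside the trial
    pool = ray.util.get_placement_group("population_pool")
    refs = [
        evaluate_member.options(
            scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pool)
        ).remote(x)
        for x in range(config["population_size"])
    ]
    return {"scores": ray.get(refs)}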

3. You can use dynamic resource requests

Ray Tune supports passing a function to tune.with_resources which can dynamically adjust the placement group size to the population size of your trial:

import ray
from ray import tune


def train_fn(config):
    return {"specs": ray.util.get_current_placement_group().bundle_specs}


def get_resources(config) -> tune.PlacementGroupFactory:
    return tune.PlacementGroupFactory(
        [{"CPU": 1}, {"CPU": config["population_size"]}]
    )


tune.Tuner(
    tune.with_resources(
        train_fn, resources=get_resources
    ),
    param_space={
        "population_size": tune.grid_search([1, 2, 4])
    }
).fit()

For trials with large population sizes, you can reserve more resources. You can also implement a ceiling for them in your get_resources function to make sure all trials can be scheduled.
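
For example, a ceiling could look like this (the cap of 64 is an arbitrary placeholder; pick it so the reservation always fits on your cluster). Using one {"CPU": 1} bundle per population member also lets the bundles be spread across nodes instead of requiring one large bundle that must fit on a single node:

def get_resources(config) -> tune.PlacementGroupFactory:
    # Cap the reservation so that even a population of 300 can be scheduled
    n = min(config["population_size"], 64)
    return tune.PlacementGroupFactory([{"CPU": 1}] + [{"CPU": 1}] * n)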

4. You can use a ResourceChangingScheduler

The dynamic resource function will still lead to a situation where the last remaining trial may use fewer resources than it could.

Ray Tune implements a utility for this, called the ResourceChangingScheduler. With this utility you can dynamically resize the placement groups of trials depending on how many trials are still running.

Note though that changing the resource allocation means the trial will be shut down and restarted, so you need checkpointing/restoration in place to make this work.

An example of how to implement it can be found here.
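
A minimal sketch of the wiring (the base scheduler and the allocation function are just one possible combination, and train_fn stands in for a trainable that saves and restores checkpoints):

from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.schedulers.resource_changing_scheduler import (
    DistributeResources,
    ResourceChangingScheduler,
)

scheduler = ResourceChangingScheduler(
    base_scheduler=ASHAScheduler(),
    # Redistribute free cluster resources across the trials that are still running
    resources_allocation_function=DistributeResources(add_bundles=True),
)

tuner = tune.Tuner(
    train_fn,  # must checkpoint/restore so it can be restarted with new resources
    tune_config=tune.TuneConfig(scheduler=scheduler),
)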

I hope this helps, let me know if you have more questions!

Thanks a lot!

I didn’t realize that breaking out of a placement group was possible. It works as expected.

I found this “break out” syntax a bit easier (same effect):

@ray.remote(scheduling_strategy='DEFAULT')
def evaluate(...):

And you’re right, I also need to limit concurrency. (Now I understand why Tune prefers to reserve resources in advance.)

This is what I’m doing now:

tuner = tune.Tuner(
  tune.with_resources(train, resources={'cpu': 0.01, 'gpu': 0}),
  tune_config=tune.TuneConfig(
    max_concurrent_trials=16,
    scheduler=ASHAScheduler(...),
    ...
  ),
)

There is a slight “hack” of reserving 0.01 CPU for each trial actor (zero is not allowed). The 16 actors will collectively reserve only 0.16 CPUs.

I like the flexibility of a global task pool:

  • When waiting for the last few evaluations of each generation, other trials/tasks can use the idling CPUs.
  • If the ES algorithm (trial actor) needs some time to process the generation results single-threaded, it can do so without sitting on the CPU reservations that it will use for evaluating the next batch. (Currently I’m sticking with sep-CMA-ES, which is fast enough not to have this problem. But I’ve had it before.)
  • Every few generations, my trial actor submits a “precise evaluation” task to measure performance independently of hyperparams. It was a bit silly to submit that task into a pool sized approximately for the population size (a hyperparam).

It’s good to know about the other variants. Maybe I’ll try them later, but for now I’m happy.

(The autoscaler now scales down later than before, because tasks are spread out, so nodes tend not to become fully idle. Not a big problem. I can scale down manually, or submit a concurrent task, or just chill about it.)

Glad to hear this works for you! Thanks for pointing out the setting in the @ray.remote call - that actually makes it much easier to configure.