Unable to saturate cluster with ASHA trials (CPU-bound)

Hi @maxy,

There are a couple of other options. I don’t think any of them are perfect, but let’s see.

First, this script demonstrates how Ray Tune schedules actors and what happens within the placement groups. Note that the first example here will hang forever.

import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy


ray.init(num_cpus=4)


pg = ray.util.placement_group([{"CPU": 2}])


@ray.remote(num_cpus=2)
def inner(a):
    return a


@ray.remote(num_cpus=2)
class Actor:
    def run(self, a):
        # This will hang forever because there are no more available resources in the PG
        return ray.get(inner.remote(a))

# This is how Tune trainables are scheduled by default
actor = Actor.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
        placement_group_bundle_index=0,
        placement_group_capture_child_tasks=True,
    )
).remote()

print(ray.get(actor.run.remote(5)))

The cluster here has 4 CPUs available, and the “trial” placement group reserves 2 of them. However, it can’t schedule the “inner” task because, by default, the task tries to run in the same placement group, whose resources are already fully occupied by the actor. Hence it hangs.

This is what you solve by specifying a second bundle in the PlacementGroupFactory. Your population tasks will run in that second bundle. So yes, one solution is to add a second bundle that reserves more resources for the trial.
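
For reference, a minimal sketch of such a factory (the bundle sizes and the train_fn stub are placeholders; adjust them to your setup):

from ray import tune


def train_fn(config):
    # Placeholder: launch your population tasks here; with a second bundle,
    # the trial's placement group has spare capacity for them to run in.
    return {"done": True}


# Bundle 0 hosts the trainable actor itself, bundle 1 reserves extra CPUs
# for the population tasks spawned by the trial.
trainable = tune.with_resources(
    train_fn, tune.PlacementGroupFactory([{"CPU": 1}, {"CPU": 4}])
)
tune.Tuner(trainable).fit()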

As you correctly observed, this will prevent the trial from using other available cluster resources. It can also mean that some trials (with low population sizes) unnecessarily block resources that they don’t need.

There are some ways to mitigate this.

1. You can “break out” of the placement groups

In Ray Tune trials, child tasks are “captured” in the placement group by default. We can explicitly disable this behavior like this:


@ray.remote(num_cpus=2)
class Actor:
    def run(self, a):
        # This will "break out" of the placement group and use other available cluster
        # resources
        return ray.get(
            inner.options(
                scheduling_strategy=PlacementGroupSchedulingStrategy(
                    placement_group=None
                )
            ).remote(a)
        )

With this change, Ray will not try to use the same placement group, but will instead use remaining cluster resources that have not been reserved in placement groups.

This means you could leave a global pool of resources free that every trial can access. The trials will compete for these resources, but on average they will be distributed evenly across the trials. Notably, when only one trial remains, it can access all of the remaining resources.

The main caveat here is that you need to ensure that some resources remain free, e.g. using a ConcurrencyLimiter.
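
A minimal sketch of that idea, assuming a hypothetical 16-CPU cluster where each trial actor reserves 2 CPUs. Here I cap concurrency via TuneConfig(max_concurrent_trials=...), which limits the number of running trials much like wrapping a searcher in a ConcurrencyLimiter:

from ray import tune


def train_fn(config):
    # Placeholder: launch the "broken out" population tasks from here.
    return {"done": True}


# 4 concurrent trials x 2 CPUs = 8 reserved CPUs, leaving 8 CPUs of the
# 16-CPU cluster free for the break-out tasks of all running trials.
tune.Tuner(
    tune.with_resources(train_fn, tune.PlacementGroupFactory([{"CPU": 2}])),
    tune_config=tune.TuneConfig(num_samples=20, max_concurrent_trials=4),
    param_space={"population_size": tune.randint(1, 9)},
).fit()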

2. Variant: You can use a specific placement group for the population

If you don’t want to use a concurrency limiter, you can also explicitly reserve resources in a “breakout” placement group like this:

breakout = ray.util.placement_group([{"CPU": 2}])


@ray.remote(num_cpus=2)
class Actor:
    def run(self, a):
        # This will "break out" of the placement group by using a specific placement
        # group that is reserved for the population processing
        return ray.get(
            inner.options(
                scheduling_strategy=PlacementGroupSchedulingStrategy(
                    placement_group=breakout
                )
            ).remote(a)
        )

Like above, this will reserve resources for all trials to share.

3. You can use dynamic resource requests

Ray Tune supports passing a function to tune.with_resources which can dynamically adjust the placement group size to the population size of your trial:

import ray
from ray import tune


def train_fn(config):
    return {"specs": ray.util.get_current_placement_group().bundle_specs}


def get_resources(config) -> tune.PlacementGroupFactory:
    return tune.PlacementGroupFactory(
        [{"CPU": 1}, {"CPU": config["population_size"]}]
    )


tune.Tuner(
    tune.with_resources(
        train_fn, resources=get_resources
    ),
    param_space={
        "population_size": tune.grid_search([1, 2, 4])
    }
).fit()

For trials with large population sizes, you can reserve more resources. You can also implement a ceiling in your get_resources function to make sure all trials can still be scheduled.
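
For example, a variant of the get_resources function above with a cap (the ceiling of 8 CPUs is just an illustrative number):

# Illustrative ceiling so that even trials with very large populations
# request a bundle that can still be scheduled on the cluster.
MAX_POPULATION_CPUS = 8


def get_resources(config) -> tune.PlacementGroupFactory:
    population_cpus = min(config["population_size"], MAX_POPULATION_CPUS)
    return tune.PlacementGroupFactory([{"CPU": 1}, {"CPU": population_cpus}])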

4. You can use a ResourceChangingScheduler

The dynamic resource function will still lead to a situation where the last remaining trial may use fewer resources than it could.

Ray Tune implements a utility for this, called the ResourceChangingScheduler. With this utility you can dynamically resize the placement groups of trials depending on how many trials are still running.

Note, though, that changing the resource size means the trial will be shut down and restarted. You need to have checkpointing/restoration in place to make this work.
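
To give a rough idea of the wiring, here is a sketch only (DistributeResources spreads currently idle CPUs across the running trials; the train_fn stub stands in for a real trainable that checkpoints and restores):

from ray import tune
from ray.tune.schedulers import ASHAScheduler, ResourceChangingScheduler
from ray.tune.schedulers.resource_changing_scheduler import DistributeResources


def train_fn(config):
    # Placeholder: a real trainable must save and restore checkpoints,
    # because every resource change stops and restarts the trial.
    return {"loss": 1.0}


scheduler = ResourceChangingScheduler(
    base_scheduler=ASHAScheduler(),
    # Hand idle CPUs to running trials by adding bundles to their placement groups.
    resources_allocation_function=DistributeResources(add_bundles=True),
)

tune.Tuner(
    tune.with_resources(train_fn, tune.PlacementGroupFactory([{"CPU": 1}])),
    tune_config=tune.TuneConfig(
        scheduler=scheduler, metric="loss", mode="min", num_samples=4
    ),
).fit()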

A full example of how to implement it can be found here.

I hope this helps, let me know if you have more questions!