How to update the number of CPUs and GPUs in PyTorch DDP when ASHA scheduler eliminates some experiments?

My understanding is that the number of resources assigned to a trial may change as the ASHA scheduler prunes some trials. For example, a trial might initially be assigned eight CPUs and one GPU but eventually be allocated 16 CPUs and two GPUs.

I assume such a change can be detected by calling tune.get_trial_resources() inside the training function (when using the functional API). For example, in the following snippet

from functools import partial

from ray import tune

tuner = tune.Tuner(
    trainable=tune.with_resources(
        partial(training_function, args), {"cpu": 8, "gpu": 1}
    ), ...
)

def training_function(args, config, checkpoint_dir=None):
    trial_resources = tune.get_trial_resources()
    required_resources = trial_resources.required_resources
    num_cpus = int(required_resources["CPU"])
    num_gpus = int(required_resources["GPU"])

    args.workers = num_cpus
    args.gpus = ','.join([str(i) for i in range(num_gpus)])

    # The rest of the function uses args.workers and args.gpus
    # to set up the data loaders and devices.

num_cpus and num_gpus will initially be eight and one, respectively.

At this point inside the training function, one cannot use mp.spawn() to launch DDP, because the training function invoked through Tune's functional API is not in the __main__ block. It also does not seem possible (?) to use torchrun inside the function.

I understand I can wrap my training function with DistributedTrainableCreator as shown in this example. However, even in that case, I need to know the number of GPUs before calling tune.run() or tuner.fit().

I would really appreciate any pointers on how I can dynamically change the number of resources allocated to a DDP trial.

Thanks!

Hi there!
Have you taken a look at this resource changing scheduler example? I believe it may shed some light here: test_resource_changing.py - ray-project/ray - Sourcegraph

One thing to note is that a DDP trainable cannot simply change to more workers on the fly. You need to implement some checkpoint saving and loading logic for the transition. And depending on how big your model is and whether it needs to be moved to CUDA, there may be some setup cost.
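
For illustration, the rough shape of that logic in a functional trainable could look like the untested sketch below (placeholder model and metric; it uses the checkpoint_dir pattern of Tune's functional API). The idea is that when the scheduler reallocates resources, the trial is stopped and restarted, re-reads its allocation via tune.get_trial_resources(), and resumes from the last checkpoint:

import os

import torch
from ray import tune


def training_function(config, checkpoint_dir=None):
    # Re-read the *current* allocation; after a reallocation the function is
    # restarted and this call returns the new CPU/GPU counts.
    resources = tune.get_trial_resources().required_resources
    num_gpus = int(resources.get("GPU", 0))

    model = torch.nn.Linear(10, 1)  # placeholder model
    start_epoch = 0

    # Restore from the checkpoint written before the trial was paused.
    if checkpoint_dir:
        state = torch.load(os.path.join(checkpoint_dir, "checkpoint.pt"))
        model.load_state_dict(state["model"])
        start_epoch = state["epoch"] + 1

    for epoch in range(start_epoch, 100):
        ...  # train one epoch, spreading work across num_gpus devices

        # Checkpoint every epoch so the scheduler can pause/resume the trial
        # with a different resource bundle at any epoch boundary.
        with tune.checkpoint_dir(step=epoch) as ckpt_dir:
            torch.save(
                {"model": model.state_dict(), "epoch": epoch},
                os.path.join(ckpt_dir, "checkpoint.pt"),
            )
        tune.report(loss=0.0)  # replace with your real metric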

cc @Yard1

Thanks for sharing these pointers; both the XGBoost example and unit test are very helpful in better understanding the ResourceChangingScheduler.

Agreed!

What is still unclear to me is how one could use the functional API and yet be able to adjust the resources for a DDP trial. According to the docs (emphasis mine)

If the functional API is used, the current trial resources can be obtained by calling tune.get_trial_resources() inside the training function.

For XGBoost, where the number of threads can be passed in config, this works flawlessly.
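
(For context, my mental model of the XGBoost case is something like the untested sketch below: the trainable re-reads its CPU allocation on every (re)start and passes it to XGBoost's nthread parameter; the dataset here is just a placeholder.)

import xgboost as xgb
from ray import tune
from sklearn.datasets import load_breast_cancer


def xgboost_trainable(config, checkpoint_dir=None):
    # Re-read the trial's current CPU allocation each time Tune (re)starts
    # the trial, so a resource bump simply means a larger nthread.
    trial_resources = tune.get_trial_resources()
    config["nthread"] = int(trial_resources.required_resources["CPU"])

    # Placeholder dataset; assumes config also includes
    # "objective": "binary:logistic" and "eval_metric": "error".
    data, labels = load_breast_cancer(return_X_y=True)
    dtrain = xgb.DMatrix(data, label=labels)

    results = {}
    xgb.train(
        config,
        dtrain,
        num_boost_round=10,
        evals=[(dtrain, "train")],
        evals_result=results,
    )
    tune.report(**{"train-error": results["train"]["error"][-1]})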

However, for DDP, where one has to call mp.spawn() and pass the number of available resources to it, this does not work, because mp.spawn() can only be called from the __main__ block and not from inside another function, including the training function.

P.S. Apologies for sending multiple replies. I have a limit of two links per post because I am a new user.

Hi,
What if you change the code in the unit test from XGBoostTrainer to TorchTrainer?
I think TorchTrainer should handle that automatically for you.
Basically I am referring to the code here:

@Yard1 should we update our documentation to use ScalingConfig terminology rather than get_trial_resources for ResourceChangingScheduler?


Thanks @xwjiang2010. So what you are suggesting is to use something like the example under Ray Tune Usage on this page?

from ray.air import session
from ray.train.torch import TorchTrainer
import ray.data
from ray.air.config import ScalingConfig
from ray import tune
from ray.tune.tuner import Tuner
from ray.tune.tune_config import TuneConfig


def train_loop_per_worker():
    # By default, bulk loading is used and returns a Dataset object.
    data_shard = session.get_dataset_shard("train")

    # Manually iterate over the data 10 times (10 epochs).
    for _ in range(10):
        for batch in data_shard.iter_batches():
            print("Do some training on batch", batch)


trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=1),
    datasets={"train": ray.data.range_tensor(1000)},
)
param_space = {
    "scaling_config": ScalingConfig(num_workers=tune.grid_search([1, 2])),
    "params": {
        "objective": "binary:logistic",
        "tree_method": "approx",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9),
    },
}
tuner = Tuner(
    trainable=trainer,
    param_space=param_space,
    tune_config=TuneConfig(mode="min", metric="train-error"),
)
results = tuner.fit()

The text before the example says

Rather than passing in the scaling_config parameter to Trainer, instead set the scaling_config key of the param_space dict that is passed to your Tuner initializer

but nothing in the example actually reads the scaling_config key of the param_space (something like config["scaling_config"]) inside the training loop.

Can you please elaborate on how using the ScalingConfig in this example can help so I can apply a similar idea to my code?

It’s handled automatically for you (see how ScalingConfig is merged in this file).

I think you can pretty much follow this example to see how to use ResourceChangingScheduler + TorchTrainer together.
And I don’t think you need to put scaling_config into param_space for it to work (see the above example).
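
In case it helps, here is an untested sketch of what that wiring could look like: a TorchTrainer passed straight to the Tuner, with a ResourceChangingScheduler wrapping ASHA (the metric name and loop body are placeholders):

from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer
from ray.tune.schedulers import ASHAScheduler
from ray.tune.schedulers.resource_changing_scheduler import (
    DistributeResources,
    ResourceChangingScheduler,
)
from ray.tune.tune_config import TuneConfig
from ray.tune.tuner import Tuner


def train_loop_per_worker(config):
    for epoch in range(config["epochs"]):
        # ... one DDP epoch; Ray Train sets up the process group and device
        # placement for however many workers are currently allocated.
        session.report({"loss": 1.0 / (epoch + 1)})


trainer = TorchTrainer(
    train_loop_per_worker,
    train_loop_config={"epochs": 10},
    # Initial allocation; the scheduler can grow it between iterations.
    scaling_config=ScalingConfig(num_workers=1, use_gpu=True),
)

scheduler = ResourceChangingScheduler(
    base_scheduler=ASHAScheduler(),
    # Hand the resources freed by pruned trials to the surviving ones.
    resources_allocation_function=DistributeResources(add_bundles=True),
)

tuner = Tuner(
    trainer,
    tune_config=TuneConfig(metric="loss", mode="min", scheduler=scheduler),
)
results = tuner.fit()

In a real training loop you would also save and load a checkpoint in session.report, as noted above, so that a resize does not lose progress.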


We should update the terminology, yes

Can you confirm whether it is possible (from the underlying Ray framework) to change the GPU resources during the training run using the TorchTrainer and ResourceChangingScheduler?

Yes, that is what the ResourceChangingScheduler is supposed to do, with two caveats:

  1. It needs to pause the training by saving a checkpoint and then loading it in order to reallocate resources, which can add some overhead
  2. Due to 1, it cannot reallocate in the middle of a training iteration (in other words, it will reallocate only after a session.report call in your training function)

Ray in general cannot change the resources of a running task, which is why we need to checkpoint, stop the task, start it again with new resources and load the checkpoint again.
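
To make caveat 2 concrete, here is a minimal untested sketch of a TorchTrainer training loop that reports a metric and a checkpoint every epoch, giving the scheduler a boundary at which it can stop the trial, change its resource bundle, and resume it (model and loss are placeholders):

import torch
from ray.air import session
from ray.air.checkpoint import Checkpoint


def train_loop_per_worker(config):
    model = torch.nn.Linear(10, 1)  # placeholder model
    start_epoch = 0

    # If the trial was stopped for a resource change, resume from the
    # checkpoint that was reported just before it was paused.
    checkpoint = session.get_checkpoint()
    if checkpoint:
        state = checkpoint.to_dict()
        model.load_state_dict(state["model"])
        start_epoch = state["epoch"] + 1

    for epoch in range(start_epoch, config["epochs"]):
        ...  # one DDP training epoch

        # session.report is the only point where the scheduler can step in:
        # after this call the trial may be stopped, given a new resource
        # bundle, and restarted from this checkpoint.
        session.report(
            {"loss": 1.0 / (epoch + 1)},
            checkpoint=Checkpoint.from_dict(
                {"model": model.state_dict(), "epoch": epoch}
            ),
        )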
