My understanding is that the number of resources allocated to a trial may change as the ASHA scheduler prunes some trials. For example, a trial may initially be assigned eight CPUs and one GPU but eventually be allocated 16 CPUs and two GPUs.
I assume such a change can be detected by calling tune.get_trial_resources() inside the training function (when using the functional API). For example, in the following snippet,
tuner = tune.Tuner(
    trainable=tune.with_resources(
        partial(training_function, args), {"cpu": 8, "gpu": 1}
    ),
    ...
)
def training_function(args, config, checkpoint_dir=None):
    trial_resources = tune.get_trial_resources()
    required_resources = trial_resources.required_resources
    num_cpus = int(required_resources["CPU"])
    num_gpus = int(required_resources["GPU"])
    args.workers = num_cpus
    args.gpus = ",".join(str(i) for i in range(num_gpus))
    # The rest of the function uses args.workers and args.gpus
    # to set up data loaders and devices.
num_cpus and num_gpus will initially be eight and one, respectively.
At this point inside the training function, one cannot use mp.spawn() to launch DDP processes because the training function invoked through Tune's functional API is not in the __main__ block. It also does not seem possible to use torchrun inside the function.
I understand I can wrap my training function inside DistributedTrainableCreator as shown in this example. However, even in this case, I need to know the number of GPUs prior to calling tune.run() or tuner.fit().
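Roughly what I mean (a sketch against the older Tune API; the exact keyword arguments may differ, and training_function/args are the ones from the snippet above):

from functools import partial
from ray import tune
from ray.tune.integration.torch import DistributedTrainableCreator

# The worker and GPU counts are baked in here, before tune.run() is
# ever called, which is exactly the limitation I am asking about.
trainable = DistributedTrainableCreator(
    partial(training_function, args),
    num_workers=2,
    num_gpus_per_worker=1,
)
tune.run(trainable, num_samples=4)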
I would really appreciate any pointers on how I can dynamically change the number of resources allocated to a DDP trial to resolve this issue.
One thing to note is that a DDP trainable cannot simply switch to more workers on the fly. You need to implement some checkpoint saving and loading logic for the transition. And depending on how big your model is and whether it needs to be moved to CUDA, there may be some setup cost.
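To make the checkpointing requirement concrete, here is a rough sketch in the style of your functional snippet (build_model, train_one_epoch, and evaluate are hypothetical helpers, and this uses the older checkpoint_dir-style API your code already uses):

import os
import torch
from ray import tune

def training_function(args, config, checkpoint_dir=None):
    model = build_model(config)          # hypothetical helper
    optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"])

    start_epoch = 0
    # When the trial is restarted (e.g. after a resource change),
    # Tune passes the directory of the last checkpoint here.
    if checkpoint_dir:
        state = torch.load(os.path.join(checkpoint_dir, "checkpoint"))
        model.load_state_dict(state["model"])
        optimizer.load_state_dict(state["optimizer"])
        start_epoch = state["epoch"] + 1

    for epoch in range(start_epoch, args.epochs):
        train_one_epoch(model, optimizer)   # hypothetical helper
        # Save a checkpoint every epoch so the scheduler can safely
        # stop this trial and bring it back with a different allocation.
        with tune.checkpoint_dir(step=epoch) as ckpt_dir:
            torch.save(
                {
                    "model": model.state_dict(),
                    "optimizer": optimizer.state_dict(),
                    "epoch": epoch,
                },
                os.path.join(ckpt_dir, "checkpoint"),
            )
        tune.report(loss=evaluate(model))   # hypothetical helper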
What is still unclear to me is how one could use the functional API and yet be able to adjust the resources for a DDP trial. According to the docs (emphasis mine)
If the functional API is used, the current trial resources can be obtained by calling tune.get_trial_resources() inside the training function.
Hi,
What if you change the code in the unit test from using XGBoostTrainer to using TorchTrainer?
I think TorchTrainer should handle that automatically for you.
Basically I am referring to the code here:
@Yard1 should we update our documentation to use more ScalingConfig terminology than get_trial_resources for ResourceChangingScheduler?
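For what it is worth, inside a TorchTrainer training loop you do not need tune.get_trial_resources() at all; each worker can ask the AIR session about its own world size. A minimal sketch (the loop body is only illustrative):

from ray.air import session

def train_loop_per_worker(config):
    # How many distributed workers this trial currently has; this should
    # track the ScalingConfig the trial is running with.
    world_size = session.get_world_size()
    rank = session.get_world_rank()
    print(f"Worker {rank} of {world_size} starting")
    # ... build the model and wrap it with ray.train.torch.prepare_model(...) ...
    session.report({"loss": 0.0})  # placeholder metric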
Thanks @xwjiang2010. So what you are suggesting is to use something like the example under Ray Tune Usage on this page?
from ray.air import session
from ray.train.torch import TorchTrainer
import ray.data
from ray.air.config import ScalingConfig
from ray import tune
from ray.tune.tuner import Tuner
from ray.tune.tune_config import TuneConfig


def train_loop_per_worker():
    # By default, bulk loading is used and returns a Dataset object.
    data_shard = session.get_dataset_shard("train")
    # Manually iterate over the data 10 times (10 epochs).
    for _ in range(10):
        for batch in data_shard.iter_batches():
            print("Do some training on batch", batch)


trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=1),
    datasets={"train": ray.data.range_tensor(1000)},
)

param_space = {
    "scaling_config": ScalingConfig(num_workers=tune.grid_search([1, 2])),
    "params": {
        "objective": "binary:logistic",
        "tree_method": "approx",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9),
    },
}

tuner = Tuner(
    trainable=trainer,
    param_space=param_space,
    tune_config=TuneConfig(mode="min", metric="train-error"),
)
results = tuner.fit()
The text before the example says
Rather than passing in the scaling_config parameter to Trainer, instead set the scaling_config key of the param_space dict that is passed to your Tuner initializer
but the example never accesses the scaling_config key of the param_space anywhere (something like config["scaling_config"]).
Can you please elaborate on how using the ScalingConfig in this example can help so I can apply a similar idea to my code?
It’s handled automatically for you (see how ScalingConfig is merged in this file).
I think you can pretty much follow this example to see how to use ResourceChangingScheduler + TorchTrainer together.
And I don’t think you need to put scaling_config into param_space for it to work (see the above example).
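Roughly, the wiring looks like this. This is a sketch rather than the exact code from the linked example; the ASHA base scheduler, DistributeResources(add_bundles=True), the learning-rate search space, and num_samples=8 are just illustrative choices:

from ray import tune
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer
from ray.tune.schedulers import ASHAScheduler
from ray.tune.schedulers.resource_changing_scheduler import (
    DistributeResources,
    ResourceChangingScheduler,
)
from ray.tune.tune_config import TuneConfig
from ray.tune.tuner import Tuner


def train_loop_per_worker(config):
    # Placeholder loop; each session.report is a point where the
    # scheduler may decide to give this trial more workers.
    for step in range(10):
        session.report({"loss": 1.0 / (step + 1)})


trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=1, use_gpu=True),
)

# DistributeResources hands CPUs/GPUs freed by stopped trials to the
# trials still running; ASHA is the base scheduler doing the stopping.
scheduler = ResourceChangingScheduler(
    base_scheduler=ASHAScheduler(),
    resources_allocation_function=DistributeResources(add_bundles=True),
)

tuner = Tuner(
    trainer,
    param_space={"train_loop_config": {"lr": tune.loguniform(1e-4, 1e-1)}},
    tune_config=TuneConfig(
        metric="loss", mode="min", num_samples=8, scheduler=scheduler
    ),
)
results = tuner.fit()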
Can you confirm whether it is possible (from the underlying Ray framework) to change the GPU resources during the training run using the TorchTrainer and ResourceChangingScheduler?
Yes, that is what the ResourceChangingScheduler is supposed to do, with two caveats:
1. It needs to pause training by saving a checkpoint and then loading it in order to reallocate resources, which can add some overhead.
2. Because of 1, it cannot reallocate in the middle of a training iteration; in other words, it will reallocate only after a session.report call in your training function.
Ray in general cannot change the resources of a running task, which is why we need to checkpoint, stop the task, start it again with new resources and load the checkpoint again.
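As a rough illustration (placeholder model, optimizer, and metric), the session.report calls below are the only points where a reallocation can happen, and reporting a checkpoint alongside the metrics is what lets the trial be stopped and resumed with the new resources:

import torch
from ray.air import session
from ray.air.checkpoint import Checkpoint

def train_loop_per_worker(config):
    model = torch.nn.Linear(8, 1)  # placeholder model
    optimizer = torch.optim.SGD(model.parameters(), lr=config.get("lr", 1e-3))

    # Resume from the checkpoint if this worker set was restarted
    # after a resource change.
    start_epoch = 0
    ckpt = session.get_checkpoint()
    if ckpt:
        state = ckpt.to_dict()
        model.load_state_dict(state["model"])
        start_epoch = state["epoch"] + 1

    for epoch in range(start_epoch, 20):
        # ... one training iteration ...
        loss = float(epoch)  # placeholder metric
        # The scheduler can only reallocate at this point, between
        # iterations, never in the middle of one.
        session.report(
            {"loss": loss},
            checkpoint=Checkpoint.from_dict(
                {"model": model.state_dict(), "epoch": epoch}
            ),
        )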