Hello everybody!
I hope you’re having a great day.
I’m running into some trouble with checkpoints when using Ray Tune for population-based training (PBT) under the SLURM workload manager. More specifically, once an experiment has finished, the trial returned by .get_best_trial() is sometimes not the best trial shown by the CLI reporter.
For example, out of 8 trials, the best one reached a loss of 0.41 on the dev dataset. But when I retrieve the best trial with .get_best_trial() in main.py, its dev loss is 0.63, which corresponds to the 2nd best trial as shown by the CLI reporter.
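One detail that may matter, if my understanding of the API is right: in the extract further below I ask for the best value over a trial’s whole history, while the CLI reporter table shows each trial’s most recently reported value, so the two views are not directly comparable:

# What I retrieve programmatically: the best val_loss each trial ever reported
best_trial = results.get_best_trial("val_loss", "min", scope="all")
# The CLI reporter table, by contrast, prints each trial's *last* reported val_loss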
Other times, when I try to get the checkpoint of the best trial, an error occurs: the checkpoint’s path can’t be found.
I guess this is because I use sync_config=tune.SyncConfig(syncer=None) while I have two nodes (one head node and one worker node). But inside SLURM, without this option, my code wouldn’t get past the first iteration.
Is there any way to configure the syncer inside SLURM to avoid these checkpointing problems?
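In case it helps to frame the question, these are the two setups I can imagine (a sketch of my understanding, not validated code; the bucket name is a made-up placeholder):

# Option A: what I do today; if I understand correctly, this is only safe
# when local_dir points at a filesystem shared by all nodes
sync_config = tune.SyncConfig(syncer=None)

# Option B: sync trial checkpoints through external storage instead
sync_config = tune.SyncConfig(upload_dir="s3://my-bucket/ray-results")  # placeholder bucket

Is option B the recommended route on a SLURM cluster without shared storage, or is there a syncer setting I’m missing?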
Thank you very much !
PS: I’m currently using the tune.run() API, which I know is soon to be deprecated.
To illustrate my problem a little better:
For launching my code inside SLURM, I use these SBATCH directives:
#SBATCH --job-name=dibiso
#SBATCH --nodes=2
#SBATCH --cpus-per-task=6
#SBATCH --gres=gpu:1
#SBATCH --exclusive
#SBATCH --ntasks-per-node=1
I then use the SLURM submission script from NERSC (slurm-ray-cluster/submit-ray-cluster.sbatch at master · NERSC/slurm-ray-cluster · GitHub)
Here’s how I initialize Ray inside main.py, using the ip_head and redis_password environment variables exported by the NERSC script:
ray.init(address=os.environ["ip_head"], _redis_password=os.environ["redis_password"])
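To rule out cluster-formation problems, I also print the cluster resources right after init (a debugging line I added, not part of the training logic):

print(ray.cluster_resources())  # should list the CPUs/GPUs of both nodes once they have joined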
Here’s an extract of my code inside main.py:
from functools import partial
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import PopulationBasedTraining

scheduler = PopulationBasedTraining(
    hyperparam_mutations=hyperparam_mutations,
    time_attr="training_iteration",
    metric="val_loss",
    mode="min",
    perturbation_interval=1,
)
reporter = CLIReporter(
    parameter_columns=parameters_to_display,
    metric_columns=["val_loss"],
    metric="val_loss",
    mode="min",
)
results = tune.run(
    partial(training_func, train_dataset=train_dataset, dev_dataset=dev_dataset),
    config=config,
    scheduler=scheduler,
    num_samples=2,
    progress_reporter=reporter,
    checkpoint_score_attr="training_iteration",
    keep_checkpoints_num=1,
    name=experience,
    checkpoint_at_end=True,
    local_dir="./ray_results",
    log_to_file=True,
    resources_per_trial={"cpu": args.cpu, "gpu": args.gpu},
    resume="AUTO",
    sync_config=tune.SyncConfig(syncer=None),
)
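Something else I wonder about while writing this down: with keep_checkpoints_num=1 and checkpoint_score_attr="training_iteration", only the most recent checkpoint of each trial is kept, so maybe the checkpoint that actually had the best val_loss has already been deleted by the time get_best_checkpoint(metric="val_loss") looks for it? That would be another possible explanation for the missing-path error.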
# Get best trial and its checkpoint
best_trial = results.get_best_trial("val_loss", "min", "all")
best_checkpoint = results.get_best_checkpoint(trial=best_trial, metric="val_loss", mode="min")
best_checkpoint_dict = best_checkpoint.to_dict()
# Evaluate on dev and test datasets
trainer = Trainer(args, train_dataset, dev_dataset, test_dataset)
trainer.model.load_state_dict(best_checkpoint_dict["model_state_dict"])
res_dev = evaluate(config, trainer, dev_dataset)
res_test = evaluate(config, trainer, test_dataset)
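When the missing-path error shows up, I list what actually exists under the results directory on the node running main.py (a debugging sketch I wrote for this post; experience is the same variable passed to name= above):

import os

exp_dir = os.path.join("./ray_results", experience)
for trial_name in sorted(os.listdir(exp_dir)):
    trial_dir = os.path.join(exp_dir, trial_name)
    if os.path.isdir(trial_dir):
        # Tune names checkpoint folders checkpoint_000000, checkpoint_000001, ...
        print(trial_name, [d for d in os.listdir(trial_dir) if d.startswith("checkpoint_")])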
And a simplified version of my training_func:
from ray.air import session
from ray.air.checkpoint import Checkpoint

def training_func(config, train_dataset=None, dev_dataset=None):
    # Initialize trainer
    trainer = Trainer(config, train_dataset, dev_dataset)
    # Initialize train_sampler, train_dataloader and optimizer
    ...
    global_step = 0
    tr_loss = 0.0
    trainer.model.zero_grad()
    # Restore state if PBT hands this trial a checkpoint (e.g. after exploit)
    loaded_checkpoint = session.get_checkpoint()
    if loaded_checkpoint:
        dict_checkpoint = loaded_checkpoint.to_dict()
        trainer.model.load_state_dict(dict_checkpoint["model_state_dict"])
        optimizer.load_state_dict(dict_checkpoint["optimizer_state_dict"])
    # Train!
    while True:  # "infinite" loop: the PBT scheduler decides when to perturb or stop
        epoch_iterator = tqdm(train_dataloader, desc="Training iteration", disable=True)
        for step, batch in enumerate(epoch_iterator):
            # Code to train model
            ...
        # Evaluation
        val_loss_dev = evaluate(config, trainer, dev_dataset)
        # Checkpointing and session.report for PBT
        checkpoint = Checkpoint.from_dict(
            {
                "model_state_dict": trainer.model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
            }
        )
        session.report(
            {"val_loss": val_loss_dev},
            checkpoint=checkpoint,
        )
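For completeness, my understanding is that each session.report(..., checkpoint=...) call writes the checkpoint to the trial’s directory on whichever node that trial happens to run on, which is why I suspect the two-node setup and the disabled syncer are at the heart of both problems.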