Ray Tune x SLURM - Problem with checkpoints

Hello everybody!

I hope you’re having a great day.

I’m running into trouble with checkpoints when using Ray Tune for Population Based Training under the SLURM workload manager. More specifically, once an experiment has finished, the trial retrieved by .get_best_trial() is sometimes not the best trial shown by the CLI reporter.

For example, in a run with 8 trials, the best trial had a loss of 0.41 on the dev dataset. But when I retrieve this best trial with .get_best_trial() in main.py, its dev loss is 0.63, which corresponds to the 2nd best trial as shown by the CLI reporter.
Other times, when I try to get the checkpoint of the best trial, an error occurs: the checkpoint’s path can’t be found.

I suspect this is because I use sync_config=tune.SyncConfig(syncer=None) while running on two nodes (one head node and one worker node). But under SLURM, without this option, my code wouldn’t run past one iteration.
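
To make that suspicion concrete, here is a minimal sketch (not taken from my actual script) of the difference between the default sync behavior and what I currently pass:

    from ray import tune

    # Default: each trial's directory (results + checkpoints) on a worker
    # node is periodically synced back to the head node's local_dir.
    sync_to_head = tune.SyncConfig()  # syncer defaults to "auto"

    # What I currently use: node-to-node syncing is disabled, so checkpoints
    # written on the worker node never reach the head node.
    no_sync = tune.SyncConfig(syncer=None)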

Is there any way to configure the syncer under SLURM to avoid these checkpointing problems?

Thank you very much!

PS: I’m currently using the tune.run() API, which I know is soon to be deprecated.


To illustrate my problem a little better:

To launch my code under SLURM, I use these SBATCH directives:

#SBATCH --job-name=dibiso
#SBATCH --nodes=2 
#SBATCH --cpus-per-task=6
#SBATCH --gres=gpu:1
#SBATCH --exclusive
#SBATCH --ntasks-per-node=1

I then use the SLURM submission script from NERSC (slurm-ray-cluster/submit-ray-cluster.sbatch at master · NERSC/slurm-ray-cluster · GitHub)

Here’s how I initialize ray inside main.py:

    ray.init(
        address="auto",  # assumed: connect to the cluster started by the submission script
        _redis_password=os.environ["redis_password"],
    )

Here’s an extract of my code inside main.py:

         scheduler = PopulationBasedTraining(
             hyperparam_mutations=hyperparam_mutations,
             time_attr="training_iteration",
             metric="val_loss",
             mode="min",
             perturbation_interval=1,
         )
 
         reporter = CLIReporter(
             parameter_columns=parameters_to_display,
             metric_columns=["val_loss"],
             metric="val_loss",
             mode="min"
         )
 
         results = tune.run(
             partial(training_func, train_dataset=train_dataset, dev_dataset=dev_dataset),
             config=config,
             scheduler=scheduler,
             num_samples=2,
             progress_reporter=reporter,
             checkpoint_score_attr="training_iteration",
             keep_checkpoints_num=1,
             name=experience,
             checkpoint_at_end=True,
             local_dir="./ray_results",
             log_to_file=True, 
             resources_per_trial={"cpu": args.cpu, "gpu": args.gpu},
             resume="AUTO", 
             sync_config=tune.SyncConfig(syncer=None),
         )

       # Get best trial and its checkpoint
        best_trial = results.get_best_trial("val_loss", "min", "all")
        best_checkpoint = results.get_best_checkpoint(trial=best_trial, metric="val_loss", mode="min")
        best_checkpoint = best_checkpoint.to_dict()

        # Evaluate on dev and test datasets
        trainer = Trainer(args, train_dataset, dev_dataset, test_dataset)
        trainer.model.load_state_dict(best_checkpoint["model_state_dict"])
        res_dev = evaluate(config, trainer, dev_dataset)
        res_test = evaluate(config, trainer, test_dataset)
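
For context, hyperparam_mutations (passed to the scheduler above) is just a dict mapping the hyperparameters that PBT is allowed to perturb to search spaces. A purely illustrative example, not my actual values:

    from ray import tune

    # Illustrative only: the real hyperparameters and ranges in my config differ.
    hyperparam_mutations = {
        "learning_rate": tune.loguniform(1e-5, 1e-3),
        "batch_size": [8, 16, 32],
    }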

And a simplified version of my training_func:


    # Initialize trainer
    trainer = Trainer(config, train_dataset, dev_dataset)

    # Initialize train_sampler, train_dataloader and optimizer
    ...

    global_step = 0
    tr_loss = 0.0
    trainer.model.zero_grad()

    loaded_checkpoint = session.get_checkpoint()
    if loaded_checkpoint:
        dict_checkpoint = loaded_checkpoint.to_dict()
        trainer.model.load_state_dict(dict_checkpoint["model_state_dict"])
        optimizer.load_state_dict(dict_checkpoint["optimizer_state_dict"])

    # Train!
    while True: #infinity loop for PBT training
        epoch_iterator = tqdm(train_dataloader, desc="Training iteration", disable=True)
        for step, batch in enumerate(epoch_iterator):
            # Code to train model
            ...
        # Evaluation 
        val_loss_dev = evaluate(config, trainer, dev_dataset)
        # Checkpointing and session.report for PBT
        checkpoint = Checkpoint.from_dict({
            "model_state_dict": trainer.model.state_dict(),
            "optimizer_state_dict": optimizer.state_dict(),
        })
        session.report(
            {"val_loss": val_loss_dev},
            checkpoint=checkpoint,
        )

Hi @Alana, this user guide will help you out. It gives 3 options for configuring storage with Tune (head node local filesystem, cloud bucket, or NFS): How to Configure Storage Options for a Distributed Tune Experiment? — Ray 2.3.0.
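
In rough terms (exact arguments depend on your Ray version, and the bucket name and paths below are only placeholders), the three setups look like this:

    from ray import air, tune

    # 1) No external storage: worker nodes sync their trial directories
    #    back to the head node (the default behavior).
    head_node_storage = tune.SyncConfig()  # syncer="auto" by default

    # 2) Cloud storage: every node uploads its trial directories to a bucket.
    cloud_storage = tune.SyncConfig(upload_dir="s3://my-bucket/ray-results")

    # 3) Shared filesystem (NFS) mounted at the same path on all nodes:
    #    write results there directly and turn node-to-node syncing off.
    nfs_run_config = air.RunConfig(
        local_dir="/mnt/shared/ray_results",
        sync_config=tune.SyncConfig(syncer=None),
    )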

Let me know if you have any questions!


Thanks @justinvyu, I’ll try the "Configure Tune without external persistent storage" option and tell you how it goes =)


Sorry for the late answer.

So I finally got something working, thanks to a GitHub issue (but I don’t remember which one, sorry). My problem was probably due to a syncing issue, combined with the fact that I only kept a limited number of checkpoints per trial: sometimes a checkpoint was deleted on a worker, but that information never made it back to the head node. I switched to the new Tuner API, re-enabled syncing between nodes, and set the checkpoint config’s num_to_keep to None.

Here’s what works for me with the new API, in case it helps anyone :slight_smile: :

       # Population Based Training scheduler
        scheduler = PopulationBasedTraining(
            hyperparam_mutations=hyperparam_mutations,
            time_attr="training_iteration",
            metric="val_loss",
            mode="min",
            perturbation_interval=args.perturb_interval,
        )

        # Reporter for log
        reporter = CLIReporter(
            parameter_columns=parameters_to_display,
            metric_columns=["val_loss", "training_iteration"],
            metric="val_loss",
            mode="min"
        )

        ### NEW Ray Tune API
        tuner = tune.Tuner(
            tune.with_resources(
                partial(train_joint, args=args, train_dataset=train_dataset, dev_dataset=dev_dataset),
                {"cpu": args.cpu, "gpu": args.gpu},
            ),
            param_space=config,
            run_config=air.RunConfig(
                name=my_experiment,
                local_dir="./ray_results",
                log_to_file=True,
                checkpoint_config=air.CheckpointConfig(
                    checkpoint_score_attribute="val_loss",
                    checkpoint_at_end=True,
                    num_to_keep=None,
                ),
                progress_reporter=reporter,
            ),
            tune_config=tune.TuneConfig(
                scheduler=scheduler,
                num_samples=args.num_samples,
            ),
        )

        results = tuner.fit()

        best_result = results.get_best_result(metric="val_loss", mode="min")
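
The best checkpoint can then be loaded in roughly the same way as with the old API (a sketch adapted from my earlier tune.run() code, reusing the same state-dict keys):

    # Retrieve the best checkpoint and restore the model weights
    best_checkpoint = best_result.checkpoint.to_dict()

    trainer = Trainer(args, train_dataset, dev_dataset, test_dataset)
    trainer.model.load_state_dict(best_checkpoint["model_state_dict"])
    res_dev = evaluate(config, trainer, dev_dataset)
    res_test = evaluate(config, trainer, test_dataset)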

I may need a little more testing to be 100% sure that it’s working as intended, but right now it looks promising.

@Alana Are we sorted on this problem, or are we still having issues?
cc @justinvyu

I still need a little more testing to be 100% sure, but in my short tests I haven’t run into any unexpected checkpointing errors.