Running Ray AIR for PyTorch hyperparameter tuning on a SLURM cluster

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am running Ray AIR for PyTorch hyperparameter tuning on a SLURM cluster and I am getting the following error:

Trial TorchTrainer_e25ae_00000: Error processing event.
ray.exceptions.RayTaskError(SyntaxError): < no detail available >

I am not sure how to debug this issue since no detail is provided. Because of how the SLURM cluster works, I cannot run the code interactively.

Any advice on how to approach debugging in this case?

The SLURM submission script and Python code are pasted below.

Thanks,

Harry

SLURM SUBMISSION SCRIPT:

#!/bin/bash
#SBATCH --nodes 1 # Request a minimum number of nodes to be allocated to this job.
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources".
#SBATCH --ntasks-per-node=1 # Request 1 process per node.
#SBATCH --cpus-per-task=2
#SBATCH --mem=20000M # Buffer expected memory use by 15% due to unforeseen spikes in usage
#SBATCH --job-name="Raytune DDP"
#SBATCH --time=00:8:00 # Specify run time
#SBATCH --output=%N-%j.out # Specify the output file name (%N = node name, %j = job ID)
#SBATCH --mail-user=hseely@mail.ubc.ca # Request email notifications
#SBATCH --mail-type=ALL

# Load python module, and additional required modules

module load gcc/9.3.0 arrow python/3.10 scipy-stack

# Import pyarrow separately
python -c "import pyarrow"

# Load an existing environment

source $HOME/pytorch_py10/bin/activate

# Set environment variables

export NCCL_BLOCKING_WAIT=1 #Set this environment variable if you wish to use the NCCL backend for inter-GPU communication.
export MASTER_ADDR=$(hostname) #Store the master node’s IP address in the MASTER_ADDR environment variable.

# Start a single node ray cluster before calling the Python script

ray start --head --node-ip-address="$MASTER_ADDR" --port=34567 --num-cpus=2 --num-gpus=2 --block &

# Wait 10 seconds for ray setup

sleep 10

#Print (echo) info to output file
echo "r$SLURM_NODEID master: $MASTER_ADDR"
echo "r$SLURM_NODEID Launching raytune DDP script"

#Run python script
python raytune_DDP.py

Python script:

# Code designed based on this tutorial: Convert existing PyTorch code to Ray AIR — Ray 2.2.0

# Modules

import torch
import os

import ray
from ray import train, air, tune
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchCheckpoint, TorchTrainer
from ray.train.torch.config import TorchConfig
from ray.tune import CLIReporter
from ray.tune.search.optuna import OptunaSearch
from ray.tune.tuner import Tuner, TuneConfig
from ray.tune.schedulers import ASHAScheduler
from datetime import datetime as dt
import numpy as np

#Import my scripts
from scripts.models.dgcnn import DGCNN
from scripts.models.regressor import Regressor
from scripts.utils.optimizers_and_lr_schedulers import get_optimizer, get_lr_scheduler
from scripts.utils.train import loss_fn, prepare_data

# ********************************* TRAINING ********************************

def train_epoch(cfg, train_loader, model, optimizer):

    # Put model in training mode
    model.train()

    for step, batch in enumerate(train_loader):

        # Extract xyz coordinates and additional features from batch dict and send to GPU(s)
        input = batch['points'].cuda()
        # Run point clouds through model
        pred = model(input).cuda()

        # Get target and send to correct device for batch parallel
        y = batch['target'].cuda()

        # Compute loss using custom loss function
        train_loss = loss_fn(pred, y, cfg)

        # Backpropagation
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()

# ********************************* VALIDATION ********************************

def val_epoch(cfg, val_loader, model):

    # Objects to store val loss
    sum_epoch_val_loss = 0

    # Check number of steps in validation loop
    num_batches = len(val_loader)

    # Do not compute gradient for validation section
    with torch.no_grad():
        model.eval()
        # Start validation loop
        for step, batch in enumerate(val_loader):

            # Extract xyz coordinates and additional features from batch dict and send to GPU(s)
            input = batch['points'].cuda()

            # Run point clouds through model
            pred = model(input).cuda()

            # Get target and send to correct device for batch parallel
            y = batch['target'].cuda()

            # Compute loss using custom loss function
            sum_epoch_val_loss += loss_fn(pred, y, cfg).type(torch.float).item()

        # Compute the epoch mean val loss
        mn_epoch_val_loss = sum_epoch_val_loss / num_batches

    return mn_epoch_val_loss

def train_func(trial_cfg: dict):

    # ****************NOTE: IMPORTING A GLOBAL CONFIG OBJECT THAT IS UPDATED WITHIN EACH TRIAL *******************
    from scripts.config import cfg

    # SET STATIC HPs/Configurations
    cfg['model_name'] = 'DGCNN'
    cfg['num_points'] = 1024
    cfg['num_augs'] = 1
    cfg['optimizer'] = "Adam"
    cfg['allowable_height_dif'] = 1
    cfg['batch_size'] = 8
    cfg['num_epochs'] = 100
    cfg['verbose'] = False

    # Update the global config params with the params selected for this trial
    cfg.update(trial_cfg)

    # Update batch size based on number of GPUs being used for DDP training
    cfg['batch_size'] = cfg['batch_size'] // session.get_world_size()

    # Load model
    model = DGCNN(cfg, k=20)

    # Wrap model in regressor
    model = Regressor(model=model, num_outputs=4, use_proportions=cfg['use_proportions'])

    # Set up the model with ray backend
    model = train.torch.prepare_model(model)

    # Load datasets, apply augmentation to train data, set up samplers for DDP
    train_loader, _, val_loader, _ = prepare_data(cfg, ddp=False)

    # Set optimizer to be used in training
    optimizer = get_optimizer(cfg, model)

    # Set learning rate scheduler
    scheduler = get_lr_scheduler(optimizer, cfg, train_loader)

    # Switch on the cuDNN Autotuner
    # Slows down training initially, but then speeds it up as CUDA has to find the best way to implement convolutions
    if cfg['num_epochs'] > 5:
        # Had to adapt this line due to issues with ray tune based on this github issue: https://github.com/ray-project/ray/issues/5947
        eval('setattr(torch.backends.cudnn, "benchmark", True)')

    # Create list to store epoch val losses
    epoch_val_loss_list = []

    # Loop through each epoch
    for epoch in range(0, cfg['num_epochs']):

        # Training and validation loops
        train_epoch(cfg, train_loader, model, optimizer)
        mn_epoch_val_loss = val_epoch(cfg, val_loader, model)

        # Update mean epoch val loss
        epoch_val_loss_list.append(mn_epoch_val_loss)

        # Update metrics if they have improved
        if mn_epoch_val_loss <= np.min(epoch_val_loss_list):
            reported_val_loss = mn_epoch_val_loss
        else:
            reported_val_loss = np.min(epoch_val_loss_list)

        # At end of epoch, adjust the learning rate using scheduler (if not none)
        if scheduler is not None:
            if cfg['lr_scheduler'] == "ReduceLROnPlateau":
                scheduler.step(mn_epoch_val_loss)
            else:
                scheduler.step()

        # Report epoch val loss
        session.report({'val_loss': reported_val_loss})

def main(save_dir):

    # Get os variables defined in sh submission script
    import os

    print(f"\n\n\n\n\n Current working directory: \n {os.getcwd()} \n\n\n\n\n")

    # Re-using the variable you created in your submission (.sh) script here.
    # Make sure the port number (34567 here) is the same as in the submission script when starting the ray cluster
    addr = f"{os.environ['MASTER_ADDR']}:34567"
    ray.init(address=addr, _node_ip_address=os.environ['MASTER_ADDR'])

    # Get current time and date when study began
    t_now = dt.now().strftime("%Y_%m_%d_%H_%M_%S")

    # SET HPs TO BE TUNED
    param_space = {"train_loop_config": {  # Note that this is a nested dict, can have different types of HPs

        "lr": tune.loguniform(1e-5, 1e-2),

    }}

    # Instantiate the reporter (prints progress and results)
    reporter = CLIReporter(max_progress_rows=10, max_column_length=10)
    reporter.add_metric_column("val_loss")

    # TODO: explore search algorithms/schedulers for ray tune
    # Define trial search algorithm
    search_alg = tune.search.basic_variant.BasicVariantGenerator()  # Random search
    # search_alg = OptunaSearch()

    # Define trial scheduler
    asha_scheduler = ASHAScheduler(time_attr='training_iteration', max_t=300, grace_period=10, reduction_factor=3,
                                   brackets=1)
    # median_scheduler = tune.schedulers.MedianStoppingRule(time_attr='training_iteration', grace_period=5, min_samples_required=5, min_time_slice=15)

    # Specify trial early stopping function in ray
    stopper = tune.stopper.TrialPlateauStopper(metric="val_loss", std=0.1, num_results=10, grace_period=10)

    # Configure trainer
    trainer = TorchTrainer(

        # Specify training loop
        train_loop_per_worker=train_func,
        # Set up DDP
        torch_config=TorchConfig(backend='nccl',  # Use nccl backend because running on linux, on windows use gloo
                                 init_method='tcp'),  # Initialization method required for Compute Canada
        # Configure GPU resource use
        scaling_config=ScalingConfig(
            resources_per_worker={"GPU": 2},
            # Number of distributed workers.
            num_workers=1,
            # Turn on/off GPU.
            use_gpu=True,
            _max_cpu_fraction_per_node=0.8),

        run_config=air.RunConfig(
            # Save directories for checkpoints and experiment results
            name=f"Optuna_Search_w_ASHA_Scheduler_w_Plateau_Early_Stopping_{t_now}",
            local_dir=save_dir,
            # Verbosity and experiment printing to console
            verbose=1,
            progress_reporter=reporter,
            # Settings for checkpointing trials
            checkpoint_config=air.CheckpointConfig(num_to_keep=None,  # All checkpoints are saved
                                                   checkpoint_score_attribute="val_loss",
                                                   checkpoint_score_order="min"),
            stop=stopper

        ),
    )

    tuner = Tuner(
        trainer,
        param_space=param_space,
        tune_config=TuneConfig(num_samples=2,  # use -1 to specify unlimited number of samples
                               metric="val_loss",
                               mode='min',
                               search_alg=search_alg,
                               scheduler=asha_scheduler,
                               time_budget_s=60*4),  # Time budget in seconds (here 4 minutes)

    )

    # Execute tuning.
    results = tuner.fit()

    # Fetch the best result and other study information
    best_result = results.get_best_result(metric="val_loss", mode="min", scope="all")  # Get best result object
    best_config = best_result.config  # Get best trial's hyperparameters
    best_logdir = best_result.log_dir  # Get best trial's logdir
    best_checkpoint = best_result.checkpoint  # Get best trial's best checkpoint
    best_metrics = best_result.metrics  # Get best trial's last results
    best_result_df = best_result.metrics_dataframe  # Get best result as a pandas DataFrame
    print("Best Result:", best_result)

    # Save best study as csv
    best_result_df.to_csv(os.path.join(save_dir, "best_trial_df.csv"))

    # Save the search algorithm state
    # search_alg.save(os.path.join(save_dir, "tuning_algorithm_saved_state.pkl"))


# Run main function
main(save_dir='scratch/run_dir/input/output/raytune')

Hi,
It's unfortunate that there isn't enough detail in the error message to work with. However, I do notice that it's saying "SyntaxError". Could you run the same script on just a single node with a single process, to make sure the training function runs OK?
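For example, a minimal sketch along the lines below runs the same train_func through a local, single-worker TorchTrainer, so any exception in it (or in the modules it imports) surfaces with a full Python traceback. It assumes the main(...) call at the bottom of raytune_DDP.py is commented out or guarded by if __name__ == "__main__" so that importing the module does not launch tuning, and the fixed lr value is just a placeholder:

import ray
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer

# Assumes importing raytune_DDP does not itself call main(...)
from raytune_DDP import train_func

# Local, single-node Ray instance; no SLURM-wide cluster setup required
ray.init()

trainer = TorchTrainer(
    train_loop_per_worker=train_func,
    # One fixed hyperparameter set instead of a Tuner search space (placeholder value)
    train_loop_config={"lr": 1e-3},
    # Single worker with a GPU, so errors in train_func show a full traceback
    scaling_config=ScalingConfig(num_workers=1, use_gpu=True),
)
result = trainer.fit()
print(result.metrics)

Alternatively, the per-trial directories that Tune writes under your RunConfig local_dir usually contain an error file with the full stack trace of a failed trial, which may be quicker than rerunning anything.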


Hi, thanks for the advice!

I was able to figure out what the issue was once I ran the training loop on a single node as you suggested.

It was quite an easy solution in the end; I am just not used to this style of debugging.

Thanks for the help!
