How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I am running Ray AIR for PyTorch hyperparameter tuning on a SLURM cluster and I am getting the following error:
Trial TorchTrainer_e25ae_00000: Error processing event.
ray.exceptions.RayTaskError(SyntaxError): < no detail available >
I am not sure how to debug this issue since no detail is provided, and the way the SLURM cluster is set up means I cannot run the code interactively.
Any advice on how to approach debugging in this case?
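One thing I am considering, though I am not sure it is the right angle: train_func imports my own modules inside each trial, so a syntax error in one of those files might only surface on the worker as a RayTaskError. A rough sanity check, assuming the module layout in the script below and run from the project directory on the login node, would be to import each project module eagerly so that a broken file raises a full traceback with a file name and line number:

import importlib

# Hypothetical check, not part of my current script: import each project module directly
# so any SyntaxError/ImportError is reported with the offending file and line.
for mod in ["scripts.config",
            "scripts.models.dgcnn",
            "scripts.models.regressor",
            "scripts.utils.optimizers_and_lr_schedulers",
            "scripts.utils.train"]:
    importlib.import_module(mod)

I have not verified that this is the cause, so any other suggestions are welcome.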
My SLURM submission script and Python code are pasted below.
Thanks,
Harry
SLURM SUBMISSION SCRIPT:
#!/bin/bash
#SBATCH --nodes 1 # Request a minimum number of nodes to be allocated to this job.
#SBATCH --gres=gpu:2 # Request 2 GPU "generic resources".
#SBATCH --ntasks-per-node=1 # Request 1 task (process) per node.
#SBATCH --cpus-per-task=2
#SBATCH --mem=20000M # Buffer expected memory use by 15% due to unforeseen spikes in usage
#SBATCH --job-name="Raytune DDP"
#SBATCH --time=00:8:00 # Specify run time
#SBATCH --output=%N-%j.out # Output file name pattern (node name and job ID)
#SBATCH --mail-user=hseely@mail.ubc.ca # Request email notifications
#SBATCH --mail-type=ALL
# Load Python module and additional required modules
module load gcc/9.3.0 arrow python/3.10 scipy-stack
# Import pyarrow separately
python -c "import pyarrow"
# Load an existing environment
source $HOME/pytorch_py10/bin/activate
# Set environment variables
export NCCL_BLOCKING_WAIT=1 # Set this environment variable if you wish to use the NCCL backend for inter-GPU communication.
export MASTER_ADDR=$(hostname) # Store the master node's address in the MASTER_ADDR environment variable.
# Start a single-node Ray cluster before calling the Python script
ray start --head --node-ip-address="$MASTER_ADDR" --port=34567 --num-cpus=2 --num-gpus=2 --block &
# Wait 10 seconds for Ray setup
sleep 10
# Print (echo) info to output file
echo "r$SLURM_NODEID master: $MASTER_ADDR"
echo "r$SLURM_NODEID Launching raytune DDP script"
# Run Python script
python raytune_DDP.py
Python script:
# Code designed based on this tutorial: "Convert existing PyTorch code to Ray AIR" (Ray 2.2.0 docs)

# Modules
import torch
import os
import ray
from ray import train, air, tune
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchCheckpoint, TorchTrainer
from ray.train.torch.config import TorchConfig
from ray.tune import CLIReporter
from ray.tune.search.optuna import OptunaSearch
from ray.tune.tuner import Tuner, TuneConfig
from ray.tune.schedulers import ASHAScheduler
from datetime import datetime as dt
import numpy as np
#Import my scripts
from scripts.models.dgcnn import DGCNN
from scripts.models.regressor import Regressor
from scripts.utils.optimizers_and_lr_schedulers import get_optimizer, get_lr_scheduler
from scripts.utils.train import loss_fn, prepare_data
# ********************************* TRAINING *********************************
def train_epoch(cfg, train_loader, model, optimizer):
    # Put model in training mode
    model.train()
    for step, batch in enumerate(train_loader):
        # Extract xyz coordinates and additional features from batch dict and send to GPU(s)
        input = batch['points'].cuda()
        # Run point clouds through model
        pred = model(input).cuda()
        # Get target and send to correct device for batch parallel
        y = batch['target'].cuda()
        # Compute loss using custom loss function
        train_loss = loss_fn(pred, y, cfg)
        # Backpropagation
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()
# ******************************** VALIDATION ********************************
def val_epoch(cfg, val_loader, model):
    # Objects to store val loss
    sum_epoch_val_loss = 0
    # Check number of steps in validation loop
    num_batches = len(val_loader)
    # Do not compute gradient for validation section
    with torch.no_grad():
        model.eval()
        # Start validation loop
        for step, batch in enumerate(val_loader):
            # Extract xyz coordinates and additional features from batch dict and send to GPU(s)
            input = batch['points'].cuda()
            # Run point clouds through model
            pred = model(input).cuda()
            # Get target and send to correct device for batch parallel
            y = batch['target'].cuda()
            # Compute loss using custom loss function
            sum_epoch_val_loss += loss_fn(pred, y, cfg).type(torch.float).item()
    # Compute the epoch mean val loss
    mn_epoch_val_loss = sum_epoch_val_loss / num_batches
    return mn_epoch_val_loss
def train_func(trial_cfg: dict):
    # **************** NOTE: IMPORTING A GLOBAL CONFIG OBJECT THAT IS UPDATED WITHIN EACH TRIAL ****************
    from scripts.config import cfg

    # SET STATIC HPs/CONFIGURATIONS
    cfg['model_name'] = 'DGCNN'
    cfg['num_points'] = 1024
    cfg['num_augs'] = 1
    cfg['optimizer'] = "Adam"
    cfg['allowable_height_dif'] = 1
    cfg['batch_size'] = 8
    cfg['num_epochs'] = 100
    cfg['verbose'] = False

    # Update the global config params with the params selected for this trial
    cfg.update(trial_cfg)

    # Update batch size based on the number of workers being used for DDP training
    cfg['batch_size'] = cfg['batch_size'] // session.get_world_size()

    # Load model
    model = DGCNN(cfg, k=20)

    # Wrap model in regressor
    model = Regressor(model=model, num_outputs=4, use_proportions=cfg['use_proportions'])

    # Set up the model with the Ray backend
    model = train.torch.prepare_model(model)

    # Load datasets, apply augmentation to train data, set up samplers for DDP
    train_loader, _, val_loader, _ = prepare_data(cfg, ddp=False)

    # Set optimizer to be used in training
    optimizer = get_optimizer(cfg, model)

    # Set learning rate scheduler
    scheduler = get_lr_scheduler(optimizer, cfg, train_loader)

    # Switch on the cuDNN autotuner
    # Slows down the first iterations while cuDNN benchmarks convolution algorithms, then speeds training up
    if cfg['num_epochs'] > 5:
        # Had to adapt this line due to issues with Ray Tune; see https://github.com/ray-project/ray/issues/5947
        eval('setattr(torch.backends.cudnn, "benchmark", True)')

    # Create list to store epoch val losses
    epoch_val_loss_list = []

    # Loop through each epoch
    for epoch in range(0, cfg['num_epochs']):
        # Training and validation loops
        train_epoch(cfg, train_loader, model, optimizer)
        mn_epoch_val_loss = val_epoch(cfg, val_loader, model)

        # Store mean epoch val loss
        epoch_val_loss_list.append(mn_epoch_val_loss)

        # Report the lowest val loss observed so far
        if mn_epoch_val_loss <= np.min(epoch_val_loss_list):
            reported_val_loss = mn_epoch_val_loss
        else:
            reported_val_loss = np.min(epoch_val_loss_list)

        # At end of epoch, adjust the learning rate using the scheduler (if not None)
        if scheduler is not None:
            if cfg['lr_scheduler'] == "ReduceLROnPlateau":
                scheduler.step(mn_epoch_val_loss)
            else:
                scheduler.step()

        # Report epoch val loss
        session.report({'val_loss': reported_val_loss})
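
# (Aside, illustration only and not part of the script: the nested "train_loop_config" dict
#  defined in param_space inside main() below is what the TorchTrainer passes to train_func
#  as trial_cfg, so cfg.update(trial_cfg) is where the sampled hyperparameters get applied.
#  For example, if Tune sampled {"lr": 3e-4}, train_func would receive trial_cfg == {"lr": 3e-4},
#  cfg['lr'] would become 3e-4, and with num_workers=1 session.get_world_size() == 1, so
#  cfg['batch_size'] stays 8.)
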
def main(save_dir):
    # Get OS environment variables defined in the .sh submission script
    import os
    print(f"\n\n\n\n\n Current working directory: \n {os.getcwd()} \n\n\n\n\n")

    # Re-use the MASTER_ADDR variable created in the submission (.sh) script here.
    # Make sure the port number (34567 here) matches the one used when starting the ray cluster in the submission script
    addr = f"{os.environ['MASTER_ADDR']}:34567"
    ray.init(address=addr, _node_ip_address=os.environ['MASTER_ADDR'])

    # Get current time and date when the study began
    t_now = dt.now().strftime("%Y_%m_%d_%H_%M_%S")

    # SET HPs TO BE TUNED
    param_space = {"train_loop_config": {  # Note that this is a nested dict; it can hold different types of HPs
        "lr": tune.loguniform(1e-5, 1e-2),
    }}

    # Instantiate the reporter (prints progress and results)
    reporter = CLIReporter(max_progress_rows=10, max_column_length=10)
    reporter.add_metric_column("val_loss")

    # TODO: explore search algorithms/schedulers for Ray Tune
    # Define search algorithm
    search_alg = tune.search.basic_variant.BasicVariantGenerator()  # Random search
    # Alternative search algorithm:
    # search_alg = OptunaSearch()

    # Define trial scheduler
    asha_scheduler = ASHAScheduler(time_attr='training_iteration', max_t=300, grace_period=10,
                                   reduction_factor=3, brackets=1)
    # median_scheduler = tune.schedulers.MedianStoppingRule(time_attr='training_iteration', grace_period=5, min_samples_required=5, min_time_slice=15)

    # Specify trial early stopping (plateau) rule
    stopper = tune.stopper.TrialPlateauStopper(metric="val_loss", std=0.1, num_results=10, grace_period=10)

    # Configure trainer
    trainer = TorchTrainer(
        # Specify training loop
        train_loop_per_worker=train_func,
        # Set up DDP
        torch_config=TorchConfig(backend='nccl',      # Use the nccl backend on Linux; on Windows use gloo
                                 init_method='tcp'),  # Initiation method required for Compute Canada
        # Configure GPU resource use
        scaling_config=ScalingConfig(
            resources_per_worker={"GPU": 2},
            # Number of distributed workers.
            num_workers=1,
            # Turn on/off GPU.
            use_gpu=True,
            _max_cpu_fraction_per_node=0.8),
        run_config=air.RunConfig(
            # Save directories for checkpoints and experiment results
            name=f"Optuna_Search_w_ASHA_Scheduler_w_Plateau_Early_Stopping_{t_now}",
            local_dir=save_dir,
            # Verbosity and experiment printing to console
            verbose=1,
            progress_reporter=reporter,
            # Settings for checkpointing trials
            checkpoint_config=air.CheckpointConfig(num_to_keep=None,  # All checkpoints are saved
                                                   checkpoint_score_attribute="val_loss",
                                                   checkpoint_score_order="min"),
            stop=stopper
        ),
    )

    tuner = Tuner(
        trainer,
        param_space=param_space,
        tune_config=TuneConfig(num_samples=2,  # Use -1 to specify an unlimited number of samples
                               metric="val_loss",
                               mode='min',
                               search_alg=search_alg,
                               scheduler=asha_scheduler,
                               time_budget_s=60 * 4),  # Time budget in seconds (hours x 3600)
    )

    # Execute tuning.
    results = tuner.fit()

    # Fetch the best result and other study information
    best_result = results.get_best_result(metric="val_loss", mode="min", scope="all")  # Get best result object
    best_config = best_result.config                 # Get best trial's hyperparameters
    best_logdir = best_result.log_dir                # Get best trial's logdir
    best_checkpoint = best_result.checkpoint         # Get best trial's best checkpoint
    best_metrics = best_result.metrics               # Get best trial's last reported results
    best_result_df = best_result.metrics_dataframe   # Get best trial's results as a pandas DataFrame
    print("Best Result:", best_result)

    # Save best study as csv
    best_result_df.to_csv(os.path.join(save_dir, "best_trial_df.csv"))

    # Save the search algorithm state
    # search_alg.save(os.path.join(save_dir, "tuning_algorithm_saved_state.pkl"))
# Run main function
main(save_dir="scratch/run_dir/input/output/raytune")
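P.S. One more idea I have not tested yet: after tuner.fit() returns, each Result in the ResultGrid should carry the exception object for a failed trial, so printing those from inside main() might surface more detail than the console output does. A rough sketch (no guarantee it shows more than the original message):

# Hedged debugging sketch; would go inside main(), after results = tuner.fit()
for i in range(len(results)):
    r = results[i]
    if r.error is not None:
        print(r.log_dir, repr(r.error))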