Hi Justin,
I only posted a section of the original script; here is the complete version. Components 1 and 2 that you mentioned are included.
It follows the outline proposed by the Ray docs:
- Adding `ray.train.torch.accelerate()` with `amp=True` to the top of your training function.
- Wrapping your optimizer with `ray.train.torch.prepare_optimizer()`.
- Replacing your backward call with `ray.train.torch.backward()`.
(A minimal sketch of just these three changes follows the list.)
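For reference, here's a minimal sketch of just those three changes on a toy model. The model, data, and hyperparameters are placeholders, and it's meant to run as a `train_loop_per_worker` inside a `TorchTrainer`, like the full script below:

```python
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from ray import train
import ray.train.torch  # make sure the train.torch submodule is loaded

def train_func_sketch(config: dict):
    # (1) Turn on automatic mixed precision before building the model/optimizer
    train.torch.accelerate(amp=True)

    # Toy model and data, purely for illustration
    model = train.torch.prepare_model(torch.nn.Linear(10, 1))
    loader = DataLoader(TensorDataset(torch.randn(64, 10), torch.randn(64, 1)),
                        batch_size=8)
    # prepare_data_loader moves each batch to the worker's device automatically
    loader = train.torch.prepare_data_loader(loader)

    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    # (2) Wrap the optimizer so AMP gradient scaling is applied on step()
    optimizer = train.torch.prepare_optimizer(optimizer)

    for X, y in loader:
        loss = F.mse_loss(model(X), y)
        optimizer.zero_grad()
        # (3) Replace loss.backward() with Ray's backward
        train.torch.backward(loss)
        optimizer.step()
```

Now the full script: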
# Code adapted from this tutorial: https://docs.ray.io/en/latest/ray-air/examples/convert_existing_pytorch_code_to_ray_air.html
# Modules
import ray
from ray.train.torch import TorchTrainer
from ray.tune import CLIReporter
from ray import air, train
from ray.air.config import ScalingConfig
from ray.train.torch.config import TorchConfig
from ray.air import session
from ray.air.checkpoint import Checkpoint
import os
import numpy as np
import torch
from torch.utils.data import DataLoader
import torch.nn.functional as F
import pandas as pd
# Import my scripts
from scripts.utils.get_model import get_model
from scripts.utils.optimizers_and_lr_schedulers import get_optimizer, get_lr_scheduler
from scripts.utils.point_cloud_dataset import PointCloudsInFilesPreSampled
from scripts.utils.augmentation import augment_data
from scripts.utils.ocnn_custom_utils import CustomCollateBatch
# Global configuration
from scripts.config import cfg
def prepare_data(cfg, ddp=True):
# Load df that contains differences in height between ground measurement and lidar measurement
dif_df = pd.read_csv(cfg['lidar_plot_height_dif_filepath'])
# Filter df to plots that have an absolute height dif greater than tolerance specified in cfg
dif_df = dif_df[dif_df['abs_dif_z'] > cfg['allowable_height_dif']]
# Update plot IDs to exclude
cfg['excluded_plot_ids'] = dif_df['PlotID']
# Determine whether to use a batch collate function (required for octree/ocnn models)
if 'OCNN' in cfg['model_name']:
collate_fn = CustomCollateBatch(batch_size=cfg['batch_size'])
else:
collate_fn = None
# Set up train data sampler and loader
dataset_train = PointCloudsInFilesPreSampled(cfg, set='train', partition=cfg['partition'])
# Apply data augmentation to training dataset
dataset_train = augment_data(cfg, train_dataset=dataset_train, verbose=cfg['verbose'])
# Determine whether to use a sampler (only use when implementing DDP)
if ddp:
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
else:
train_sampler = None
train_loader = DataLoader(dataset_train, batch_size=cfg['batch_size'], shuffle=False,
num_workers=0, sampler=train_sampler, collate_fn=collate_fn,
drop_last=True,
pin_memory=True)
# Set up val data sampler and loader
dataset_val = PointCloudsInFilesPreSampled(cfg, set='val', partition=None)
# Determine whether to use a sampler (only use when implementing DDP)
if ddp:
val_sampler = torch.utils.data.distributed.DistributedSampler(dataset_val)
else:
val_sampler = None
val_loader = DataLoader(dataset_val, batch_size=cfg['batch_size'], shuffle=False,
num_workers=0, sampler=val_sampler, collate_fn=collate_fn,
drop_last=True,
pin_memory=True)
return train_loader, train_sampler, val_loader, val_sampler
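# NOTE: ray.train.torch.prepare_data_loader() is an alternative to the manual
# DistributedSampler setup above; it also moves each batch to the worker's device.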
def loss_fn(pred, y):
# Compute mse loss for each component
loss_bark = F.mse_loss(pred[:, 0], y[:, 0])
loss_branch = F.mse_loss(pred[:, 1], y[:, 1])
loss_foliage = F.mse_loss(pred[:, 2], y[:, 2])
loss_wood = F.mse_loss(pred[:, 3], y[:, 3])
    # Sum the component losses (equal weights) to get the total loss
loss = loss_bark + loss_branch + loss_foliage + loss_wood
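    # If you later want to weight each component by its share of total biomass,
    # an (untested) sketch would be:
    #   weights = y.sum(dim=0) / y.sum()
    #   loss = (weights * torch.stack([loss_bark, loss_branch, loss_foliage, loss_wood])).sum()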
return loss
def forward_pass(model, batch):
    # Send every tensor (or other device-movable object) in the batch dict to the GPU
    batch = {k: v.cuda() if hasattr(v, 'cuda') else v for k, v in batch.items()}
pred = model(batch)
return pred
# ********************************* TRAINING ********************************
def train_epoch(train_loader, model, optimizer):
sum_epoch_train_loss = 0
num_batches = len(train_loader)
# Put model in training mode
model.train()
for step, batch in enumerate(train_loader):
# Data enters model and prediction is made
pred = forward_pass(model, batch)
        # Get the target and send it to the correct device
y = batch['target'].cuda()
# Compute loss using custom loss function
train_loss = loss_fn(pred, y)
# Backpropagation
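        # With accelerate(amp=True), train.torch.backward() also applies the
        # AMP loss scaling before backpropagating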
optimizer.zero_grad()
train.torch.backward(train_loss)
optimizer.step()
        sum_epoch_train_loss += train_loss.item()
# Compute the epoch mean train loss
mn_epoch_train_loss = sum_epoch_train_loss / num_batches
return mn_epoch_train_loss
# ********************************* VALIDATION ********************************
def val_epoch(val_loader, model):
sum_epoch_val_loss = 0
num_batches = len(val_loader)
# Do not compute gradient for validation section
with torch.no_grad():
model.eval()
# Start validation loop
for step, batch in enumerate(val_loader):
# Data enters model and prediction is made
pred = forward_pass(model, batch)
            # Get the target and send it to the correct device
y = batch['target'].cuda()
# Compute loss using custom loss function
            sum_epoch_val_loss += loss_fn(pred, y).item()
# Compute the epoch mean val loss
mn_epoch_val_loss = sum_epoch_val_loss / num_batches
return mn_epoch_val_loss
def train_func(trial_cfg: dict):
train.torch.accelerate(amp=True)
# ****************NOTE: GLOBAL CONFIG OBJECT IS UPDATED WITHIN EACH TRIAL *******************
# Update the global config params with the params selected for this trial
cfg.update(trial_cfg)
# Update batch size based on number of GPUs being used for DDP training
cfg['batch_size'] = cfg['batch_size'] // session.get_world_size()
# Load model
model = get_model(cfg)
# Set up the model with ray backend
model = train.torch.prepare_model(model)
    # Load datasets and apply augmentation to the train data
    # (ddp=False: with a single Ray worker the DistributedSampler is skipped)
    train_loader, _, val_loader, _ = prepare_data(cfg, ddp=False)
# Set optimizer to be used in training
optimizer = get_optimizer(cfg, model)
optimizer = train.torch.prepare_optimizer(optimizer)
# Set learning rate scheduler
scheduler = get_lr_scheduler(optimizer, cfg, train_loader)
    # Switch on the cuDNN autotuner
    # Slows training initially while cuDNN benchmarks convolution algorithms, then speeds it up
if cfg['num_epochs'] > 5:
# Had to adapt this line due to issues with ray tune based on this github issue: https://github.com/ray-project/ray/issues/5947
eval('setattr(torch.backends.cudnn, "benchmark", True)')
# Create list to store epoch val losses
epoch_val_loss_list = []
# Loop through each epoch
    for epoch in range(cfg['num_epochs']):
# Training and validation loops
train_epoch(train_loader, model, optimizer)
mn_epoch_val_loss = val_epoch(val_loader, model)
        # Record the mean epoch val loss
        epoch_val_loss_list.append(mn_epoch_val_loss)
        # If the val loss has improved, update and save the model weights
if mn_epoch_val_loss <= np.min(epoch_val_loss_list):
reported_val_loss = mn_epoch_val_loss
state_dict = model.state_dict()
checkpoint = Checkpoint.from_dict(dict(epoch=epoch, model_weights=state_dict))
else:
reported_val_loss = np.min(epoch_val_loss_list)
checkpoint = None
# At end of epoch, adjust the learning rate using scheduler (if not none)
if scheduler is not None:
if cfg['lr_scheduler'] == "ReduceLROnPlateau":
scheduler.step(mn_epoch_val_loss)
else:
scheduler.step()
# Report epoch val loss
session.report(metrics={'val_loss': reported_val_loss}, checkpoint=checkpoint)
def get_trainer(train_func, reporter, study_name, save_dir):
# Configure trainer
trainer = TorchTrainer(
# Specify training loop
train_loop_per_worker=train_func,
# Set up DDP
        torch_config=TorchConfig(backend='nccl',  # Use the nccl backend on Linux; on Windows use gloo
                                 init_method='tcp'),  # Initialization method required for Compute Canada
# Configure GPU resource use
scaling_config=ScalingConfig(
resources_per_worker={"GPU": int(os.environ['RAY_NUM_GPU'])},
# Number of distributed workers.
num_workers=1,
# Turn on/off GPU.
use_gpu=True,
_max_cpu_fraction_per_node=0.8),
run_config=air.RunConfig(
# Save directories for checkpoints and experiment results
name=study_name,
local_dir=save_dir,
# Verbosity and experiment printing to console
verbose=1,
progress_reporter=reporter,
# Settings for checkpointing trials
checkpoint_config=air.CheckpointConfig(num_to_keep=1,
checkpoint_score_attribute="val_loss",
checkpoint_score_order="min", ),
stop=None
),
)
return trainer
def main(save_dir, study_name):
    # Reusing the MASTER_ADDR variable exported in the submission (.sh) script here.
    # Make sure the port number (34567 here) matches the one used when starting the Ray cluster in the submission script.
addr = f"{os.environ['MASTER_ADDR']}:34567"
ray.init(address=addr, _node_ip_address=os.environ['MASTER_ADDR'])
# Instantiate the reporter (prints progress and results)
reporter = CLIReporter(max_progress_rows=10, max_column_length=10)
reporter.add_metric_column("val_loss")
trainer = get_trainer(train_func, reporter, study_name, save_dir)
# Execute training.
results = trainer.fit()
print(results.metrics)
print(results)
# Run main function----------------------------------------------------------------------------------------------
if __name__ == "__main__":
# Get current time and date when study began from environment var
    t_now = os.environ['DATETIME']
save_dir = "output/raytune/ray_train_runs"
study_name = f"ray_train_run_{t_now}"
main(save_dir, study_name)
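And in case it's useful later, here's an untested sketch of how the saved weights could be pulled back out of the `Result` that `trainer.fit()` returns (it would go inside `main()`, after the `trainer.fit()` call, and just mirrors the `Checkpoint.from_dict(...)` structure used in `train_func`):

```python
# Untested sketch: restore weights from the checkpoint reported in train_func
best_ckpt = results.checkpoint  # last checkpoint reported via session.report()
if best_ckpt is not None:
    ckpt_dict = best_ckpt.to_dict()
    model = get_model(cfg)
    # If the weights were saved from a DDP-wrapped model, the keys may carry a
    # 'module.' prefix and would need to be stripped before loading
    model.load_state_dict(ckpt_dict["model_weights"])
    print(f"Restored weights from epoch {ckpt_dict['epoch']}")
```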