Data Type Issues when using AMP with Ray Train

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am running ray == 2.3.0

I am trying to add automatic mixed precision (AMP) training to my code following this tutorial (link).

Here is the code for one epoch training loop:

def train_epoch(cfg, train_loader, model, optimizer):

    # Put model in training mode
    model.train()

    for step, batch in enumerate(train_loader):

        optimizer.zero_grad()

        # Get target and send to correct device for batch parallel
        y = batch['target'].cuda()

        # Data enters model and prediction is made
        pred = forward_pass(model, batch, rank=y.device)
        assert pred.dtype is torch.float16

        # Compute loss using custom loss function
        train_loss = loss_fn(pred, y, cfg)

        # Normalize the Gradients
        train_loss = train_loss / cfg['n_accumulation_steps']

        train.torch.backward(train_loss)


        if ((step + 1) % cfg['n_accumulation_steps'] == 0) or (step + 1 == len(train_loader)):
            # Update Optimizer
            optimizer.step()

Running this results in the following error:

  File "C:\Users\hseely\Miniconda3\envs\pytorch\lib\site-packages\ray\_private\function_manager.py", line 674, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\Users\hseely\Miniconda3\envs\pytorch\lib\site-packages\ray\util\tracing\tracing_helper.py", line 466, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\Users\hseely\Miniconda3\envs\pytorch\lib\site-packages\ray\train\_internal\worker_group.py", line 31, in __execute
    raise skipped from exception_cause(skipped)
  File "C:\Users\hseely\Miniconda3\envs\pytorch\lib\site-packages\ray\train\_internal\utils.py", line 129, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "D:\Sync\DL_Development\Scripts\DL_Biomass\Pytorch\raytune_DDP.py", line 150, in train_func
    train_epoch(cfg, train_loader, model, optimizer)
  File "D:\Sync\DL_Development\Scripts\DL_Biomass\Pytorch\raytune_DDP.py", line 56, in train_epoch
    train.torch.backward(train_loss)
  File "C:\Users\hseely\Miniconda3\envs\pytorch\lib\site-packages\ray\train\torch\train_loop_utils.py", line 165, in backward
    get_accelerator(_TorchAccelerator).backward(tensor)
  File "C:\Users\hseely\Miniconda3\envs\pytorch\lib\site-packages\ray\train\torch\train_loop_utils.py", line 507, in backward
    self.scaler.scale(tensor).backward()
  File "C:\Users\hseely\Miniconda3\envs\pytorch\lib\site-packages\torch\_tensor.py", line 487, in backward
    torch.autograd.backward(
  File "C:\Users\hseely\Miniconda3\envs\pytorch\lib\site-packages\torch\autograd\__init__.py", line 197, in backward
    Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass
RuntimeError: Found dtype Float but expected Half

Interestingly, I am able to run AMP outside of Ray in base PyTorch with the following code:

for step, batch in enumerate(train_loader):

    optimizer.zero_grad()

    # Runs the forward pass under autocast.
    with torch.autocast(device_type='cuda', dtype=torch.float16, enabled=cfg['use_amp']):

        # Data enters model and prediction is made
        pred = forward_pass(model, batch, rank)

        if cfg['use_amp']:
            # output is float16 because linear layers autocast to float16.
            assert pred.dtype is torch.float16

        # Get target and send to correct device for batch parallel
        y = batch['target'].to(rank)

        # Compute loss using custom loss function
        train_loss = loss_fn(pred, y, cfg)

        # loss is float32 because mse_loss layers autocast to float32.
        assert train_loss.dtype is torch.float32

    # Normalize the Gradients
    train_loss = train_loss / cfg['n_accumulation_steps']
    # Backwards propagation
    scaler.scale(train_loss).backward()

    if ((step + 1) % cfg['n_accumulation_steps'] == 0) or (step + 1 == len(train_loader)):

        # Implement optimizer using the gradient scaler
        scaler.step(optimizer)

        # Updates the scale for next iteration.
        scaler.update()

Because AMP works with the same data/model in base PyTorch, I am wondering whether a Ray issue is causing the data type mismatch?

Please let me know if you have any ideas what might be causing this!

Thanks,

Harry

Hi @Harry_Seely,

Did you do these two steps? They look like they're missing from your code (rough sketch of where they go below the list).

  1. Adding ray.train.torch.accelerate() with amp=True to the top of your training function.
  2. Wrapping your optimizer with ray.train.torch.prepare_optimizer().
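
For reference, here's a rough sketch of where those calls sit inside the training function. The model, optimizer, and data below are just placeholders, and this assumes it runs as the train_loop_per_worker of a TorchTrainer on a GPU worker:

import torch
import torch.nn as nn
import ray.train.torch  # makes train.torch.* available
from ray import train


def train_func(config):
    # Step 1: enable AMP; call this before the other train.torch utilities.
    train.torch.accelerate(amp=True)

    # Placeholder model; swap in your own.
    model = train.torch.prepare_model(nn.Linear(10, 1))
    # Step 2: wrap the optimizer so it cooperates with the AMP gradient scaler.
    optimizer = train.torch.prepare_optimizer(
        torch.optim.SGD(model.parameters(), lr=1e-3)
    )

    device = train.torch.get_device()
    for _ in range(10):  # placeholder data
        x = torch.randn(8, 10, device=device)
        y = torch.randn(8, 1, device=device)
        optimizer.zero_grad()
        loss = ((model(x) - y) ** 2).mean()  # simple MSE, kept dtype-agnostic
        # Step 3: replace loss.backward() with the Ray Train version.
        train.torch.backward(loss)
        optimizer.step()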

Let me know if this works, thanks!

Hi Justin,

I only posted a section of the original script; here is the complete script. Both of the components you mentioned (1 and 2) were included.

It follows the outline proposed by Ray:

  1. Adding ray.train.torch.accelerate() with amp=True to the top of your training function.
  2. Wrapping your optimizer with ray.train.torch.prepare_optimizer().
  3. Replacing your backward call with ray.train.torch.backward().

# Code designed based on this tutorial: https://docs.ray.io/en/latest/ray-air/examples/convert_existing_pytorch_code_to_ray_air.html

# Modules
import ray
from ray.train.torch import TorchTrainer
from ray.tune import CLIReporter
from ray import air, train
from ray.air.config import ScalingConfig
from ray.train.torch.config import TorchConfig
from ray.air import session
from ray.air.checkpoint import Checkpoint
import os
import numpy as np
import torch
from torch.utils.data import DataLoader
import torch.nn.functional as F
import pandas as pd

# Import my scripts
from scripts.utils.get_model import get_model
from scripts.utils.optimizers_and_lr_schedulers import get_optimizer, get_lr_scheduler
from scripts.utils.point_cloud_dataset import PointCloudsInFilesPreSampled
from scripts.utils.augmentation import augment_data
from scripts.utils.ocnn_custom_utils import CustomCollateBatch

# Global configuration
from scripts.config import cfg


def prepare_data(cfg, ddp=True):
    # Load df that contains differences in height between ground measurement and lidar measurement
    dif_df = pd.read_csv(cfg['lidar_plot_height_dif_filepath'])

    # Filter df to plots that have an absolute height dif greater than tolerance specified in cfg
    dif_df = dif_df[dif_df['abs_dif_z'] > cfg['allowable_height_dif']]

    # Update plot IDs to exclude
    cfg['excluded_plot_ids'] = dif_df['PlotID']

    # Determine whether to use a batch collate function (required for octree/ocnn models)
    if 'OCNN' in cfg['model_name']:
        collate_fn = CustomCollateBatch(batch_size=cfg['batch_size'])
    else:
        collate_fn = None

    # Set up train data sampler and loader
    dataset_train = PointCloudsInFilesPreSampled(cfg, set='train', partition=cfg['partition'])

    # Apply data augmentation to training dataset
    dataset_train = augment_data(cfg, train_dataset=dataset_train, verbose=cfg['verbose'])

    # Determine whether to use a sampler (only use when implementing DDP)
    if ddp:
        train_sampler = torch.utils.data.distributed.DistributedSampler(dataset_train)
    else:
        train_sampler = None
    train_loader = DataLoader(dataset_train, batch_size=cfg['batch_size'], shuffle=False,
                              num_workers=0, sampler=train_sampler, collate_fn=collate_fn,
                              drop_last=True,
                              pin_memory=True)

    # Set up val data sampler and loader
    dataset_val = PointCloudsInFilesPreSampled(cfg, set='val', partition=None)
    # Determine whether to use a sampler (only use when implementing DDP)
    if ddp:
        val_sampler = torch.utils.data.distributed.DistributedSampler(dataset_val)
    else:
        val_sampler = None
    val_loader = DataLoader(dataset_val, batch_size=cfg['batch_size'], shuffle=False,
                            num_workers=0, sampler=val_sampler, collate_fn=collate_fn,
                            drop_last=True,
                            pin_memory=True)

    return train_loader, train_sampler, val_loader, val_sampler


def loss_fn(pred, y):
    # Compute mse loss for each component
    loss_bark = F.mse_loss(pred[:, 0], y[:, 0])
    loss_branch = F.mse_loss(pred[:, 1], y[:, 1])
    loss_foliage = F.mse_loss(pred[:, 2], y[:, 2])
    loss_wood = F.mse_loss(pred[:, 3], y[:, 3])

    # Calculate mse loss using loss for each component relative to its contribution to total biomass
    loss = loss_bark + loss_branch + loss_foliage + loss_wood

    return loss


def forward_pass(model, batch):
    # Send all tensors in the batch dictionary to correct GPU
    batch = {k: v.cuda() if hasattr(v, 'to') else v for k, v in batch.items()}
    pred = model(batch)

    return pred


# ********************************* TRAINING ********************************
def train_epoch(train_loader, model, optimizer):
    sum_epoch_train_loss = 0
    num_batches = len(train_loader)

    # Put model in training mode
    model.train()

    for step, batch in enumerate(train_loader):
        # Data enters model and prediction is made
        pred = forward_pass(model, batch)

        # Get target and send to correct device for batch parallel
        y = batch['target'].cuda()

        # Compute loss using custom loss function
        train_loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        train.torch.backward(train_loss)
        optimizer.step()

        sum_epoch_train_loss += train_loss.type(torch.float).item()

    # Compute the epoch mean train loss
    mn_epoch_train_loss = sum_epoch_train_loss / num_batches

    return mn_epoch_train_loss


# ********************************* VALIDATION ********************************
def val_epoch(val_loader, model):
    sum_epoch_val_loss = 0
    num_batches = len(val_loader)

    # Do not compute gradient for validation section
    with torch.no_grad():
        model.eval()
        # Start validation loop
        for step, batch in enumerate(val_loader):
            # Data enters model and prediction is made
            pred = forward_pass(model, batch)

            # Get target and send to correct device for batch parallel
            y = batch['target'].cuda()

            # Compute loss using custom loss function
            sum_epoch_val_loss += loss_fn(pred, y).type(torch.float).item()

        # Compute the epoch mean val loss
        mn_epoch_val_loss = sum_epoch_val_loss / num_batches

    return mn_epoch_val_loss


def train_func(trial_cfg: dict):
    train.torch.accelerate(amp=True)
    # ****************NOTE: GLOBAL CONFIG OBJECT IS UPDATED WITHIN EACH TRIAL *******************

    # Update the global config params with the params selected for this trial
    cfg.update(trial_cfg)

    # Update batch size based on number of GPUs being used for DDP training
    cfg['batch_size'] = cfg['batch_size'] // session.get_world_size()

    # Load model
    model = get_model(cfg)

    # Set up the model with ray backend
    model = train.torch.prepare_model(model)

    # Load datasets, apply augmentation to train data, set up samplers for DDP
    train_loader, _, val_loader, _ = prepare_data(cfg, ddp=False)

    # Set optimizer to be used in training
    optimizer = get_optimizer(cfg, model)
    optimizer = train.torch.prepare_optimizer(optimizer)

    # Set learning rate scheduler
    scheduler = get_lr_scheduler(optimizer, cfg, train_loader)

    # Switch on the cuDNN Autotuner
    # Slows down training initially, but then speeds it up as CUDA has to find the best way to implement convolutions
    if cfg['num_epochs'] > 5:
        # Had to adapt this line due to issues with ray tune based on this github issue: https://github.com/ray-project/ray/issues/5947
        eval('setattr(torch.backends.cudnn, "benchmark", True)')

    # Create list to store epoch val losses
    epoch_val_loss_list = []

    # Loop through each epoch
    for epoch in range(0, cfg['num_epochs']):

        # Training and validation loops
        train_epoch(train_loader, model, optimizer)
        mn_epoch_val_loss = val_epoch(val_loader, model)

        # Update mean epoch val loss
        epoch_val_loss_list.append(mn_epoch_val_loss)

        # If the loss has improved, update and save the model weights
        if mn_epoch_val_loss <= np.min(epoch_val_loss_list):
            reported_val_loss = mn_epoch_val_loss
            state_dict = model.state_dict()
            checkpoint = Checkpoint.from_dict(dict(epoch=epoch, model_weights=state_dict))
        else:
            reported_val_loss = np.min(epoch_val_loss_list)
            checkpoint = None

        # At end of epoch, adjust the learning rate using scheduler (if not none)
        if scheduler is not None:
            if cfg['lr_scheduler'] == "ReduceLROnPlateau":
                scheduler.step(mn_epoch_val_loss)
            else:
                scheduler.step()

        # Report epoch val loss
        session.report(metrics={'val_loss': reported_val_loss}, checkpoint=checkpoint)


def get_trainer(train_func, reporter, study_name, save_dir):
    # Configure trainer
    trainer = TorchTrainer(

        # Specify training loop
        train_loop_per_worker=train_func,
        # Set up DDP
        torch_config=TorchConfig(backend='nccl',  # Use nccl backend because running on linux, on windows use gloo
                                 init_method='tcp'),  # Initiation method required for compute canada
        # Configure GPU resource use
        scaling_config=ScalingConfig(
            resources_per_worker={"GPU": int(os.environ['RAY_NUM_GPU'])},
            # Number of distributed workers.
            num_workers=1,
            # Turn on/off GPU.
            use_gpu=True,
            _max_cpu_fraction_per_node=0.8),

        run_config=air.RunConfig(
            # Save directories for checkpoints and experiment results
            name=study_name,
            local_dir=save_dir,
            # Verbosity and experiment printing to console
            verbose=1,
            progress_reporter=reporter,
            # Settings for checkpointing trials
            checkpoint_config=air.CheckpointConfig(num_to_keep=1,
                                                   checkpoint_score_attribute="val_loss",
                                                   checkpoint_score_order="min", ),
            stop=None

        ),
    )

    return trainer


def main(save_dir, study_name):
    # Re-using the variable you created in your submission (.sh) script here.
    # make sure the port number (34567 here) is the same as in the submission script when starting the ray cluster
    addr = f"{os.environ['MASTER_ADDR']}:34567"
    ray.init(address=addr, _node_ip_address=os.environ['MASTER_ADDR'])

    # Instantiate the reporter (prints progress and results)
    reporter = CLIReporter(max_progress_rows=10, max_column_length=10)
    reporter.add_metric_column("val_loss")

    trainer = get_trainer(train_func, reporter, study_name, save_dir)

    # Execute training.
    results = trainer.fit()

    print(results.metrics)
    print(results)


# Run main function----------------------------------------------------------------------------------------------
if __name__ == "__main__":
    # Get current time and date when study began from environment var
    t_now = str(os.environ['DATETIME'])

    save_dir = "output/raytune/ray_train_runs"
    study_name = f"ray_train_run_{t_now}"
    main(save_dir, study_name)

Is it possible to add the default PyTorch AMP code described here to my training loop as an alternative? If so, what would be the best way to do that?

Thanks for providing the script @Harry_Seely! I'm currently working on reproducing this and will get back to you once I'm able to do so.

Looking at how we enable AMP, we should basically be doing that default AMP loop.
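
Roughly speaking (this is a simplification on my end, not the exact internals), the three calls map onto that default loop like this:

# Rough mapping (simplified; treat the details as approximations):
# train.torch.accelerate(amp=True)         -> sets up a torch.cuda.amp.GradScaler
# train.torch.prepare_model(model)         -> runs the model's forward pass under autocast
# train.torch.prepare_optimizer(optimizer) -> routes optimizer.step() through the scaler
# train.torch.backward(loss)               -> calls scaler.scale(loss).backward()
#                                             (that last call is visible in your traceback)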


What's your cluster setup? How many nodes are you running on, and are you using GPUs?

I have tried running the same code on my local machine with 2 GPUs, and on a SLURM cluster with 1 node and 4 GPUs. This code is for the SLURM job, but the local code is almost identical. I got the same error on both local and SLURM.

Hi @Harry_Seely,

I tried reproducing yesterday, but didn't encounter this error. Is it possible to provide a minimal reproducible script that I can run out of the box? Basically, mock out any imports with some dummy models/data; see the skeleton below for the kind of thing I mean. That'd be very helpful for me to debug with!
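
For reference, something along these lines is what I mean: a dummy model and random tensors standing in for your point-cloud pipeline, with the loss mirroring your F.mse_loss-based loss_fn. This assumes a single-GPU machine, and all of the names here are placeholders:

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

from ray import train
from ray.air import session
from ray.air.config import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func(config):
    train.torch.accelerate(amp=True)

    # Dummy stand-ins for the real model and point-cloud data
    model = train.torch.prepare_model(nn.Linear(10, 4))
    optimizer = train.torch.prepare_optimizer(
        torch.optim.Adam(model.parameters(), lr=1e-3)
    )
    loader = DataLoader(
        TensorDataset(torch.randn(64, 10), torch.randn(64, 4)), batch_size=8
    )
    device = train.torch.get_device()

    for epoch in range(2):
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            optimizer.zero_grad()
            # Mirrors the F.mse_loss-based loss_fn in the full script
            loss = F.mse_loss(model(x), y)
            train.torch.backward(loss)
            optimizer.step()
        session.report({"val_loss": loss.item()})


if __name__ == "__main__":
    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        scaling_config=ScalingConfig(num_workers=1, use_gpu=True),
    )
    trainer.fit()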

Hi Justin,

I updated my Ray code to use the base PyTorch AMP functionality, and it works now. Unfortunately, I do not have time at the moment to put together a minimal reproducible script that you can run out of the box. When I do get time in the future, I will come back and update this issue with an example you can try.

For now, using the base PyTorch AMP will have to suffice.

Here is the updated training loop for reference. I instantiate the scaler in the main trainable.


def train_epoch(cfg, train_loader, model, optimizer, scaler):
    # Put model in training mode
    model.train()

    for step, batch in enumerate(train_loader):

        optimizer.zero_grad()

        # Runs the forward pass under autocast.
        with torch.autocast(device_type='cuda', dtype=torch.float16, enabled=cfg['use_amp']):

            # Get target and send to correct device for batch parallel
            y = batch['target'].cuda()

            # Data enters model and prediction is made
            pred = forward_pass(model, batch, rank=y.device)

            if cfg['use_amp']:
                # output is float16 because linear layers autocast to float16.
                assert pred.dtype is torch.float16

            # Compute loss using custom loss function
            train_loss = loss_fn(pred, y, cfg)

            # loss is float32 because mse_loss layers autocast to float32.
            assert train_loss.dtype is torch.float32

        # Normalize the Gradients
        train_loss = train_loss / cfg['n_accumulation_steps']
        # Backwards propagation
        scaler.scale(train_loss).backward()

        if ((step + 1) % cfg['n_accumulation_steps'] == 0) or (step + 1 == len(train_loader)):
            # Implement optimizer using the gradient scaler
            scaler.step(optimizer)

            # Updates the scale for next iteration.
            scaler.update()


def train_func(trial_cfg: dict):
    # Instantiate the automatic mixed precision gradient scaler to avoid gradients flushing to zero (“underflowing”)
    scaler = torch.cuda.amp.GradScaler()
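    # NOTE: model, optimizer, train_loader, and cfg are assumed to be set up here as in the full script above (omitted for brevity)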

    # Loop through each epoch
    for epoch in range(0, cfg['num_epochs']):

        # Training and validation loops
        train_epoch(cfg, train_loader, model, optimizer, scaler)


