TuneReportCallback is unable to read PyTorch Lightning metrics during Tuner.fit(...)

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I’m running Ray Tune 2.2.0 with a PyTorch Lightning module and found that tune.report(…) inside TuneReportCallback is unable to relay metrics back to the Ray session. Digging deeper, it turns out the Ray session is disabled during the training/validation steps of the PyTorch Lightning module.

This is the error I have been receiving:

Session not detected. You should not be calling report outside tuner.fit() or while using the class API.

ValueError: Trial returned a result which did not include the specified metric(s) ptl/val_loss that tune.TuneConfig() expects. Make sure your calls to tune.report() include the metric, or set the TUNE_DISABLE_STRICT_METRIC_CHECKING environment variable to 1.
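For context, my understanding of what the callback does (a paraphrased sketch of the idea, not the actual Ray source) is that it reads the values Lightning has logged into trainer.callback_metrics and forwards them to the Tune session of the current trial, so it can only work if a session is attached to the process it runs in:

import pytorch_lightning as pl
from ray import tune

class SketchTuneReportCallback(pl.Callback):
    def __init__(self, metrics):
        self._metrics = metrics

    def on_validation_end(self, trainer, pl_module):
        # Collect the metrics Lightning has logged so far...
        report = {m: trainer.callback_metrics[m].item() for m in self._metrics}
        # ...and hand them to the Tune session of the current trial.
        # If no session is attached to this process, the values never
        # reach the Tuner, which matches the errors above.
        tune.report(**report)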

This is a simplified version of my code:

def inside_tune():
    return ray.tune.is_session_enabled()

class HPDeepRegressionModel(pl.LightningModule):

    def __init__(self, cfg):

        print("Initializing HpDeepRegressionModel...")
        super().__init__()
        self.save_hyperparameters()
        print("Init: Inside raytune session ", inside_tune())

        # hyperparameters
        self.in_length = cfg['in_length']
        self.num_blocks = cfg['num_blocks']
        self.layer_length = cfg['layer_width']
        self.out_length = cfg['out_length']
        self.dropout_rate = cfg['dropout']
        self.optimizer_lr = cfg['optimizer_lr']

        self._create_layers()

        self.init_metrics()

        print("Using default MSELoss...")
        self.loss = nn.MSELoss() 
        print("Init end: Inside raytune session ", inside_tune())

    def _create_layers(self):

        self.layers = nn.ModuleList()

        self.layers.append(nn.Linear(self.in_length, self.layer_length))
        for block_idx in range(0, self.num_blocks-1):
            self.layers.append(nn.Linear(self.layer_length, self.layer_length))
            self.layers.append(nn.ReLU())
            self.layers.append(nn.Dropout(self.dropout_rate))
        self.layers.append(nn.Linear(self.layer_length, self.out_length))

    def init_metrics(self):

        #   loss metrics
        self.train_mse_loss = MeanSquaredError()
        self.val_mse_loss = MeanSquaredError()
        self.train_mae_loss = MeanAbsoluteError()
        self.val_mae_loss = MeanAbsoluteError()

    def _shared_forward(self, x: Dict[str, torch.Tensor]):

        out = x['features']
        for i, layer in enumerate(self.layers):
            out = layer(out)
        return out

    def forward(self, x: Dict[str, torch.Tensor]):

        return self._shared_forward(x)

    def training_step(self, batch, batch_idx):
        
        print("Train: Inside raytune session ", inside_tune())

        model_output = self.forward(batch)
        batch_size = batch['target'].shape[0]
        targets = batch['target'].to(model_output.dtype)

        loss = self.loss(model_output, targets)

        self.log("train_loss", loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        self.train_mse_loss(model_output, targets)
        self.log("train_mse_loss", self.train_mse_loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        self.train_mae_loss(model_output, targets)
        self.log("train_mae_loss", self.train_mae_loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        print("Train end: Inside raytune session ", inside_tune())

        return loss

    def validation_step(self, batch, batch_idx):
        model_output = self.forward(batch)
        batch_size = batch['target'].shape[0]
        targets = batch['target'].to(model_output.dtype)

        print("Valid: Inside raytune session ", inside_tune())

        loss = self.loss(model_output, targets)
        self.log("val_loss", loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        self.val_mse_loss(model_output, targets)
        self.log("valid_mse_loss", self.val_mse_loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        self.val_mae_loss(model_output, targets)
        self.log("valid_mae_loss", self.val_mae_loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        return loss

    def validation_epoch_end(self, output):
        print("On validation epoch end: ray session: ", inside_tune())
        self.log("ptl/val_loss", float(output[0]))

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.optimizer_lr)

    def train_epoch_start(self):
        print("On train epoch start: ray session: ", inside_tune())

    def train_epoch_end(self):
        print("On train epoch end: ray session: ", inside_tune())

def inside_tune():
    return ray.tune.is_session_enabled()

def main(cfg, verbose: bool = True):

    logger = TensorBoardLogger(save_dir=cfg.logging.path, version=cfg.logging.name, name="")

    dm = instantiate(cfg.dataset) 

    # Ray Tune hyperparameter search
    results = ray_tune_train(cfg, dm, logger, verbose=verbose)

    return results

def train_fn(model_cfg: Dict, cfg, data_module: pl.LightningDataModule, logger: TensorBoardLogger, verbose: bool = True):

    model = HPDeepRegressionModel(model_cfg)
    print("Initialized model: Inside raytune session: ", inside_tune())

    lr_monitor = LearningRateMonitor(logging_interval='step')
    checkpoint_callback = ModelCheckpoint(save_top_k=cfg.checkpoint.save_top_k, monitor="epoch",
                                          mode="max", filename="model-{epoch}")
    tune_report_callback = TuneReportCallback(["ptl/val_loss"],
                                              on="validation_end")
    
    print("Initializing trainer: Inside raytune session: ", inside_tune())
    trainer = pl.Trainer(
        **cfg.trainer,
        logger=logger,
        callbacks=[lr_monitor, checkpoint_callback, tune_report_callback]
    )
    print("Initialized trainer: Inside raytune session: ", inside_tune())

    print("Trainer fit: Inside raytune session: ", inside_tune())
    trainer.fit(model, datamodule=data_module, ckpt_path=get_checkpoint_path(cfg))

    print("Trainer callback metrics: ", trainer.callback_metrics)

def ray_tune_train(cfg, data_module: pl.LightningDataModule, logger: TensorBoardLogger, verbose: bool = True):

    print("--Started raytune session: ", inside_tune())

    num_epochs = cfg.trainer.max_epochs
    num_gpus = 0
    gpus_per_trial = 0
    num_hp_samples = 5     # number of times to sample from hyperparameter space

    hp_configs = {
        "layer_width": tune.randint(8, 32),
        "num_blocks": tune.randint(1, 32),
        "dropout": tune.uniform(0, 0.5),
        "optimizer_lr": tune.loguniform(1e-4, 1e-1),
        "in_length": cfg.model.nn.in_length,
        "out_length": cfg.model.nn.out_length,
    }

    train_fn_with_parameters = tune.with_parameters(train_fn,
                                                    cfg=cfg,
                                                    data_module=data_module,
                                                    logger=logger,
                                                    verbose=verbose
                                                    )

    scheduler = ASHAScheduler(
        max_t=num_epochs,
        grace_period=1,
        reduction_factor=2)

    reporter = CLIReporter(
        parameter_columns=["layer_width", "num_blocks", "dropout", "optimizer_lr"],
        metric_columns=["ptl/val_loss", "training_iteration"])

    tuner = tune.Tuner(
        train_fn_with_parameters,
        tune_config=tune.TuneConfig(
            metric="ptl/val_loss",
            mode="min",
            scheduler=scheduler,
            num_samples=num_hp_samples,
        ),
        run_config=air.RunConfig(
            local_dir="./ray_results",
            name="tune_asha_regression",
            progress_reporter=reporter,
        ),
        param_space=hp_configs,
    )
  
    results = tuner.fit()
   
    print("Best hyperparameters found were: ", results.get_best_result().config)

    return results

if __name__ == "__main__":

    main(cfg)

From the inside_tune() debug prints sprinkled through the code above, I received the following output:

Init: Inside raytune session  True
Init end: Inside raytune session  True
Initialized model: Inside raytune session:  True
Initializing trainer: Inside raytune session:  True
Initialized trainer: Inside raytune session:  True
Trainer fit: Inside raytune session:  True
Train: Inside raytune session  False
Train end: Inside raytune session  False
Valid: Inside raytune session  False
On validation epoch end: ray session:  False
Trainer fit end: Inside raytune session:  True
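If it helps with debugging, I can also print the process id next to each of these checks to see whether the training/validation steps run in the same process as the rest of the trial, e.g.:

import os
print("Train: pid", os.getpid(), "inside raytune session", inside_tune())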

Any insights on how to solve this issue would be of great help. Thanks!

cc: @matthewdeng @Jiao_Dong
By the way, we should move this issue to the Ray AIR category.


Hey @worker, do you happen to have a full script I can try to reproduce this with?

Hi @matthewdeng, thanks for your help. Here are two files (a module to import and the running script) which can be used to reproduce the error.

rt_models.py: Contains Lightning DataModule and Lightning Module

import numpy as np
import pytorch_lightning as pl
import ray
import torch
import torch.nn as nn

from torch.utils.data import random_split, DataLoader, TensorDataset
from torchmetrics import MeanSquaredError, MeanAbsoluteError
from typing import Dict, List


DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

def inside_tune():
    return ray.tune.is_session_enabled()


class RTDataModule(pl.LightningDataModule):

    def __init__(self):
        super().__init__()
        self.batch_size = 32

    def prepare_data(self):
        a = np.random.uniform(0, 500, 500)
        b = np.random.normal(0, 2, len(a))

        c = a + b
        X = np.transpose(np.array([a, b]))          # 500 rows with 2 features each

        # Converting numpy array to Tensor
        self.x_train_tensor = torch.from_numpy(X).float().to(DEVICE)
        self.y_train_tensor = torch.from_numpy(c).float().to(DEVICE)

        training_dataset = TensorDataset(self.x_train_tensor, self.y_train_tensor)

        self.training_dataset = training_dataset

    def setup(self, stage=None):
        data = self.training_dataset
        self.train_data, self.val_data = random_split(data, [400, 100])

    def train_dataloader(self):
        return DataLoader(self.train_data, batch_size=self.batch_size)

    def val_dataloader(self):
        return DataLoader(self.val_data, batch_size=self.batch_size)


class RTDeepRegressionModel(pl.LightningModule):

    def __init__(self, cfg: Dict):
        print("Initializing RTDeepRegressionModel...")
        super().__init__()
        self.save_hyperparameters()
        print("Init: Inside raytune session ", inside_tune())

        # hyperparameters
        self.in_length = cfg['in_length']
        self.num_blocks = cfg['num_blocks']
        self.layer_length = cfg['layer_width']
        self.out_length = cfg['out_length']
        self.dropout_rate = cfg['dropout']
        self.optimizer_lr = cfg['optimizer_lr']

        self._create_layers()

        self.init_metrics()

        print("Using default MSELoss...")
        self.loss = nn.MSELoss()
        print("Init end: Inside raytune session ", inside_tune())

    def _create_layers(self):
        self.layers = nn.ModuleList()

        self.layers.append(nn.Linear(self.in_length, self.layer_length))
        for block_idx in range(0, self.num_blocks - 1):
            self.layers.append(nn.Linear(self.layer_length, self.layer_length))
        self.layers.append(nn.ReLU())
        self.layers.append(nn.Dropout(self.dropout_rate))

        self.layers.append(nn.Linear(self.layer_length, self.out_length))

    def init_metrics(self):
        #   loss metrics
        self.train_mse_loss = MeanSquaredError()
        self.val_mse_loss = MeanSquaredError()
        self.train_mae_loss = MeanAbsoluteError()
        self.val_mae_loss = MeanAbsoluteError()

    def _shared_forward(self, x: TensorDataset):

        out = x[0]          # first index is features
        for i, layer in enumerate(self.layers):
            out = layer(out)
        return out

    def forward(self, x: TensorDataset):
        return self._shared_forward(x)

    def training_step(self, batch: TensorDataset, batch_idx):
        print("Train: Inside raytune session ", inside_tune())

        model_output = self.forward(batch)
        targets = batch[-1].unsqueeze(1)        # add dimension in axis=1 to match model_output

        batch_size = targets.shape[0]
        targets = targets.to(model_output.dtype)

        loss = self.loss(model_output, targets)

        self.log("train_loss", loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        self.train_mse_loss(model_output, targets)
        self.log("train_mse_loss", self.train_mse_loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        self.train_mae_loss(model_output, targets)
        self.log("train_mae_loss", self.train_mae_loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        print("Train end: Inside raytune session ", inside_tune())

        return loss

    def validation_step(self, batch, batch_idx):
        model_output = self.forward(batch)
        targets = batch[-1].unsqueeze(1)

        batch_size = targets.shape[0]
        targets = targets.to(model_output.dtype)

        print("Valid: Inside raytune session ", inside_tune())

        loss = self.loss(model_output, targets)
        self.log("val_loss", loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        self.val_mse_loss(model_output, targets)
        self.log("valid_mse_loss", self.val_mse_loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        self.val_mae_loss(model_output, targets)
        self.log("valid_mae_loss", self.val_mae_loss, on_step=True, on_epoch=True,
                 prog_bar=True, logger=True, batch_size=batch_size)

        return loss

    def validation_epoch_end(self, output):
        print("On validation epoch end: ray session: ", inside_tune())
        self.log("ptl/val_loss", float(output[0]))

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.optimizer_lr)

    def train_epoch_start(self):
        print("On train epoch start: ray session: ", inside_tune())

    def train_epoch_end(self):
        print("On train epoch end: ray session: ", inside_tune())

And the Python script raytune_full_script.py:

import pytorch_lightning as pl
import ray

from rt_models import RTDeepRegressionModel, RTDataModule
from ray import air, tune
from typing import Dict, List
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from ray.tune.integration.pytorch_lightning import TuneReportCallback


def inside_tune():
    return ray.tune.is_session_enabled()

def main(cfg: Dict, verbose: bool = True):
    logger = TensorBoardLogger(save_dir=cfg['logging_path'], version=cfg['logging_name'], name="")

    dm = RTDataModule()
    dm.prepare_data()           # make example data available

    # Ray Tune hyperparameter search
    results = ray_tune_train(cfg, dm, logger, verbose=verbose)

    return results


def train_fn(model_cfg: Dict, cfg: Dict, data_module: pl.LightningDataModule, logger: TensorBoardLogger,
             verbose: bool = True):

    model = RTDeepRegressionModel(model_cfg)
    print("Initialized model: Inside raytune session: ", inside_tune())

    lr_monitor = LearningRateMonitor(logging_interval='step')
    checkpoint_callback = ModelCheckpoint(save_top_k=cfg['checkpoint_save_top_k'], monitor="epoch",
                                          mode="max", filename="model-{epoch}")
    tune_report_callback = TuneReportCallback(["ptl/val_loss"],
                                              on="validation_end")

    print("Initializing trainer: Inside raytune session: ", inside_tune())
    trainer = pl.Trainer(
        max_epochs=5,
        check_val_every_n_epoch=2,
        log_every_n_steps=100,
        devices=4,
        accelerator='cpu',
        fast_dev_run=False,
        logger=logger,
        callbacks=[lr_monitor, checkpoint_callback, tune_report_callback]
    )
    print("Initialized trainer: Inside raytune session: ", inside_tune())

    print("Trainer fit: Inside raytune session: ", inside_tune())
    trainer.fit(RTDeepRegressionModel(model_cfg), datamodule=data_module)          # ckpt_path=get_checkpoint_path(cfg)

    print("Trainer callback metrics: ", trainer.callback_metrics)


def ray_tune_train(cfg, data_module: pl.LightningDataModule, logger: TensorBoardLogger, verbose: bool = True):
    print("--Started raytune session: ", inside_tune())

    num_epochs = cfg['trainer_max_epochs']
    num_hp_samples = 5  # number of times to sample from hyperparameter space

    hp_configs = {
        "layer_width": tune.randint(8, 32),
        "num_blocks": tune.randint(1, 32),
        "dropout": tune.uniform(0, 0.5),
        "optimizer_lr": tune.loguniform(1e-4, 1e-1),
        "in_length": 2,         # 2 input features
        "out_length": 1,        # 1 output (regression)
    }

    train_fn_with_parameters = tune.with_parameters(train_fn,
                                                    cfg=cfg,
                                                    data_module=data_module,
                                                    logger=logger,
                                                    verbose=verbose
                                                    )

    scheduler = ASHAScheduler(
        max_t=num_epochs,
        grace_period=1,
        reduction_factor=2)

    reporter = CLIReporter(
        parameter_columns=["layer_width", "num_blocks", "dropout", "optimizer_lr"],
        metric_columns=["ptl/val_loss", "training_iteration"])

    tuner = tune.Tuner(
        train_fn_with_parameters,
        tune_config=tune.TuneConfig(
            metric="ptl/val_loss",
            mode="min",
            scheduler=scheduler,
            num_samples=num_hp_samples,
        ),
        run_config=air.RunConfig(
            local_dir="./ray_results",
            name="tune_asha_regression",
            progress_reporter=reporter,
        ),
        param_space=hp_configs,
    )

    results = tuner.fit()

    print("Best hyperparameters found were: ", results.get_best_result().config)

    return results


if __name__ == "__main__":

    cfg = {
        'logging_path': './raytune_lightning_logs',
        'logging_name': 'test',
        'checkpoint_save_top_k': 10,
        'trainer_max_epochs': 5,
    }

    main(cfg)

Running python3 raytune_full_script.py should reproduce the error. Thanks in advance for your help!
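For completeness, the scripts only need the packages imported above, with Ray pinned to the version I mentioned and the rest left unpinned:

pip install "ray[tune]==2.2.0" pytorch-lightning torchmetrics tensorboard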

Thanks for sharing! I was able to narrow it down to devices=4, accelerator='cpu' in the constructor of pl.Trainer. I believe that with multiple devices PyTorch Lightning natively falls back to a multiprocessing (DDP-spawn style) strategy, and the spawned child processes don't have the Ray Tune session attached, which is why inside_tune() returns False inside the training/validation steps and the callback has nothing to report to. I was able to get this script to run by commenting out these lines:

   trainer = pl.Trainer(
        max_epochs=5,
        check_val_every_n_epoch=2,
        log_every_n_steps=100,
#        devices=4,
#        accelerator='cpu',
        fast_dev_run=False,
        logger=logger,
        callbacks=[lr_monitor, checkpoint_callback, tune_report_callback]
    )
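One way to confirm what Lightning does under the hood with those two arguments (a quick standalone sketch; the exact strategy class name it prints depends on your Lightning version):

import pytorch_lightning as pl

# Outside of Tune, just inspect which strategy Lightning selects for this config.
trainer = pl.Trainer(devices=4, accelerator="cpu", max_epochs=1)
print(type(trainer.strategy).__name__)
# With recent Lightning versions this typically resolves to a spawn-based DDP
# strategy, i.e. training_step/validation_step run in child processes.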

However, if you do want the same behavior, I would suggest you check out ray_lightning, which allows you to perform distributed training on top of Ray! Your updated script would look like this:

import pytorch_lightning as pl
import ray

from rt_models import RTDeepRegressionModel, RTDataModule
from ray import air, tune
from typing import Dict, List
from pytorch_lightning.loggers import TensorBoardLogger
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from ray_lightning import RayStrategy
from ray_lightning.tune import TuneReportCallback, get_tune_resources


def inside_tune():
    return ray.tune.is_session_enabled()

def main(cfg: Dict, verbose: bool = True):
    logger = TensorBoardLogger(save_dir=cfg['logging_path'], version=cfg['logging_name'], name="")

    dm = RTDataModule()
    dm.prepare_data()           # make example data available

    # Ray Tune hyperparameter search
    results = ray_tune_train(cfg, dm, logger, verbose=verbose)

    return results


def train_fn(model_cfg: Dict, cfg: Dict, data_module: pl.LightningDataModule, logger: TensorBoardLogger,
             verbose: bool = True):

    model = RTDeepRegressionModel(model_cfg)
    print("Initialized model: Inside raytune session: ", inside_tune())

    lr_monitor = LearningRateMonitor(logging_interval='step')
    checkpoint_callback = ModelCheckpoint(save_top_k=cfg['checkpoint_save_top_k'], monitor="epoch",
                                          mode="max", filename="model-{epoch}")
    tune_report_callback = TuneReportCallback(["ptl/val_loss"],
                                              on="validation_end")

    print("Initializing trainer: Inside raytune session: ", inside_tune())
    trainer = pl.Trainer(
        max_epochs=5,
        check_val_every_n_epoch=2,
        log_every_n_steps=100,
        strategy = RayStrategy(num_workers=4),
        fast_dev_run=False,
        logger=logger,
        callbacks=[lr_monitor, checkpoint_callback, tune_report_callback]
    )
    print("Initialized trainer: Inside raytune session: ", inside_tune())

    print("Trainer fit: Inside raytune session: ", inside_tune())
    trainer.fit(RTDeepRegressionModel(model_cfg), datamodule=data_module)          # ckpt_path=get_checkpoint_path(cfg)

    print("Trainer callback metrics: ", trainer.callback_metrics)


def ray_tune_train(cfg, data_module: pl.LightningDataModule, logger: TensorBoardLogger, verbose: bool = True):
    print("--Started raytune session: ", inside_tune())

    num_epochs = cfg['trainer_max_epochs']
    num_hp_samples = 5  # number of times to sample from hyperparameter space

    hp_configs = {
        "layer_width": tune.randint(8, 32),
        "num_blocks": tune.randint(1, 32),
        "dropout": tune.uniform(0, 0.5),
        "optimizer_lr": tune.loguniform(1e-4, 1e-1),
        "in_length": 2,         # 2 input features
        "out_length": 1,        # 1 output (regression)
    }

    train_fn_with_parameters = tune.with_parameters(train_fn,
                                                    cfg=cfg,
                                                    data_module=data_module,
                                                    logger=logger,
                                                    verbose=verbose
                                                    )                     

    scheduler = ASHAScheduler(
        max_t=num_epochs,
        grace_period=1,
        reduction_factor=2)

    reporter = CLIReporter(
        parameter_columns=["layer_width", "num_blocks", "dropout", "optimizer_lr"],
        metric_columns=["ptl/val_loss", "training_iteration"])

    tuner = tune.Tuner(
        tune.with_resources(
            train_fn_with_parameters,
            resources=get_tune_resources(num_workers=4)
        ),
        tune_config=tune.TuneConfig(
            metric="ptl/val_loss",
            mode="min",
            scheduler=scheduler,
            num_samples=num_hp_samples,
        ),
        run_config=air.RunConfig(
            local_dir="./ray_results",
            name="tune_asha_regression",
            progress_reporter=reporter,
        ),
        param_space=hp_configs,
    )

    results = tuner.fit()

    print("Best hyperparameters found were: ", results.get_best_result().config)

    return results


if __name__ == "__main__":

    cfg = {
        'logging_path': './raytune_lightning_logs',
        'logging_name': 'test',
        'checkpoint_save_top_k': 10,
        'trainer_max_epochs': 5,
    }

    main(cfg)

There are 3 main changes:

  1. Use the TuneReportCallback that is provided by ray_lightning. Additionally, import the other modules needed for the following steps.
- from ray.tune.integration.pytorch_lightning import TuneReportCallback
+ from ray_lightning import RayStrategy
+ from ray_lightning.tune import TuneReportCallback, get_tune_resources
  2. Replace the devices and accelerator arguments with RayStrategy; in this case we are also training across 4 CPU workers.
    trainer = pl.Trainer(
        max_epochs=5,
        check_val_every_n_epoch=2,
        log_every_n_steps=100,
-        devices=4,
-        accelerator='cpu',
+        strategy = RayStrategy(num_workers=4),
        fast_dev_run=False,
        logger=logger,
        callbacks=[lr_monitor, checkpoint_callback, tune_report_callback]
    )
  3. Update the constructed Trainable with the proper resource specification, which can be generated from the get_tune_resources utility function provided by ray_lightning (see the note after this list).
    tuner = tune.Tuner(
-        train_fn_with_parameters,
+        tune.with_resources(
+            train_fn_with_parameters,
+            resources=get_tune_resources(num_workers=4)
+        ),
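One extra note on the resource spec (my rough understanding, so double-check the ray_lightning docs for the exact defaults): get_tune_resources(num_workers=4) returns a Tune PlacementGroupFactory describing the bundles each trial needs, roughly one slot for the trial itself plus one CPU per RayStrategy worker. That matters for scheduling, since Tune only runs as many concurrent trials as those bundles fit on your cluster:

from ray_lightning.tune import get_tune_resources

# Rough illustration, not from the ray_lightning docs: with num_workers=4 each
# trial reserves the trial process plus 4 worker CPUs, so num_samples=5 needs
# on the order of 5 * (1 + 4) = 25 CPUs for every trial to run concurrently;
# otherwise Tune queues the remaining trials.
resources = get_tune_resources(num_workers=4)
print(resources)  # a PlacementGroupFactory with the per-trial bundles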

Thanks @matthewdeng for identifying the issue! I'll try out Ray Lightning for multi-worker training.

Thanks a lot for your help!