[Tune] Lightning without model/data parallelism

I’ve been following the PyTorch Lightning with Tune tutorial to tune hyperparameters for a fairly simple sequence-to-sequence model on a host with 3 GPUs. Unfortunately, DDP on this host frequently gets stuck: processes block the GPU, don’t terminate after CTRL+C, leave endless zombies, … (this happens even with pure Lightning DDP, without Ray), and it has been impossible to debug effectively because my environment is fairly complex (memory-mapped dataset, checkpoints on NFS, simultaneous CUDA processes that aren’t under Ray’s control). I see this behavior even when I limit Ray to a single GPU. I’m running pytorch/lightning/ray 2.0.1/2.0.9/2.10.
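
So far the only debugging levers I’m aware of are the standard distributed-logging knobs, set before anything touches torch.distributed:

import os
os.environ["NCCL_DEBUG"] = "INFO"                 # verbose NCCL logging
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"  # extra c10d consistency checks

so pointers beyond these would be very welcome.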

My model is small enough to train on a single GPU, so I’ve been wondering whether there is a way to easily circumvent the automatic DDP wrapping in Ray Train’s Lightning integration and train without it. Alternatively, if someone knows a good way to debug DDP issues, I’d be very happy to hear about it.
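
Concretely, the kind of thing I’m hoping is possible is a plain Tune function trainable with one GPU per trial and no Ray strategy/plugin at all, roughly like the untested sketch below (the callback and function names are mine; "val_metric" is the metric my model logs):

from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import Callback
from ray import train, tune

class ValMetricReportCallback(Callback):
    """Forward the logged val_metric to Tune after every validation epoch."""
    def on_validation_epoch_end(self, trainer, pl_module):
        metric = trainer.callback_metrics.get("val_metric")
        if metric is not None:
            train.report({"val_metric": metric.item()})

def train_model_single_gpu(config):
    # data_module/model built exactly as in train_model() below
    trainer = Trainer(accelerator="gpu",
                      devices=1,  # single GPU, no DDP wrapping
                      precision=16,
                      enable_progress_bar=False,
                      callbacks=[ValMetricReportCallback()])
    trainer.fit(model, data_module)

tuner = tune.Tuner(
    tune.with_resources(train_model_single_gpu, {"cpu": 8, "gpu": 1}),
    param_space=search_space,  # search_space as defined below
    tune_config=tune.TuneConfig(metric="val_metric", mode="max"),
)

Is that a supported pattern, or does the Lightning integration always require going through TorchTrainer?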

My code is below, although it probably isn’t particularly helpful unless I’ve made an obvious mistake adapting the tutorial somewhere:

def train_model(config, format_type, training_data, evaluation_data):
    from pytorch_lightning import Trainer
    from threadpoolctl import threadpool_limits

    from ray.train.lightning import (
        RayDDPStrategy,
        RayLightningEnvironment,
        RayTrainReportCallback,
        prepare_trainer,
    )

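    # RECOGNITION_HYPER_PARAMS, TextLineDataModule, and RecognitionModel come from my own package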
    hyper_params = RECOGNITION_HYPER_PARAMS.copy()
    hyper_params.update(config)

    data_module = TextLineDataModule(training_data=training_data,
                                     evaluation_data=evaluation_data,
                                     pad=hyper_params['pad'],
                                     height=hyper_params['height'],
                                     augmentation=hyper_params['augment'],
                                     partition=0.9,
                                     batch_size=hyper_params['batch_size'],
                                     num_workers=8,
                                     format_type=format_type)
    
    model = RecognitionModel(hyper_params=hyper_params,
                             num_classes=data_module.num_classes,
                             batches_per_epoch=len(data_module.train_dataloader()))

    trainer = Trainer(accelerator="auto",
                      devices="auto",
                      precision=16,
                      max_epochs=hyper_params['epochs'],
                      min_epochs=hyper_params['min_epochs'],
                      enable_progress_bar=False,
                      enable_model_summary=False,
                      enable_checkpointing=False,
                      callbacks=[RayTrainReportCallback()],
                      plugins=[RayLightningEnvironment()],
                      strategy=RayDDPStrategy())

    trainer = prepare_trainer(trainer)
    with threadpool_limits(limits=1):
        trainer.fit(model, data_module)

from functools import partial

from ray import tune
from ray.tune.schedulers import ASHAScheduler

from ray.train import RunConfig, ScalingConfig, CheckpointConfig
from ray.train.torch import TorchTrainer

search_space = {
    "warmup": tune.lograndint(1, 10000),
    "lr": tune.loguniform(1e-6, 1e-1),
    "batch_size": tune.choice([1, 2, 4, 8, 16]),
    'decoder_hidden_dim': tune.lograndint(128, 512),
}

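# format_type, ground_truth, and evaluation_files are defined earlier in my script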
train_cocr = partial(train_model, format_type=format_type, training_data=ground_truth, evaluation_data=evaluation_files)

scaling_config = ScalingConfig(
    num_workers=3, use_gpu=True, resources_per_worker={"CPU": 8, "GPU": 1}
)

run_config = RunConfig(
    checkpoint_config=CheckpointConfig(
        num_to_keep=2,
        checkpoint_score_attribute="val_metric",
        checkpoint_score_order="max",
    ),
    storage_path='/mnt/nfs_data/experiments',
    name="cocr_tune",
)

# Define a TorchTrainer without hyper-parameters for Tuner
ray_trainer = TorchTrainer(
    train_cocr,
    scaling_config=scaling_config,
    run_config=run_config,
)

def tune_cocr_asha(num_samples=25, num_epochs=50):
    scheduler = ASHAScheduler(max_t=num_epochs, grace_period=1, reduction_factor=2)

    tuner = tune.Tuner(
        ray_trainer,
        param_space={"train_loop_config": search_space},
        tune_config=tune.TuneConfig(
            metric="val_metric",
            mode="max",
            num_samples=num_samples,
            scheduler=scheduler,
        ),
    )
    return tuner.fit()

results = tune_cocr_asha()
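
For reference, downstream I only pull the winning trial’s configuration out of the returned ResultGrid, roughly:

best = results.get_best_result(metric="val_metric", mode="max")
print(best.config["train_loop_config"])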