- High: It blocks me from completing my task.
Hi all, thanks in advance for the help!
My issue is that when I spin up a Ray actor with 2 GPUs and then start training, only one GPU is utilized, even though I can verify inside the actor that it has access to two GPU devices.
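For reference, this is the kind of check I mean (a minimal, self-contained sketch; the GpuCheck actor here is only illustrative, not my actual code):

import ray
import torch

ray.init()

@ray.remote(num_gpus=2)
class GpuCheck:
    def report(self):
        # Ray sets CUDA_VISIBLE_DEVICES to the GPUs assigned to this actor,
        # so both values below should reflect the two requested devices.
        return ray.get_gpu_ids(), torch.cuda.device_count()

checker = GpuCheck.remote()
print(ray.get(checker.report.remote()))  # e.g. ([0, 1], 2)

Inside my real actor this check passes, yet training still only uses one GPU.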
I spin up the Ray actor with two GPUs as follows:
def initialize_persistent_trainer(
    cfg: DictConfig, logger: Logger, oracle: nn.Module, trial_name: str = None
) -> ray.actor.ActorHandle:
    """
    Initializes the PersistentTrainer Actor.
    """
    logger.info("Initializing PersistentTrainer Actor...")
    name = "persistent_trainer"
    if trial_name:
        name += "_" + trial_name
    num_devices = cfg.trainer.get("devices", None)
    trainer_actor = PersistentTrainer.options(
        name=name,
        max_concurrency=num_devices,
        num_gpus=num_devices,
    ).remote(cfg, logger, oracle)
    logger.info(f"PersistentTrainer Actor initialized with {num_devices} GPUs.")
    return trainer_actor
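The actor is then used roughly like this (a sketch of the driver side; train_dataset and the trial name are just placeholders):

trainer_actor = initialize_persistent_trainer(cfg, logger, oracle, trial_name="demo")
best_ckpt = ray.get(trainer_actor.train.remote(train_dataset, round_num=0))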
The train method on the actor looks like this:
def train(self, dataset: torch.utils.data.Dataset, round_num: int) -> str:
    """
    Train the model with the provided dataset.
    """
    torch.set_float32_matmul_precision("medium")
    self.system_logger.info("check")
    self.loggers = instantiate_loggers(self.cfg.get("logger"))
    self.trainer = setup_lightning_trainer_pytorch_lightning_callbacks(
        self.cfg, self.predictor, self.loggers, round_num
    )
    self.predictor.round = round_num
    train_loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=self.cfg.batch_size,
        num_workers=10,
        # pin_memory=True
        # sampler=torch.utils.data.distributed.DistributedSampler(dataset, shuffle=False)
    )
    # val_loader = self.datamodule.val_dataloader()
    # Fit the model
    self.trainer.fit(
        self.predictor,
        train_dataloaders=train_loader,
        # val_dataloaders=val_loader
    )
    checkpoint_callback = next(
        (cb for cb in self.trainer.callbacks if isinstance(cb, ModelCheckpoint)),
        None,
    )
    if checkpoint_callback:
        checkpoint_path = checkpoint_callback.best_model_path
        self.predictor_ckpt = checkpoint_path
        self.loggers[0].experiment.log({"checkpoint": self.predictor_ckpt})
    return self.predictor_ckpt
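For debugging, a quick check of the trainer's own device view could be dropped in right after fit() (a sketch; the attribute names assume a recent PyTorch Lightning 2.x release):

# Hypothetical snippet inside train(), right after self.trainer.fit(...):
self.system_logger.info(
    f"world_size={self.trainer.world_size}, "
    f"num_devices={self.trainer.num_devices}, "
    f"root_device={self.trainer.strategy.root_device}"
)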
The trainer is set up as follows:
def setup_lightning_trainer_pytorch_lightning_callbacks(
    cfg: DictConfig, lightning_module, logger, round_num
) -> pl.Trainer:
    # must remain here to avoid circular import
    # we have this function in order to create a lightning pytorch checkpoint callback
    from utils import log_hyperparameters, instantiate_callbacks

    # Retrieve the number of GPUs from the configuration
    num_gpus = cfg.trainer.get("devices", 2)  # Default to 2 if not specified
    # Create a list of torch.device objects
    parallel_devices = [torch.device(f'cuda:{i}') for i in range(num_gpus)]

    log.info("Instantiating callbacks...")
    checkpoint_cfg = cfg.callbacks.ray_checkpoint
    num_to_keep = checkpoint_cfg.num_to_keep
    monitor = f"{round_num}/{checkpoint_cfg.monitor}"
    mode = checkpoint_cfg.mode
    check_val_every_n_epoch = checkpoint_cfg.check_val_every_n_epoch
    dirpath = checkpoint_cfg.checkpoint_path + f"{hash(cfg)}/"
    filename = f"round={round_num}-" + "{epoch}"
    os.makedirs(dirpath, exist_ok=True)
    checkpoint_callback = ModelCheckpoint(
        monitor=monitor,
        mode=mode,
        save_top_k=num_to_keep,
        dirpath=dirpath,
        every_n_epochs=check_val_every_n_epoch,
        filename=filename,
    )

    ############# SET TRAINER ###############
    trainer = Trainer(
        max_epochs=cfg.trainer.get("max_epochs", 10),
        devices=cfg.trainer.get("devices", "auto"),
        accelerator=cfg.trainer.get("accelerator", "auto"),
        limit_train_batches=cfg.trainer.get("limit_train_batches", None),
        strategy=ray.train.lightning.RayDDPStrategy(),
        plugins=[ray.train.lightning.RayLightningEnvironment()],
        callbacks=[checkpoint_callback],
        logger=logger,
        log_every_n_steps=1,  # TODO: figure out if this is needed + hydra argument
    )
    trainer = ray.train.lightning.prepare_trainer(trainer)

    # TODO: set what we want to log at setup, e.g. experiment name
    if logger:
        log.info("Logging hyperparameters!")
        log_hyperparameters(cfg, lightning_module, trainer)

    return trainer
I am not sure whether this is all clear, so I would appreciate any comments!