I am trying to run the basic CIFAR example, but I keep running into errors when using Ray Train together with Ray Tune. Here is my training script:
import argparse
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from filelock import FileLock
from torch.utils.data import DataLoader, Subset
from torchvision.datasets import CIFAR10
import ray
import ray.train as train
from ray import tune
from ray.train import Trainer
from ray.tune import CLIReporter
from ray.tune.schedulers import PopulationBasedTraining
from ray.util.ml_utils.resnet import ResNet18
def train_epoch(dataloader, model, loss_fn, optimizer):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches; must expose a
            ``dataset`` attribute supporting ``len()``.
        model: Module to optimise; it is switched to training mode.
        loss_fn: Callable taking ``(predictions, targets)`` and returning a
            loss on which ``backward()`` can be called.
        optimizer: Optimizer whose ``zero_grad()`` / ``step()`` are invoked
            once per batch.
    """
    # Dataset length divided by the Ray Train world size; used only for the
    # progress message below.
    size = len(dataloader.dataset) // train.world_size()
    model.train()

    for step, (inputs, targets) in enumerate(dataloader):
        # Forward pass and loss computation.
        outputs = model(inputs)
        loss = loss_fn(outputs, targets)

        # Backward pass and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Only report progress every 100th batch.
        if step % 100 != 0:
            continue
        loss, current = loss.item(), step * len(inputs)
        print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")
def validate_epoch(dataloader, model, loss_fn):
    """Evaluate ``model`` over ``dataloader`` and print a short summary.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches; must support
            ``len()`` and expose a ``dataset`` attribute supporting ``len()``.
        model: Module to evaluate; it is switched to evaluation mode.
        loss_fn: Callable taking ``(predictions, targets)`` and returning a
            loss tensor.

    Returns:
        A dict ``{"loss": <mean loss per batch>}``.
    """
    # Dataset length divided by the Ray Train world size; denominator for the
    # accuracy figure.
    size = len(dataloader.dataset) // train.world_size()
    num_batches = len(dataloader)
    model.eval()

    test_loss = 0
    correct = 0
    # Gradients are not needed while evaluating.
    with torch.no_grad():
        for inputs, targets in dataloader:
            outputs = model(inputs)
            batch_loss = loss_fn(outputs, targets)
            test_loss += batch_loss.item()
            # Count predictions whose arg-max class equals the target.
            matches = outputs.argmax(1) == targets
            correct += matches.type(torch.float).sum().item()

    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return {"loss": test_loss}
def train_func(config):
    """Per-worker training function: train ResNet18 on CIFAR-10.

    Args:
        config: Dict of settings. Keys read here:
            ``"epochs"`` (removed from the dict; default 3),
            ``"lr"`` (default 0.1), ``"momentum"`` (default 0.9),
            ``"test_mode"`` (truthy -> use only the first 64 samples of each
            split) and ``"batch_size"`` (required; divided by the Ray Train
            world size to get the per-worker batch size).

    Returns:
        A list with one validation result dict per epoch, in epoch order.
    """
    # NOTE: pop() removes "epochs" from the caller's dict before the rest of
    # the config is handed to the model constructor.
    epochs = config.pop("epochs", 3)

    model = train.torch.prepare_model(ResNet18(config))

    # Create optimizer.
    learning_rate = config.get("lr", 0.1)
    momentum = config.get("momentum", 0.9)
    optimizer = torch.optim.SGD(
        model.parameters(), lr=learning_rate, momentum=momentum
    )

    # Per-channel normalisation constants shared by both pipelines.
    channel_mean = (0.4914, 0.4822, 0.4465)
    channel_std = (0.2023, 0.1994, 0.2010)

    # Training pipeline adds random crop / horizontal flip before
    # normalising; the test pipeline only normalises.
    transform_train = transforms.Compose(
        [
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
            transforms.Normalize(channel_mean, channel_std),
        ]
    )
    transform_test = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize(channel_mean, channel_std),
        ]
    )

    # Load in training and validation data while holding a file lock
    # (presumably so concurrent workers do not download at the same time).
    with FileLock(".ray.lock"):
        train_dataset = CIFAR10(
            root="~/data", train=True, download=True, transform=transform_train
        )
        validation_dataset = CIFAR10(
            root="~/data", train=False, download=False, transform=transform_test
        )

    if config.get("test_mode"):
        # Keep only the first 64 samples of each split.
        first_samples = list(range(64))
        train_dataset = Subset(train_dataset, first_samples)
        validation_dataset = Subset(validation_dataset, list(first_samples))

    worker_batch_size = config["batch_size"] // train.world_size()

    train_loader = train.torch.prepare_data_loader(
        DataLoader(train_dataset, batch_size=worker_batch_size)
    )
    validation_loader = train.torch.prepare_data_loader(
        DataLoader(validation_dataset, batch_size=worker_batch_size)
    )

    # Create loss.
    criterion = nn.CrossEntropyLoss()

    results = []
    for _ in range(epochs):
        train_epoch(train_loader, model, criterion, optimizer)
        epoch_result = validate_epoch(validation_loader, model, criterion)
        # Report this epoch's metrics through Ray Train.
        train.report(**epoch_result)
        results.append(epoch_result)
    return results
if __name__ == "__main__":
    # ---- Command-line options -------------------------------------------
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--address",
        required=False,
        type=str,
        help="the address to use for Redis",
    )
    parser.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=2,
        help="Sets number of workers for training.",
    )
    parser.add_argument(
        "--num-epochs",
        type=int,
        default=5,
        help="Number of epochs to train.",
    )
    parser.add_argument(
        "--smoke-test",
        action="store_true",
        default=False,
        help="Finish quickly for testing.",
    )
    parser.add_argument(
        "--use-gpu",
        action="store_true",
        default=False,
        help="Enables GPU training",
    )
    # Unknown arguments are tolerated and discarded.
    args, _ = parser.parse_known_args()

    # ---- Ray start-up ----------------------------------------------------
    # Smoke tests start Ray with 4 CPUs; otherwise the given address is used.
    if args.smoke_test:
        ray_init_kwargs = {"num_cpus": 4}
    else:
        ray_init_kwargs = {"address": args.address}
    ray.init(**ray_init_kwargs)

    # ---- Wrap the training function as a Tune trainable -------------------
    trainer = Trainer(
        "torch", num_workers=args.num_workers, use_gpu=args.use_gpu
    )
    Trainable = trainer.to_tune_trainable(train_func)

    # ---- Scheduler: PBT, minimising the reported "loss" -------------------
    hyperparam_mutations = {
        # distribution for resampling
        "lr": lambda: np.random.uniform(0.001, 1),
        # allow perturbations within this set of categorical values
        "momentum": [0.8, 0.9, 0.99],
    }
    pbt_scheduler = PopulationBasedTraining(
        time_attr="training_iteration",
        metric="loss",
        mode="min",
        perturbation_interval=1,
        hyperparam_mutations=hyperparam_mutations,
    )

    # Show the "loss" metric as a column in the CLI progress output.
    reporter = CLIReporter()
    reporter.add_metric_column("loss", "loss")

    # ---- Search space and stopping rule -----------------------------------
    search_space = {
        "lr": tune.choice([0.001, 0.01, 0.1]),
        "momentum": 0.8,
        "batch_size": 128 * args.num_workers,
        "epochs": args.num_epochs,
        "test_mode": args.smoke_test,  # whether to subset the data
    }
    max_iterations = 2 if args.smoke_test else 100

    analysis = tune.run(
        Trainable,
        num_samples=4,
        config=search_space,
        stop={"training_iteration": max_iterations},
        max_failures=3,  # used for fault tolerance
        checkpoint_freq=3,  # used for fault tolerance
        keep_checkpoints_num=1,  # used for fault tolerance
        verbose=2,
        progress_reporter=reporter,
        scheduler=pbt_scheduler,
    )

    best_config = analysis.get_best_config(metric="loss", mode="min")
    print(best_config)
Here is the error:
(BackendExecutor pid=49802) 2022-05-16 20:51:11,244 ERROR worker.py:94 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::BaseWorkerMixin._BaseWorkerMixin__execute() (pid=49897, ip=172.17.0.3, repr=<ray.train.worker_group.BaseWorkerMixin object at 0x7ff68bb89580>)
(BackendExecutor pid=49802) File "/opt/conda/lib/python3.9/site-packages/ray/train/worker_group.py", line 26, in __execute
(BackendExecutor pid=49802) return func(*args, **kwargs)
(BackendExecutor pid=49802) File "/opt/conda/lib/python3.9/site-packages/ray/train/backend.py", line 489, in end_training
(BackendExecutor pid=49802) output = session.finish()
(BackendExecutor pid=49802) File "/opt/conda/lib/python3.9/site-packages/ray/train/session.py", line 118, in finish
(BackendExecutor pid=49802) func_output = self.training_thread.join()
(BackendExecutor pid=49802) File "/opt/conda/lib/python3.9/site-packages/ray/train/utils.py", line 96, in join
(BackendExecutor pid=49802) raise self.exc
(BackendExecutor pid=49802) File "/opt/conda/lib/python3.9/site-packages/ray/train/utils.py", line 89, in run
(BackendExecutor pid=49802) self.ret = self._target(*self._args, **self._kwargs)
(BackendExecutor pid=49802) File "/opt/conda/lib/python3.9/site-packages/ray/train/utils.py", line 138, in <lambda>
(BackendExecutor pid=49802) return lambda: train_func(config)
(BackendExecutor pid=49802) File "/home/chen999/mage/detr-test/detreg/tune_cifar_pytorch.py", line 81, in train_func
(BackendExecutor pid=49802) model = train.torch.prepare_model(model)
(BackendExecutor pid=49802) File "/opt/conda/lib/python3.9/site-packages/ray/train/torch.py", line 614, in prepare_model
(BackendExecutor pid=49802) return get_accelerator(TorchAccelerator).prepare_model(
(BackendExecutor pid=49802) File "/opt/conda/lib/python3.9/site-packages/ray/train/torch.py", line 95, in prepare_model
(BackendExecutor pid=49802) torch.cuda.set_device(device)
(BackendExecutor pid=49802) File "/home/chen999/.local/lib/python3.9/site-packages/torch/cuda/__init__.py", line 264, in set_device
(BackendExecutor pid=49802) torch._C._cuda_setDevice(device)
(BackendExecutor pid=49802) RuntimeError: CUDA error: invalid device ordinal
(BackendExecutor pid=49802) CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect.
(BackendExecutor pid=49802) For debugging consider passing CUDA_LAUNCH_BLOCKING=1.
I set num_workers = 2 and use_gpu = True in the Trainer above so that I can run multiple trials with 2 GPUs each. So far I haven't been able to get any Ray Train + Ray Tune setup to work in a distributed manner (single node, 8 GPUs).