Ray.tune with pytorch: only uses 1 of 4 GPUs

The Script

# -*- coding: utf-8 -*-
import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel

import ray
from ray import air, tune
from ray.tune.schedulers import ASHAScheduler
from ray.air import session
from ray import train
from ray.air import ScalingConfig
from ray.train.torch import TorchTrainer
from ray.tune.tuner import Tuner, TuneConfig
import os

os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2,3"

class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        # In this example, we don't change the model architecture
        # due to simplicity.
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        self.fc = nn.Linear(192, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 3))
        x = x.view(-1, 192)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)
    
def train(model, optimizer, train_loader, epoch, num_epochs):
    model.train()
    total_step = len(train_loader)
    loss_fn = nn.CrossEntropyLoss()
    rank = session.get_local_rank()
    for i, (imgs, labels) in enumerate(train_loader):
        imgs = imgs.to(f"cuda:{rank}")
        labels = labels.to(f"cuda:{rank}")
        model_output = model(imgs)
        loss = loss_fn(model_output, labels)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step() 
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                    .format(epoch + 1, num_epochs, i + 1, total_step, loss.cpu().item()))
            
@torch.no_grad()
def test(model, test_loader, epoch, num_epochs):
    total = 0
    rank = session.get_local_rank()
    for images, labels in test_loader:
        images = images.to(f"cuda:{rank}")
        labels = labels.to(f"cuda:{rank}")
        test_output = model(images)
        pred_y = torch.max(test_output, 1)[1].data.squeeze()
        accuracy = (pred_y == labels).sum().item() / float(labels.size(0))
        total += accuracy
    
    return total / len(test_loader)


    
def train_mnist(config):
    # Data Setup
    mnist_transforms = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.1307, ), (0.3081, ))])
    train_loader = DataLoader(
        datasets.MNIST("~/data", train=True, download=True, transform=mnist_transforms),
        batch_size=64,
        shuffle=True)
    train_loader = ray.train.torch.prepare_data_loader(train_loader, move_to_device=False)
    test_loader = DataLoader(
        datasets.MNIST("~/data", train=False, transform=mnist_transforms),
        batch_size=64,
        shuffle=True)
    test_loader = ray.train.torch.prepare_data_loader(test_loader, move_to_device=False)


    

    model = ConvNet()
    model = ray.train.torch.prepare_model(model)

    optimizer = optim.SGD(
        model.parameters(), lr=config["lr"], momentum=config["momentum"])
    num_epochs = 2
    for epoch in range(num_epochs):
        train(model, optimizer, train_loader, epoch, num_epochs)
        acc = test(model, test_loader, epoch, num_epochs)

        # Send the current training result back to Tune.
        session.report(metrics={"mean_accuracy":acc})



if __name__ == "__main__":
    search_space = {
        # "lr": tune.sample_from(lambda spec: 10 ** (-10 * np.random.rand())),
        "lr": tune.loguniform(1e-4, 1e-2),
        "momentum": tune.uniform(0.1, 0.9),
    }

    # Connect to the running Ray cluster for distributed execution
    ray.init(address="auto")

    # Download the dataset first
    datasets.MNIST("~/data", train=True, download=True)
    resources_per_trial = {"GPU": 4, "CPU": 8}
    trainer = TorchTrainer(
        train_mnist, 
        scaling_config=ScalingConfig(use_gpu=True,
                                     num_workers=1,
                                     resources_per_worker=resources_per_trial,
                                     _max_cpu_fraction_per_node=0.6
                                     ))
    tuner = Tuner(
        trainer,
        param_space={
            "train_loop_config": {
                "lr": tune.loguniform(1e-4, 1e-2),
                "momentum": tune.uniform(0.1, 0.9),
            }
        },
        tune_config=TuneConfig(num_samples=5, 
                               metric="mean_accuracy", 
                               max_concurrent_trials=1,
                               mode="max"),
    )
    result_grid = tuner.fit()
    print(result_grid.get_best_result().metrics["mean_accuracy"])

When I run the script above, only 1 of my 4 GPUs is actually used.

By the way, if I change ray.train.torch.prepare_data_loader(train_loader, move_to_device=False) to ray.train.torch.prepare_data_loader(train_loader, move_to_device=True) and delete all of the manual device-transfer calls such as imgs.to(f"cuda:{rank}"), the run hangs.


OK, that’s all. Thank you very much.

The reason for this behavior is that you are only using one worker in your ScalingConfig and trying to assign GPUs manually. Instead, you should use one worker per GPU and let Ray Train manage device placement. This is how you would modify the script:

# -*- coding: utf-8 -*-
import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.nn.functional as F
from torch.nn.parallel import DistributedDataParallel

import ray
from ray import air, tune, train
from ray.tune.schedulers import ASHAScheduler
from ray.air import session
from ray.air import ScalingConfig
from ray.train.torch import TorchTrainer
from ray.tune.tuner import Tuner, TuneConfig
import os

# DO NOT SET THIS! This will be managed by Ray.
# os.environ['CUDA_VISIBLE_DEVICES'] = "0,1,2,3"

class ConvNet(nn.Module):
    def __init__(self):
        super(ConvNet, self).__init__()
        # In this example, we don't change the model architecture
        # due to simplicity.
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        self.fc = nn.Linear(192, 10)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 3))
        x = x.view(-1, 192)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)
    
def train_fn(model, optimizer, train_loader, epoch, num_epochs):
    model.train()
    total_step = len(train_loader)
    loss_fn = nn.CrossEntropyLoss()
    device = ray.train.torch.get_device()
    for i, (imgs, labels) in enumerate(train_loader):
        imgs = imgs.to(device)
        labels = labels.to(device)
        model_output = model(imgs)
        loss = loss_fn(model_output, labels)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step() 
        if (i+1) % 100 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                    .format(epoch + 1, num_epochs, i + 1, total_step, loss.cpu().item()))
            
@torch.no_grad()
def test(model, test_loader, epoch, num_epochs):
    total = 0
    device = ray.train.torch.get_device()
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        test_output = model(images)
        pred_y = torch.max(test_output, 1)[1].data.squeeze()
        accuracy = (pred_y == labels).sum().item() / float(labels.size(0))
        total += accuracy
    
    return total / len(test_loader)


    
def train_mnist(config):
    # Data Setup
    mnist_transforms = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.1307, ), (0.3081, ))])
    train_loader = DataLoader(
        datasets.MNIST("~/data", train=True, download=True, transform=mnist_transforms),
        batch_size=64,
        shuffle=True)
    train_loader = ray.train.torch.prepare_data_loader(train_loader, move_to_device=False)
    test_loader = DataLoader(
        datasets.MNIST("~/data", train=False, transform=mnist_transforms),
        batch_size=64,
        shuffle=True)
    test_loader = ray.train.torch.prepare_data_loader(test_loader, move_to_device=False)


    

    model = ConvNet()
    model = ray.train.torch.prepare_model(model)

    optimizer = optim.SGD(
        model.parameters(), lr=config["lr"], momentum=config["momentum"])
    num_epochs = 2
    for epoch in range(num_epochs):
        train_fn(model, optimizer, train_loader, epoch, num_epochs)
        acc = test(model, test_loader, epoch, num_epochs)

        # Send the current training result back to Tune.
        session.report(metrics={"mean_accuracy":acc})



if __name__ == "__main__":
    search_space = {
        # "lr": tune.sample_from(lambda spec: 10 ** (-10 * np.random.rand())),
        "lr": tune.loguniform(1e-4, 1e-2),
        "momentum": tune.uniform(0.1, 0.9),
    }

    # Connect to the running Ray cluster for distributed execution
    ray.init(address="auto")

    # Download the dataset first
    datasets.MNIST("~/data", train=True, download=True)
    resources_per_worker = {"GPU": 1, "CPU": 8}  # one GPU (and 8 CPUs) per worker
    trainer = TorchTrainer(
        train_mnist,
        scaling_config=ScalingConfig(use_gpu=True,
                                     num_workers=4,
                                     resources_per_worker=resources_per_worker,
                                     _max_cpu_fraction_per_node=0.6
                                     ))
    tuner = Tuner(
        trainer,
        param_space={
            "train_loop_config": {
                "lr": tune.loguniform(1e-4, 1e-2),
                "momentum": tune.uniform(0.1, 0.9),
            }
        },
        tune_config=TuneConfig(num_samples=5, 
                               metric="mean_accuracy", 
                               max_concurrent_trials=1,
                               mode="max"),
    )
    result_grid = tuner.fit()
    print(result_grid.get_best_result().metrics["mean_accuracy"])

One worker per model replica is the paradigm of PyTorch DDP, which is what TorchTrainer uses under the hood.
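
For intuition, here is a rough sketch, assuming a single node and an already-initialized process group, of what the one-worker-per-GPU layout amounts to in plain PyTorch DDP terms. This is illustrative only, not Ray Train's actual internals, and the helper name ddp_setup_sketch is made up for this example; it is roughly what prepare_model() sets up inside each of the four workers.

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def ddp_setup_sketch(model: torch.nn.Module) -> torch.nn.Module:
    # Each Ray Train worker runs in its own process with the process group
    # already initialized, so the rank identifies this worker.
    local_rank = dist.get_rank() % torch.cuda.device_count()
    device = torch.device(f"cuda:{local_rank}")

    # prepare_model() does roughly this: move the model to this worker's own
    # GPU and wrap it in DDP so gradients are averaged across all workers.
    model = model.to(device)
    return DDP(model, device_ids=[local_rank])

prepare_data_loader() plays the complementary role on the data side: it attaches a DistributedSampler so each worker trains on its own shard of the dataset, which is why the manual cuda:{rank} bookkeeping in your original script is unnecessary.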

Also, you probably don’t need to set _max_cpu_fraction_per_node that low, especially since you are not using Ray Datasets. I’d leave it at 0.9.
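
Concretely, with the CPU fraction relaxed as suggested, the scaling configuration from the script above would look something like this (same worker layout as before, just 0.9 instead of 0.6):

from ray.air import ScalingConfig

scaling_config = ScalingConfig(
    use_gpu=True,
    num_workers=4,                      # one DDP worker per GPU
    resources_per_worker={"GPU": 1, "CPU": 8},
    _max_cpu_fraction_per_node=0.9,     # only lower this if other workloads (e.g. Ray Datasets) need CPUs
)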