How to use Ray for multi-node training?

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I have two computers on a local network, and I want to use Ray for distributed training.
Environment: Windows 11, Python 3.9.12, Ray 2.1.0

  1. On the head node, I ran `ray start --head --node-ip-address=172.20.10.9` to start the head node.
  2. Following its prompt, I ran `ray start --address=172.20.10.9:6379` on the worker node to connect to the cluster (see the check below).
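
For reference, here is a minimal sketch of how I understand the connection can be sanity-checked from Python on the head node (assuming the same head address as above):

import ray

# Connect to the already-running cluster instead of starting a local one.
ray.init(address="172.20.10.9:6379")

# One entry per node; with both machines connected, two alive nodes
# should be listed here.
for node in ray.nodes():
    print(node["NodeManagerAddress"], "alive:", node["Alive"])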

But when I run the Python file on the head node, the program gives the following error:

Failed to get the system config from raylet because it is dead. Worker will terminate. Status: GrpcUnavailable: RPC Error message: failed to connect to all addresses; RPC Error details:  .Please see `raylet.out` for more details.

If I run the Python file on the worker node, the program stays in the PENDING state.

Here is my Python code:

from typing import Dict
from ray.air import session

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

import ray
import ray.train as train
from ray.train.torch import TorchTrainer, TorchConfig
from ray.air.config import ScalingConfig



training_data = datasets.FashionMNIST(
    root="~/data",
    train=True,
    download=True,
    transform=ToTensor(),
)

test_data = datasets.FashionMNIST(
    root="~/data",
    train=False,
    download=True,
    transform=ToTensor(),
)


class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
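        # Flatten each (1, 28, 28) image into a 784-vector; the MLP above
        # returns 10 un-normalized class logits per sample.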
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def train_epoch(dataloader, model, loss_fn, optimizer):
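    # prepare_data_loader (used in train_func below) shards the dataset across
    # workers, so divide by the world size to estimate the per-worker size.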
    size = len(dataloader.dataset) // session.get_world_size()
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def validate_epoch(dataloader, model, loss_fn):
    size = len(dataloader.dataset) // session.get_world_size()
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return test_loss


def train_func(config: Dict):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]

    worker_batch_size = batch_size // session.get_world_size()
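    # e.g. the global batch_size of 12 split across 3 workers gives
    # 4 samples per worker per step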

    # Create data loaders.
    train_dataloader = DataLoader(training_data, batch_size=worker_batch_size)
    test_dataloader = DataLoader(test_data, batch_size=worker_batch_size)

    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = train.torch.prepare_data_loader(test_dataloader)
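    # prepare_data_loader adds a DistributedSampler and moves each batch to
    # the worker's device automatically.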

    model = NeuralNetwork()
    model = train.torch.prepare_model(model)
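    # prepare_model moves the model to the right device and wraps it in
    # DistributedDataParallel for multi-worker training.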

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    loss_results = []

    for _ in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer)
        loss = validate_epoch(test_dataloader, model, loss_fn)
        loss_results.append(loss)
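        # stream the per-epoch loss back to the Trainer driver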
        session.report(dict(loss=loss))

    return loss_results


def train_fashion_mnist(num_workers=2, use_gpu=False):
    trainer = TorchTrainer(
        train_func,
        train_loop_config={"lr": 1e-3, "batch_size": 12, "epochs": 1},
        torch_config=TorchConfig(backend="gloo"),
        scaling_config=ScalingConfig(
            num_workers=num_workers,
            use_gpu=use_gpu,
            resources_per_worker={"GPU": 0.5},
        ),
    )

    result = trainer.fit()
    print(f"Results: {result.metrics}")


if __name__ == "__main__":

    ray.init(address="auto")
    train_fashion_mnist(num_workers=3, use_gpu=True)

It’s not clear to me whether the worker node is successfully connecting to the cluster. After you run `ray start --address=...` from the worker node, can you run `ray status` from the head node and post the output?
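
Also note that with num_workers=3, use_gpu=True, and resources_per_worker={"GPU": 0.5}, the trainer needs 1.5 GPUs' worth of resources in the cluster before all three workers can be scheduled, which could explain a job sitting in PENDING if the two machines don't expose that many GPUs to Ray. (backend="gloo" is the right choice on Windows, though, since NCCL is not supported there.) As a minimal sketch, the cluster's view of its own resources can be checked programmatically:

import ray

ray.init(address="auto")

# Totals across all connected nodes, as Ray sees them.
print(ray.cluster_resources())
# What is currently unclaimed.
print(ray.available_resources())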