Asking for help: error when using AIR for distributed training

I’m trying to run PyTorch training on two hosts at the same time, and I’m not sure whether I’m doing it correctly.

I ran the following command on one of the hosts (the head node):

ray start --head --dashboard-host 0.0.0.0 --port 8264

And on the other host:

ray start --address 192.168.3.14:8264 

Both hosts now show up in the dashboard.
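
Besides the dashboard, something like the following can also confirm that both nodes have registered (just a minimal check, assuming it is run on the head node after the cluster is up):

import ray

# Connect to the already-running cluster started by ray start.
ray.init(address="auto")

# One entry per node; both hosts should be listed as alive.
for node in ray.nodes():
    print(node["NodeManagerAddress"], node["Alive"])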

After that, I ran the following sample code on the head node:

import torch
import torch.nn as nn
from ray.air import ScalingConfig
from ray.train.torch import TorchTrainer
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from ray import train


def get_dataset():
    return datasets.FashionMNIST(
        root="/tmp/data",
        train=True,
        download=True,
        transform=ToTensor(),
    )


class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, inputs):
        inputs = self.flatten(inputs)
        logits = self.linear_relu_stack(inputs)
        return logits


def train_func():
    num_epochs = 3
    batch_size = 64

    dataset = get_dataset()
    dataloader = DataLoader(dataset, batch_size=batch_size)

    model = NeuralNetwork().to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")


def train_func_distributed():
    num_epochs = 3
    batch_size = 64

    dataset = get_dataset()
    dataloader = DataLoader(dataset, batch_size=batch_size)
    dataloader = train.torch.prepare_data_loader(dataloader)

    model = NeuralNetwork()
    model = train.torch.prepare_model(model)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")


if __name__ == '__main__':
    device = torch.device("cpu")
    use_gpu = False

    trainer = TorchTrainer(
        train_func_distributed,
        scaling_config=ScalingConfig(num_workers=2, use_gpu=use_gpu)
    )

    results = trainer.fit()

After running it, I received the following error:

2023-06-29 02:26:01,455	ERROR tune.py:1107 -- Trials did not complete: [TorchTrainer_f05d0_00000]
2023-06-29 02:26:01,455	INFO tune.py:1111 -- Total run time: 10.42 seconds (10.39 seconds for the tuning loop).
2023-06-29 02:26:01,457	WARNING experiment_analysis.py:910 -- Failed to read the results for 1 trials:
- /home/pth/ray_results/TorchTrainer_2023-06-29_02-25-51/TorchTrainer_f05d0_00000_0_2023-06-29_02-25-51
ray.exceptions.RayTaskError(RuntimeError): ray::_Inner.train() (pid=12368, ip=192.168.123.21, actor_id=4475cd043ae383c57865110807000000, repr=TorchTrainer)
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/tune/trainable/trainable.py", line 389, in train
    raise skipped from exception_cause(skipped)
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/tune/trainable/function_trainable.py", line 336, in entrypoint
    return self._trainable_func(
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/train/base_trainer.py", line 795, in _trainable_func
    super()._trainable_func(self._merged_config, reporter, checkpoint_dir)
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/tune/trainable/function_trainable.py", line 653, in _trainable_func
    output = fn()
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/train/base_trainer.py", line 705, in train_func
    trainer.training_loop()
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/train/data_parallel_trainer.py", line 424, in training_loop
    backend_executor.start(initialization_hook=None)
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/train/_internal/backend_executor.py", line 153, in start
    self._backend.on_start(self.worker_group, self._backend_config)
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/train/torch/config.py", line 205, in on_start
    ray.get(setup_futures)
ray.exceptions.RayTaskError(RuntimeError): ray::_RayTrainWorker__execute._setup_torch_process_group() (pid=5563, ip=192.168.123.22, actor_id=89a00115f6ddd46cd53ab78e07000000, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7fa10f7f47c0>)
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/train/_internal/worker_group.py", line 32, in __execute
    raise skipped from exception_cause(skipped)
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/train/_internal/worker_group.py", line 29, in __execute
    return func(*args, **kwargs)
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/train/torch/config.py", line 113, in _setup_torch_process_group
    dist.init_process_group(
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/torch/distributed/distributed_c10d.py", line 895, in init_process_group
    default_pg = _new_process_group_helper(
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/torch/distributed/distributed_c10d.py", line 994, in _new_process_group_helper
    backend_class = ProcessGroupGloo(backend_prefix_store, group_rank, group_size, timeout=timeout)
RuntimeError: [../third_party/gloo/gloo/transport/tcp/pair.cc:799] connect [127.0.1.1]:4530: Connection refused

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/pth/Desktop/ray_test/new_test.py", line 88, in <module>
    best_result = trainer.fit()
  File "/home/pth/Desktop/ray_test/venv/lib/python3.9/site-packages/ray/train/base_trainer.py", line 616, in fit
    raise TrainingFailedError(
ray.train.base_trainer.TrainingFailedError: The Ray Train run failed. Please inspect the previous error messages for a cause. After fixing the issue (assuming that the error is not caused by your own application logic, but rather an error such as OOM), you can restart the run from scratch or continue this run.
To continue this run, you can use: `trainer = TorchTrainer.restore("/home/pth/ray_results/TorchTrainer_2023-06-29_02-25-51")`.
To start a new run that will retry on training failures, set `air.RunConfig(failure_config=air.FailureConfig(max_failures))` in the Trainer's `run_config` with `max_failures > 0`, or `max_failures = -1` for unlimited retries.

I hope someone can give me some advice. Thank you very much!

@yic @sangcho Could you take a look at this? Thanks :)

At a glance, it looks like your process group workers cannot communicate with each other. I don’t think it is a Ray-specific issue, but a Gloo issue. My guess is that a port is not properly open (or reachable) for the Gloo communication?
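
One quick thing to check on each machine (just a minimal sketch, not specific to Ray) is what its hostname resolves to, since Gloo typically uses that address when setting up its TCP connections:

import socket

# On Ubuntu, /etc/hosts often maps the hostname to 127.0.1.1, which other
# machines cannot connect to. Ideally this prints the LAN address
# (e.g. 192.168.x.x), not a loopback address.
print(socket.gethostbyname(socket.gethostname()))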

Thank you for your reply. In fact, I already dealt with this problem some time ago; it is indeed caused by the Gloo backend.

My fix was as follows: edit the hosts file on Ubuntu (/etc/hosts) and change the 127.0.1.1 entry so that the hostname maps to the IP address the host uses on the current LAN.
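
For example, the relevant line on each machine changed roughly like this (the hostname and LAN address below are placeholders for that machine’s real values):

# before
127.0.1.1      my-hostname
# after
192.168.x.x    my-hostname

With that change the hostname resolves to the LAN address instead of 127.0.1.1, so Gloo advertises an address the other host can actually reach.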

After modifying both machines, distributed training on CPU works normally.