How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I have two computers on a local network, and I want to use Ray for distributed training.
Environment: Windows 11, Python 3.9.12, Ray 2.1.0
- On the head node, I run `ray start --head --node-ip-address=172.20.10.9` to start the head node.
- Following its prompt, I run `ray start --address=172.20.10.9:6379` on the worker node to connect to the cluster.
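For reference, here is a minimal sketch (assuming the cluster started as above is still running) that I can run on the head node to check that both machines have joined; `ray.init(address="auto")` and `ray.nodes()` are standard Ray APIs:

import ray

# Attach to the already-running cluster started with `ray start`.
ray.init(address="auto")

# List every node Ray knows about; I expect to see both machines marked alive.
for node in ray.nodes():
    print(node["NodeManagerAddress"], node["Alive"], node["Resources"])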
But if I run the Python file on the head node, the program fails with the following error:
Failed to get the system config from raylet because it is dead. Worker will terminate. Status: GrpcUnavailable: RPC Error message: failed to connect to all addresses; RPC Error details: .Please see `raylet.out` for more details.
If I run the Python file on the worker node, the program stays in the PENDING state.
Here is my Python code:
from typing import Dict

from ray.air import session
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
import ray
import ray.train as train
from ray.train.torch import TorchTrainer, TorchConfig
from ray.air.config import ScalingConfig

training_data = datasets.FashionMNIST(
    root="~/data",
    train=True,
    download=True,
    transform=ToTensor(),
)

test_data = datasets.FashionMNIST(
    root="~/data",
    train=False,
    download=True,
    transform=ToTensor(),
)


class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def train_epoch(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset) // session.get_world_size()
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")


def validate_epoch(dataloader, model, loss_fn):
    size = len(dataloader.dataset) // session.get_world_size()
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return test_loss


def train_func(config: Dict):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]

    worker_batch_size = batch_size // session.get_world_size()

    # Create data loaders.
    train_dataloader = DataLoader(training_data, batch_size=worker_batch_size)
    test_dataloader = DataLoader(test_data, batch_size=worker_batch_size)

    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = train.torch.prepare_data_loader(test_dataloader)

    model = NeuralNetwork()
    model = train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    loss_results = []
    for _ in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer)
        loss = validate_epoch(test_dataloader, model, loss_fn)
        loss_results.append(loss)
        session.report(dict(loss=loss))

    return loss_results


def train_fashion_mnist(num_workers=2, use_gpu=False):
    trainer = TorchTrainer(
        train_func,
        train_loop_config={"lr": 1e-3, "batch_size": 12, "epochs": 1},
        torch_config=TorchConfig(backend="gloo"),
        scaling_config=ScalingConfig(
            num_workers=num_workers,
            use_gpu=use_gpu,
            resources_per_worker={"GPU": 0.5},
        ),
    )
    result = trainer.fit()
    print(f"Results: {result.metrics}")


if __name__ == "__main__":
    ray.init("auto")
    train_fashion_mnist(num_workers=3, use_gpu=True)