How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I am trying to use the Parameter Server with Ray, and I followed the demo. I fixed every random seed I could, as follows:
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)
random.seed(seed)
torch.backends.cudnn.deterministic = True
print(f"NOTE: Set random seed: {seed}.")
But I never get the same values across runs.
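One thing I am not sure about: each Ray actor runs in its own worker process, so as far as I understand the seeding above only applies to the driver and does not carry over into the ParameterServer and DataWorker actors. A minimal sketch of what I mean by reseeding inside the actor (a hypothetical change on my side, not part of the demo; set_seed is the same helper shown above):

@ray.remote(num_cpus=1, num_gpus=1)
class DataWorker(object):
    def __init__(self, seed=42):
        set_seed(seed)  # reseed inside the actor's own process
        self.model = ConvNet().cuda()
        self.data_iterator = iter(get_data_loader()[0])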
The full code is shown below.
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,2,3'  # set before CUDA is initialized

import random

import numpy as np
import torch
import torch.nn.functional as F
from filelock import FileLock
from torchvision import datasets, transforms

import ray
from net_1 import ConvNet
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    print(f"NOTE: Set random seed: {seed}.")
def seed_worker(worker_id):
    # Every DataLoader worker gets the same constant seed; worker_id is unused.
    np.random.seed(42)
    random.seed(42)
def get_data_loader():
    mnist_transforms = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.1307,), (0.3081,))])
    # File lock so concurrent processes do not read/download MNIST at once.
    with FileLock(os.path.expanduser("./data.lock")):
        train_loader = torch.utils.data.DataLoader(
            datasets.MNIST(
                "./dataset",
                train=True,
                download=False,
                transform=mnist_transforms),
            batch_size=128,
            worker_init_fn=seed_worker,
            shuffle=True)
        test_loader = torch.utils.data.DataLoader(
            datasets.MNIST("./dataset", train=False, transform=mnist_transforms),
            batch_size=128,
            worker_init_fn=seed_worker,
            shuffle=True)
    return train_loader, test_loader
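# Note (my understanding): shuffle=True draws its order from torch's global RNG
# in whichever process constructs the loader; seed_worker above only reseeds
# numpy/random inside DataLoader workers, so the shuffle order itself is not
# pinned by seed_worker alone.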
def evaluate(model, test_loader):
    """Evaluates the accuracy of the model on a validation dataset."""
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            # Stop after roughly the first 1024 examples.
            if batch_idx * len(data) > 1024:
                break
            outputs = model(data.cuda()).cpu()
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
    return 100. * correct / total
@ray.remote(num_cpus=1, num_gpus=1)
class ParameterServer(object):
    def __init__(self, lr):
        # self.model = AlexNet(10).cuda()
        self.model = ConvNet().cuda()
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=lr)

    def apply_gradients(self, *gradients):
        # Sum the gradients from all workers, parameter by parameter.
        summed_gradients = [
            np.stack(gradient_zip).sum(axis=0)
            for gradient_zip in zip(*gradients)
        ]
        self.optimizer.zero_grad()
        self.model.set_gradients(summed_gradients)
        self.optimizer.step()
        return self.model.get_weights()

    def get_weights(self):
        return self.model.get_weights()
@ray.remote(num_cpus=1, num_gpus=1)
class DataWorker(object):
    def __init__(self):
        self.model = ConvNet().cuda()
        self.data_iterator = iter(get_data_loader()[0])

    def compute_gradients(self, weights):
        self.model.set_weights(weights)
        try:
            data, target = next(self.data_iterator)
        except StopIteration:  # When the epoch ends, start a new epoch.
            self.data_iterator = iter(get_data_loader()[0])
            data, target = next(self.data_iterator)
        self.model.zero_grad()
        output = self.model(data.cuda())
        loss = F.nll_loss(output, target.cuda())
        print(loss)
        loss.backward()
        return self.model.get_gradients()
iterations = 2
num_workers = 1
set_seed(42)
ray.init(ignore_reinit_error=True, num_cpus=num_workers + 1, num_gpus=num_workers + 1)
ps = ParameterServer.remote(1e-2)
workers = [DataWorker.remote() for i in range(num_workers)]
model = ConvNet().cuda()
test_loader = get_data_loader()[1]

print("Running synchronous parameter server training.")
current_weights = ps.get_weights.remote()
for i in range(iterations):
    gradients = [
        worker.compute_gradients.remote(current_weights) for worker in workers
    ]
    current_weights = ps.apply_gradients.remote(*gradients)

    if i % 1 == 0:
        # Evaluate the current model.
        model.set_weights(ray.get(current_weights))
        accuracy = evaluate(model, test_loader)
        print("Iter {}: \taccuracy is {:.1f}".format(i, accuracy))

print(ray.get(current_weights))
ray.shutdown()
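For reference, this is the self-check I would expect to pass if the torch side alone were deterministic: run the same optimization twice in one process, without Ray, and compare the resulting weights. weight_checksum and run_once are my own helpers, and I use random tensors instead of MNIST to keep the sketch self-contained:

import torch
import torch.nn.functional as F
from net_1 import ConvNet

def weight_checksum(model):
    # Cheap fingerprint of all parameters, for comparing two runs.
    return sum(p.double().sum().item() for p in model.parameters())

def run_once(seed=42, steps=2):
    set_seed(seed)  # the same helper as above
    model = ConvNet().cuda()
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-2)
    # Fixed fake batch; generated after seeding, so identical across calls.
    data = torch.randn(128, 1, 28, 28).cuda()
    target = torch.randint(0, 10, (128,)).cuda()
    for _ in range(steps):
        optimizer.zero_grad()
        loss = F.nll_loss(model(data), target)
        loss.backward()
        optimizer.step()
    return weight_checksum(model)

print(run_once() == run_once())  # I expect True here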
The code in net_1.py is:
import torch
import torch.nn as nn
import torch.nn.functional as F


class ConvNet(nn.Module):
    """Small ConvNet for MNIST."""

    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        self.fc = nn.Linear(192, 10)
        # Constant initialization, to rule out differences in the initial weights.
        nn.init.constant_(self.conv1.weight, 1)
        nn.init.constant_(self.conv1.bias, 0)
        nn.init.constant_(self.fc.weight, 1)
        nn.init.constant_(self.fc.bias, 0)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 3))
        x = x.view(-1, 192)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)

    def get_weights(self):
        return {k: v.cpu() for k, v in self.state_dict().items()}

    def set_weights(self, weights):
        self.load_state_dict(weights)

    def get_gradients(self):
        grads = []
        for p in self.parameters():
            grad = None if p.grad is None else p.grad.data.cpu().numpy()
            grads.append(grad)
        return grads

    def set_gradients(self, gradients):
        for g, p in zip(gradients, self.parameters()):
            if g is not None:
                p.grad = torch.from_numpy(g).cuda()
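For completeness, these are the additional determinism switches I know of from the PyTorch reproducibility notes; I have not confirmed yet whether they make a difference here:

import os
import torch

torch.backends.cudnn.benchmark = False  # avoid autotuned, potentially nondeterministic kernels
torch.use_deterministic_algorithms(True)  # raise an error on known-nondeterministic ops
os.environ["CUBLAS_WORKSPACE_CONFIG"] = ":4096:8"  # deterministic cuBLAS (CUDA >= 10.2)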