Reproducibility of the parameter server example fails

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I am trying to use the Parameter Server with Ray, and I followed the demo. I fixed all the random seeds I could, as follows:

    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    print(f"NOTE: Set random seed: {seed}.")

But I never get the same values across runs.
The full code is shown below.
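(Before the full listing, one thing I am unsure about: my understanding is that each Ray actor runs in its own process, so the set_seed(42) call in the driver does not necessarily apply inside the ParameterServer and DataWorker actors. A minimal sketch of per-actor seeding, assuming the same set_seed helper as above; I have not verified that this is the recommended approach:)

    @ray.remote(num_cpus=1, num_gpus=1)
    class DataWorker(object):
        def __init__(self, seed=42):
            # Assumption: seed inside the actor process as well, since the
            # driver-side set_seed(42) runs in a different process.
            set_seed(seed)
            self.model = ConvNet().cuda()
            self.data_iterator = iter(get_data_loader()[0])
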

import torch
import torch.nn.functional as F
from torchvision import datasets, transforms
from filelock import FileLock
import numpy as np

from net_1 import ConvNet

import ray

import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0,2,3'
import random

def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    print(f"NOTE: Set random seed: {seed}.")


def seed_worker(worker_id):
    np.random.seed(42)
    random.seed(42)


def get_data_loader():
    mnist_transforms = transforms.Compose(
        [transforms.ToTensor(),
         transforms.Normalize((0.1307, ), (0.3081, ))])


    with FileLock(os.path.expanduser("./data.lock")):
        train_loader = torch.utils.data.DataLoader(
            datasets.MNIST(
                "./dataset",
                train=True,
                download=False,
                transform=mnist_transforms),
            batch_size=128, worker_init_fn=seed_worker,
            shuffle=True)
        test_loader = torch.utils.data.DataLoader(
            datasets.MNIST("./dataset", train=False, transform=mnist_transforms),
            batch_size=128, worker_init_fn=seed_worker,
            shuffle=True)
    return train_loader, test_loader


def evaluate(model, test_loader):
    """Evaluates the accuracy of the model on a validation dataset."""
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for batch_idx, (data, target) in enumerate(test_loader):
            if batch_idx * len(data) > 1024:
                break
            outputs = model(data.cuda()).cpu()
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
    return 100. * correct / total

@ray.remote(num_cpus=1, num_gpus=1)
class ParameterServer(object):
    def __init__(self, lr):
        # self.model = AlexNet(10).cuda()
        self.model = ConvNet().cuda()
        self.optimizer = torch.optim.SGD(self.model.parameters(), lr=lr)

    def apply_gradients(self, *gradients):
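        # Sum the corresponding gradient arrays from all workers element-wise.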
        summed_gradients = [
            np.stack(gradient_zip).sum(axis=0)
            for gradient_zip in zip(*gradients)
        ]
        self.optimizer.zero_grad()
        self.model.set_gradients(summed_gradients)
        self.optimizer.step()
        return self.model.get_weights()

    def get_weights(self):
        return self.model.get_weights()

@ray.remote(num_cpus=1, num_gpus=1)
class DataWorker(object):
    def __init__(self):
        self.model = ConvNet().cuda()
        self.data_iterator = iter(get_data_loader()[0])

    def compute_gradients(self, weights):
        self.model.set_weights(weights)
        try:
            data, target = next(self.data_iterator)
        except StopIteration:  # When the epoch ends, start a new epoch.
            self.data_iterator = iter(get_data_loader()[0])
            data, target = next(self.data_iterator)
        self.model.zero_grad()
        output = self.model(data.cuda())
        loss = F.nll_loss(output, target.cuda())
        print(loss)
        loss.backward()
        return self.model.get_gradients()

iterations = 2
num_workers = 1

set_seed(42)
ray.init(ignore_reinit_error=True, num_cpus=num_workers + 1, num_gpus=num_workers + 1)
ps = ParameterServer.remote(1e-2)
workers = [DataWorker.remote() for i in range(num_workers)]

model = ConvNet().cuda()
test_loader = get_data_loader()[1]

print("Running synchronous parameter server training.")
current_weights = ps.get_weights.remote()
for i in range(iterations):
    gradients = [
        worker.compute_gradients.remote(current_weights) for worker in workers
    ]
    current_weights = ps.apply_gradients.remote(*gradients)

    if i % 1 == 0:
        # Evaluate the current model.
        model.set_weights(ray.get(current_weights))
        accuracy = evaluate(model, test_loader)
        print("Iter {}: \taccuracy is {:.1f}".format(i, accuracy))
        print(ray.get(current_weights))

ray.shutdown()
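
For reference, a minimal sketch of how the final weights of two runs could be compared instead of eyeballing the printed tensors (run_a.pt and run_b.pt are placeholder file names, and the save would have to happen before ray.shutdown()):

    # Sketch: save the final weights of a run, then compare two saved runs.
    final_weights = ray.get(current_weights)
    torch.save(final_weights, "run_a.pt")  # placeholder file name

    # After a second run saved to run_b.pt:
    w_a = torch.load("run_a.pt")
    w_b = torch.load("run_b.pt")
    print(all(torch.equal(w_a[k], w_b[k]) for k in w_a))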

The code in net_1.py:

import torch.nn as nn
import torch.nn.functional as F
import torch

class ConvNet(nn.Module):
    """Small ConvNet for MNIST."""

    def __init__(self):
        super(ConvNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        self.fc = nn.Linear(192, 10)
        nn.init.constant_(self.conv1.weight, 1)
        nn.init.constant_(self.conv1.bias, 0)
        nn.init.constant_(self.fc.weight, 1)
        nn.init.constant_(self.fc.bias, 0)

    def forward(self, x):
        x = F.relu(F.max_pool2d(self.conv1(x), 3))
        x = x.view(-1, 192)
        x = self.fc(x)
        return F.log_softmax(x, dim=1)

    def get_weights(self):
        return {k: v.cpu() for k, v in self.state_dict().items()}

    def set_weights(self, weights):
        self.load_state_dict(weights)

    def get_gradients(self):
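        # Return gradients as CPU numpy arrays so they can be sent through the Ray object store.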
        grads = []
        for p in self.parameters():
            grad = None if p.grad is None else p.grad.data.cpu().numpy()
            grads.append(grad)
        return grads

    def set_gradients(self, gradients):
        for g, p in zip(gradients, self.parameters()):
            if g is not None:
                p.grad = torch.from_numpy(g).cuda()

cc ML on-call (@xwjiang2010) and Train team (@amogkam)