TL;DR: Ray Tune on GCP cluster fails to sync down last checkpoint
Hi,
I am trying the Ray tune mnist_pytorch example (mnist_pytorch — Ray v2.0.0.dev0) on a GCP cluster,
and I have added the checkpointing code in my training loop using tune.checkpoint_dir.
However, I keep encountering the error below, which reports a failure in synchronising the last iteration checkpoint between the worker and the head node:
2021-04-06 09:51:12,876	ERROR syncer.py:190 -- Sync execution failed.
Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/ray/tune/syncer.py", line 186, in sync_down
    result = self.sync_client.sync_down(self._remote_path,
  File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/ray/tune/sync_client.py", line 212, in sync_down
    return self._execute(self.sync_down_template, source, target)
  File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/ray/tune/sync_client.py", line 271, in _execute
    stdout=self._get_logfile())
  File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/ray/tune/sync_client.py", line 201, in _get_logfile
    raise RuntimeError(
RuntimeError: [internalerror] The client has been closed. Please report this stacktrace + your cluster configuration on Github!
2021-04-06 09:51:12,877	ERROR syncer.py:413 -- Trial train_mnist_75714_00001: Checkpoint sync skipped. This should not happen.
2021-04-06 09:51:12,878	ERROR trial_runner.py:899 -- Trial train_mnist_75714_00001: Error handling checkpoint /home/ubuntu/ray_results/exp/train_mnist_75714_00001_1_lr=0.00026945,momentum=0.36098_2021-04-06_09-50-03/checkpoint_000100/
Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/ray/tune/trial_runner.py", line 891, in _process_trial_save
    self._callbacks.on_checkpoint(
  File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/ray/tune/callback.py", line 216, in on_checkpoint
    callback.on_checkpoint(**info)
  File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/ray/tune/syncer.py", line 455, in on_checkpoint
    self._sync_trial_checkpoint(trial, checkpoint)
  File "/home/ubuntu/anaconda3/lib/python3.8/site-packages/ray/tune/syncer.py", line 428, in _sync_trial_checkpoint
    raise TuneError("Trial {}: Checkpoint path {} not "
ray.tune.error.TuneError: Trial train_mnist_75714_00001: Checkpoint path /home/ubuntu/ray_results/exp/train_mnist_75714_00001_1_lr=0.00026945,momentum=0.36098_2021-04-06_09-50-03/checkpoint_000100/ not found after successful sync down.
As a result, the last checkpoint at the end of the training is missing.
I have also experienced the same issue with other examples I have developed, and the same error occurs when a trial is stopped early before reaching the end of training (e.g. if Tune interrupts it because its performance is worse than that of other trials).
Could you help me understand why this happens and how to prevent it, please?
Here is the script mnist_pytorch.py I am running:
import os
import argparse
from filelock import FileLock
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import ray
from ray import tune
from ray.tune.schedulers import AsyncHyperBandScheduler
# Change these values if you want the training to run quicker or slower.
# EPOCH_SIZE caps one training pass: train() stops as soon as
# batch_idx * len(batch) exceeds it.
EPOCH_SIZE = 512
# TEST_SIZE caps one evaluation pass in test() in the same way.
TEST_SIZE = 256
class ConvNet(nn.Module):
    """Minimal CNN: one 3x3 convolution, 3x3 max-pooling, one linear layer."""

    def __init__(self):
        super().__init__()
        # One input channel mapped to three feature maps.
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3)
        # 192 flattened features mapped to 10 class scores.
        self.fc = nn.Linear(192, 10)

    def forward(self, x):
        """Return per-class log-probabilities (log-softmax over dim 1)."""
        pooled = F.max_pool2d(self.conv1(x), 3)
        # Flatten every sample to the 192 features the linear layer expects.
        features = F.relu(pooled).view(-1, 192)
        logits = self.fc(features)
        return F.log_softmax(logits, dim=1)
def train(model, optimizer, train_loader, device=None):
    """Run one capped training pass of ``model`` over ``train_loader``.

    Args:
        model: Module to optimise; it is switched to training mode.
        optimizer: Optimizer stepped once per processed batch.
        train_loader: Iterable yielding ``(data, target)`` batches.
        device: Device the batches are moved to; CPU when not given.

    The pass ends early once ``batch_idx * len(data)`` exceeds
    ``EPOCH_SIZE``.
    """
    if not device:
        device = torch.device("cpu")
    model.train()
    for batch_idx, batch in enumerate(train_loader):
        inputs, labels = batch
        # Stop once the sample budget for this pass has been exceeded.
        if batch_idx * len(inputs) > EPOCH_SIZE:
            break
        inputs = inputs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        loss = F.nll_loss(model(inputs), labels)
        loss.backward()
        optimizer.step()
def test(model, data_loader, device=None):
    """Return the accuracy of ``model`` on a capped pass over ``data_loader``.

    Args:
        model: Module to evaluate; it is switched to evaluation mode.
        data_loader: Iterable yielding ``(data, target)`` batches.
        device: Device the batches are moved to; CPU when not given.

    Returns:
        Fraction of correctly classified samples among those evaluated.
        Evaluation stops once ``batch_idx * len(data)`` exceeds ``TEST_SIZE``.
    """
    if not device:
        device = torch.device("cpu")
    model.eval()
    hits = 0
    seen = 0
    with torch.no_grad():
        for batch_idx, batch in enumerate(data_loader):
            inputs, labels = batch
            # Stop once the sample budget for evaluation has been exceeded.
            if batch_idx * len(inputs) > TEST_SIZE:
                break
            inputs = inputs.to(device)
            labels = labels.to(device)
            scores = model(inputs)
            # Index 1 of torch.max's result holds the arg-max class ids.
            predicted = torch.max(scores.data, 1)[1]
            seen += labels.size(0)
            hits += (predicted == labels).sum().item()
    return hits / seen
def get_data_loaders():
    """Build shuffled KMNIST ``DataLoader`` objects with batch size 64.

    Returns:
        A ``(train_loader, test_loader)`` tuple.
    """
    data_root = "~/data"
    pipeline = [
        transforms.ToTensor(),
        transforms.Normalize((0.1307, ), (0.3081, )),
    ]
    mnist_transforms = transforms.Compose(pipeline)

    # Several workers may try to download the data at once; the file lock
    # serialises them so they do not overwrite each other's files.
    lock_path = os.path.expanduser("~/data.lock")
    with FileLock(lock_path):
        train_set = datasets.KMNIST(
            data_root, train=True, download=True, transform=mnist_transforms)
        train_loader = torch.utils.data.DataLoader(
            train_set, batch_size=64, shuffle=True)

    test_set = datasets.KMNIST(
        data_root, train=False, transform=mnist_transforms)
    test_loader = torch.utils.data.DataLoader(
        test_set, batch_size=64, shuffle=True)
    return train_loader, test_loader
def train_mnist(config, checkpoint_dir=None, save_every_n=2):
    """Tune trainable: train ``ConvNet`` indefinitely, reporting accuracy.

    Args:
        config: Mapping providing the SGD ``"lr"`` and ``"momentum"`` values.
        checkpoint_dir: Directory holding a ``checkpoint.pth`` to restore the
            model and optimizer state from; nothing is restored when falsy.
        save_every_n: A checkpoint is written every ``save_every_n`` steps.

    The loop never returns on its own; each iteration reports
    ``mean_accuracy`` through ``tune.report``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    train_loader, test_loader = get_data_loaders()
    model = ConvNet().to(device)
    optimizer = optim.SGD(
        model.parameters(), lr=config["lr"], momentum=config["momentum"])

    if checkpoint_dir:
        # A checkpoint is a (model_state, optimizer_state) tuple.
        restored = torch.load(os.path.join(checkpoint_dir, "checkpoint.pth"))
        model_state, optimizer_state = restored
        model.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)

    step = 0
    while True:
        train(model, optimizer, train_loader, device)
        acc = test(model, test_loader, device)
        step += 1
        if step % save_every_n == 0:
            with tune.checkpoint_dir(step=step) as save_dir:
                states = (model.state_dict(), optimizer.state_dict())
                torch.save(states, os.path.join(save_dir, "checkpoint.pth"))
        # Set this to run Tune.
        tune.report(mean_accuracy=acc)
if __name__ == "__main__":
    # Command-line flags: optional GPU training and a short smoke-test mode.
    parser = argparse.ArgumentParser(description="PyTorch MNIST Example")
    parser.add_argument(
        "--cuda",
        action="store_true",
        default=False,
        help="Enables GPU training")
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing")
    args = parser.parse_args()
    # Prefer an already-running cluster and fall back to a fresh local Ray
    # instance only when none can be reached. Catching ConnectionError instead
    # of using a bare ``except`` keeps KeyboardInterrupt, SystemExit and
    # unrelated failures from being silently swallowed.
    try:
        ray.init(address="auto")
    except ConnectionError:
        ray.init()
    # for early stopping
    sched = AsyncHyperBandScheduler()
    analysis = tune.run(
        train_mnist,
        metric="mean_accuracy",
        mode="max",
        name="exp",
        scheduler=sched,
        # A trial stops when either condition is met.
        stop={
            "mean_accuracy": 0.98,
            "training_iteration": 5 if args.smoke_test else 100
        },
        resources_per_trial={
            "cpu": 4,
            "gpu": int(args.cuda)  # set this for GPUs
        },
        num_samples=1 if args.smoke_test else 20,
        # Hyperparameter search space sampled for each trial.
        config={
            "lr": tune.loguniform(1e-4, 1e-2),
            "momentum": tune.uniform(0.1, 0.9),
        })
    print("Best config is:", analysis.best_config)
(This is the same script as in mnist_pytorch — Ray v2.0.0.dev0, except for the addition of the checkpointing, the use of the KMNIST dataset instead of MNIST, and the required resources per trial — no GPU is used.)
Below is the yaml file to configure the GCP cluster:
# A unique identifier for the head node and workers of this cluster.
cluster_name: clustername
# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 2
# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 2
# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0
# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker: {}
# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5
# Cloud-provider specific configuration.
provider:
   type: gcp
   region: europe-west1
   availability_zone: europe-west1-b
   project_id: <my_project>
# How Ray will authenticate with newly launched nodes.
auth:
   ssh_user: ubuntu
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below. This requires that you have added the key into the
# project wide meta-data.
#    ssh_private_key: /path/to/your/key.pem
# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as subnets and ssh-keys.
# For more documentation on available fields, see:
# https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
head_node:
   machineType: n1-standard-2
   disks:
     - boot: true
       autoDelete: true
       type: PERSISTENT
       initializeParams:
         diskSizeGb: 50
         # See https://cloud.google.com/compute/docs/images for more images
         sourceImage: projects/deeplearning-platform-release/global/images/family/pytorch-latest-cu101-debian-10
   # Additional options can be found in the compute docs at
   # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
   # If the network interface is specified as below in both head and worker
   # nodes, the manual network config is used.  Otherwise an existing subnet is
   # used.  To use a shared subnet, ask the subnet owner to grant permission
   # for 'compute.subnetworks.use' to the ray autoscaler account...
   # networkInterfaces:
   #   - kind: compute#networkInterface
   #     subnetwork: path/to/subnet
   #     aliasIpRanges: []
worker_nodes:
   machineType: n1-standard-4
   disks:
     - boot: true
       autoDelete: true
       type: PERSISTENT
       initializeParams:
         diskSizeGb: 50
         # See https://cloud.google.com/compute/docs/images for more images
         sourceImage: projects/deeplearning-platform-release/global/images/family/pytorch-latest-cu101-debian-10
   scheduling:
     - preemptible: false
       onHostMaintenance: TERMINATE
   # Additional options can be found in the compute docs at
   # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
 "./mnist_pytorch.py": "./mnist_pytorch.py",
#    "/path1/on/remote/machine": "/path1/on/local/machine",
}
# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []
# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False
# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude: []
# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter: []
# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []
# List of shell commands to run to set up nodes.
setup_commands:
   # Note: if you're developing Ray, you probably want to create an AMI that
   # has your Ray repo pre-cloned. Then, you can replace the pip installs
   # below with a git checkout <your_sha> (and possibly a recompile).
   # - echo 'export PATH="$HOME/anaconda3/envs/tensorflow_p36/bin:$PATH"' >> ~/.bashrc
   # Install MiniConda.
   - >-
     sudo apt install -y build-essential
     && wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O ~/anaconda3.sh
     || true
     && bash ~/anaconda3.sh -b -p ~/anaconda3 || true
     && rm ~/anaconda3.sh
     && echo 'export PATH="$HOME/anaconda3/bin:$PATH"' >> ~/.profile
   # Install ray
   - pip install -U https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl
   - pip install torch==1.7.1+cu101 torchvision==0.8.2+cu101 torchaudio==0.7.2 -f https://download.pytorch.org/whl/torch_stable.html
   - pip install pandas ray[tune] ax-platform sqlalchemy scikit-optimize tensorboard tensorboardX
# Custom commands that will be run on the head node after common setup.
head_setup_commands:
 - pip install google-api-python-client==1.7.8
# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []
# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
   - ray stop
   - >-
     ulimit -n 65536;
     ray start
     --head
     --port=6379
     --object-manager-port=8076
     --autoscaling-config=~/ray_bootstrap_config.yaml
# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
   - ray stop
   - >-
     ulimit -n 65536;
     ray start
     --address=$RAY_HEAD_IP:6379
     --object-manager-port=8076
Many thanks.