"Not a Directory" error when loading a checkpoint with Population Based Training

I am connecting a distributed PyTorch trainable to Population Based Training like so:


def tune_train(config, conf, evalOb, checkpoint_dir = None):
    trainer_type = conf["learn"]["trainer"].lower()
    conf["learn"]["gamma"][0] = config["operator_gamma"]
    conf["learn"]["training_batch_size"][0] = config["operator_batch_size"]

    conf["model"][f"{trainer_type}_hidden_sizes"] = [config["hidden1"], config["hidden2"]]
    conf["model"][f"{trainer_type}_dropout_rate"] = [min(config["dropout1"], 1), min(config["dropout2"], 1)]  # clamp dropout rates to at most 1
    conf["model"][f"{trainer_type}_activation"] = config["activation"]
    
    conf["learn"][f"{trainer_type}_lr"][0] = config["lr"]
    conf["learn"][f"{trainer_type}_lr_decay_rate"][0] = config["lr_decay_rate"]
    conf["learn"][f"{trainer_type}_lr_decay_freq"][0] = config["lr_decay_freq"]

    if "n_steps_update" in config:
        conf["learn"]["n_steps_update"] = config["n_steps_update"]
    
    if checkpoint_dir is not None:
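        # Tune passes checkpoint_dir when this trial is restored (or exploited by PBT);
        # point the trainer's checkpoint directory at it and enable loading.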
        print(checkpoint_dir, "\n\n===================================", flush=True)
        conf["directories"]["checkpoint"] = os.path.join(checkpoint_dir, "checkpoint")
        conf["learn"]["load_checkpoint"] = True

    problem = define_problem(conf)
    trainer = define_trainer(conf, problem, evalOb)
    trainer.train()


def tune_optimize(conf, evalOb):
    if "ip_head" in os.environ:
        ray.init(address='auto', _node_ip_address=os.environ["ip_head"].split(":")[0], _redis_password=os.environ["redis_password"])
    
    if "action_path" in conf["problem"]:
        conf["problem"]["action_path"] = os.path.abspath(conf["problem"]["action_path"])

    for dir_ in conf["directories"]:
        conf["directories"][dir_] = os.path.abspath(conf['directories'][dir_])

    # assert conf["problem"]["learning_curve_frequency"] > 0, "Hyperparameter optimization requires a learning curve actor process (learning curve frequency > 0)."

    trainer = DistributedTrainableCreator(
        partial(tune_train, conf = conf, evalOb = evalOb),
        use_gpu = False,
        num_workers = conf["learn"]["world_size"],
        num_cpus_per_worker = conf["learn"]["n_agents"] + 1,
        backend = "gloo",
        timeout_s = 3600 * 6
    )

    hyperparam_mutations = {
        "operator_gamma": tune.uniform(0.9, 1),
        "hidden1": [2 ** i for i in range(2, 10)],
        "hidden2": [2 ** i for i in range(2, 10)],
        "dropout1": tune.uniform(0.2, 0.7),
        "dropout2": tune.uniform(0.3, 0.9),
        "operator_batch_size": [2 ** i for i in range(5, 12)],
        "activation": ["relu", "leakyRelu", "tanh"],
        "lr": [10 ** -i for i in range(2, 6)],
        "lr_decay_rate": [0.1 * i for i in range(1, 11)],
        "lr_decay_freq": [500 * i for i in range(1, 10)],
    }

    if conf["learn"]["trainer"] in ["DQN", "DRQN"]:
        hyperparam_mutations["n_steps_update"] = [25 * i for i in range(2, 25)]

    search_alg = BayesOptSearch(metric = "reward", mode = "max")
    scheduler = PopulationBasedTraining(
        hyperparam_mutations = hyperparam_mutations,
    )

    reporter = CLIReporter(
        # parameter_columns = ["operator_gamma", "hidden1", "hidden2", "dropout1",
        # "dropout2", "operator_batch_size", "activation"],
        metric_columns = ["reward", "steps"],
        print_intermediate_tables = True,
        max_report_frequency = 60,
        # metric = "reward",
        # mode = "max",
        )

    analysis = tune.run(
        trainer,
        metric = "reward",
        mode = "max",
        name = conf["learn"]["experiment_name"] + "_tune",
        # local_dir = "./results",
        num_samples = conf["learn"]["tune_population_size"],
        # config = search_space,
        # search_alg = search_alg,
        scheduler = scheduler,
        resources_per_trial = None,
        progress_reporter  = reporter
    )
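
For context on how checkpoint_dir reaches tune_train: with the function API, Tune calls the wrapped function itself and passes the restore directory as the checkpoint_dir keyword argument (with DistributedTrainableCreator, this happens on each worker). Roughly, as a sketch of the calling convention only (mutated_config and the tmp path below are placeholders, not names from my project):

# Sketch of the call Tune effectively makes on a PBT exploit step
# (simplified; the real call happens inside WrappedDistributedTorchTrainable).
mutated_config = {"operator_gamma": 0.95, "lr": 1e-3}          # hyperparameters after mutation
restore_dir = "<trial_dir>/worker_0/checkpoint_tmpXXXXXX"      # temporary restore directory
tune_train(mutated_config, conf = conf, evalOb = evalOb, checkpoint_dir = restore_dir)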

Inside trainer.train, I check whether load_checkpoint is true, and if so, I load from the checkpoint directory. However, I continue to see the following:

A trial is checkpointed at /WrappedDistributedTorchTrainable_600a8_00000_0_2022-02-22_13-07-00/worker_0/checkpoint_001000.
Then, another trial receives a checkpoint_dir value so it can start from that previous run. However, the checkpoint_dir looks like /WrappedDistributedTorchTrainable_600a8_00000_0_2022-02-22_13-07-00/worker_0/checkpoint_tmp156ab9/./. The main directory is correct, but the specific checkpoint directory contains “tmp…”, which was not part of any checkpoint that was previously created. I’m not sure what is happening and could use some advice.

Porting over the conversation from the Slack channel.

Hi, I am trying to run Tune over a PyTorch distributed group with Population Based Training. Currently, I have Tune set up and running my custom training code. I start with

    analysis = tune.run(
        trainer,
        metric = "reward",
        mode = "max",
        name = conf["learn"]["experiment_name"] + "_tune",
        local_dir = "./results",
        num_samples = conf["learn"]["tune_population_size"],
        # config = search_space,
        # search_alg = search_alg,
        scheduler = scheduler,
        resources_per_trial = None,
        sync_config = tune.SyncConfig(
            syncer = None  # Disable syncing
        ),
        progress_reporter = reporter
    )

Then, in the trainer, I pass the same tune_train function shown above, which reads in a checkpoint.
Finally, in trainer.train, I call

model_info = self._save_checkpoint(train_type)
if ray.is_initialized():
    with distributed_checkpoint_dir(step = self.learning_steps_completed[0]) as checkpoint_dir:
        checkpoint_path = os.path.join(checkpoint_dir, f"{self.checkpoint_name}_{train_type}.pt")
        print("Checkpointing", checkpoint_path, flush=True)
        torch.save(model_info, checkpoint_path)
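
For reference, the load side builds its path from conf["directories"]["checkpoint"], which tune_train points at os.path.join(checkpoint_dir, "checkpoint"). A simplified sketch of that part of load_checkpoint (attribute names approximate; the real code is in dqn_trainer.py):

# Simplified sketch of the path construction in load_checkpoint.
# conf["directories"]["checkpoint"] was set in tune_train to
# os.path.join(checkpoint_dir, "checkpoint").
checkpoint_path = os.path.join(
    self.conf["directories"]["checkpoint"],
    f"{self.checkpoint_name}_{train_type}.pt",   # ends up as e.g. ieee14_0.pt
)
model_info = torch.load(checkpoint_path)         # the torch.load that raises FileNotFoundError in the traceback below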

A trial is checkpointed at /WrappedDistributedTorchTrainable_600a8_00000_0_2022-02-22_13-07-00/worker_0/checkpoint_001000.
Then, another trial receives a checkpoint_dir value so it can start from that previous run. However, the checkpoint_dir looks like /WrappedDistributedTorchTrainable_600a8_00000_0_2022-02-22_13-07-00/worker_0/checkpoint_tmp156ab9/./. The main directory is correct, but the specific checkpoint directory contains “tmp…”, which was not part of any checkpoint that was previously created. I’m not sure what is happening and could use some advice.

From Xiaowei:
Hi, the short answer is that this is expected. The long answer:
Say we have a well-performing trial, trialA, that is checkpointed on machine A, and we want to start another trial (trialB) from trialA’s checkpoint, with a small mutation, on machine B. What really happens is:

  1. trialA is checkpointed to a temporary file on machine A
  2. the temporary checkpoint is loaded into Ray’s global object store, so it is accessible on machine B as well
  3. the object-store checkpoint is written back out to a temporary file on machine B
  4. trialB restores from that temporary file on machine B (this is where you see that tmp folder)

Implementation details aside, are you running into issues?
I also want to mention that we have recently introduced Ray Train for distributed training. You may want to give it a try; it also integrates seamlessly with Ray Tune.

From Matthew:
Entire stack trace:

(WrappedDistributedTorchTrainable pid=54757) 2022-02-23 09:03:37,474    INFO trainable.py:472 -- Restored on 192.168.128.2 from checkpoint: /usr/WS1/landen2/drl/drl/results/ieee14_tune/WrappedDistributedTorchTrainable_208a4_00003_3_2022-02-23_09-00-39/tmpuxw0xslerestore_from_object/./
(WrappedDistributedTorchTrainable pid=54757) 2022-02-23 09:03:37,475    INFO trainable.py:480 -- Current state after restoring: {'_iteration': 1, '_timesteps_total': None, '_time_total': 133.75925588607788, '_episodes_total': None}
(func pid=54731) 2022-02-23 09:03:37,473        INFO trainable.py:472 -- Restored on 192.168.128.2 from checkpoint: /usr/WS1/landen2/drl/drl/results/ieee14_tune/WrappedDistributedTorchTrainable_208a4_00003_3_2022-02-23_09-00-39/worker_0/checkpoint_tmp2329bf/./
(func pid=54731) 2022-02-23 09:03:37,474        INFO trainable.py:480 -- Current state after restoring: {'_iteration': 1, '_timesteps_total': None, '_time_total': 133.75925588607788, '_episodes_total': None}
(func pid=54731) Traceback (most recent call last):
(func pid=54731)   File "../drl/trainer/distributed_trainer.py", line 132, in train
(func pid=54731)     self.do_train()
(func pid=54731)   File "../drl/trainer/distributed_trainer.py", line 171, in do_train
(func pid=54731)     self.load_checkpoint()
(func pid=54731)   File "../drl/trainer/dqn_trainer.py", line 202, in load_checkpoint
(func pid=54731)     model_info = torch.load(checkpoint_path)
(func pid=54731)   File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/torch/serialization.py", line 594, in load
(func pid=54731)     with _open_file_like(f, 'rb') as opened_file:
(func pid=54731)   File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/torch/serialization.py", line 230, in _open_file_like
(func pid=54731)     return _open_file(name_or_buffer, mode)
(func pid=54731)   File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/torch/serialization.py", line 211, in __init__
(func pid=54731)     super(_open_file, self).__init__(open(name, mode))
(func pid=54731) FileNotFoundError: [Errno 2] No such file or directory: '/usr/WS1/landen2/drl/drl/results/ieee14_tune/WrappedDistributedTorchTrainable_208a4_00003_3_2022-02-23_09-00-39/worker_0/checkpoint_tmp2329bf/./checkpoint/ieee14_0.pt'
(func pid=54749) 2022-02-23 09:03:37,462        INFO trainable.py:472 -- Restored on 192.168.128.2 from checkpoint: /usr/WS1/landen2/drl/drl/results/ieee14_tune/WrappedDistributedTorchTrainable_208a4_00003_3_2022-02-23_09-00-39/worker_1/checkpoint_tmp848714/./
(func pid=54749) 2022-02-23 09:03:37,462        INFO trainable.py:480 -- Current state after restoring: {'_iteration': 1, '_timesteps_total': None, '_time_total': 133.75925588607788, '_episodes_total': None}
(func pid=54749) Traceback (most recent call last):
(func pid=54749)   File "../drl/trainer/distributed_trainer.py", line 132, in train
(func pid=54749)     self.do_train()
(func pid=54749)   File "../drl/trainer/distributed_trainer.py", line 178, in do_train
(func pid=54749)     self.make_groups()
(func pid=54749)   File "../drl/trainer/distributed_trainer.py", line 128, in make_groups
(func pid=54749)     self.groups.append(dist.new_group([0, i]))
(func pid=54749)   File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 2898, in new_group
(func pid=54749)     pg = _new_process_group_helper(
(func pid=54749)   File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/torch/distributed/distributed_c10d.py", line 684, in _new_process_group_helper
(func pid=54749)     pg = ProcessGroupGloo(prefix_store, rank, world_size, timeout=timeout)
(func pid=54749) RuntimeError: Connection reset by peer
2022-02-23 09:03:38,689 ERROR trial_runner.py:927 -- Trial WrappedDistributedTorchTrainable_208a4_00003: Error processing event.
Traceback (most recent call last):
  File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/ray/tune/trial_runner.py", line 893, in _process_trial
    results = self.trial_executor.fetch_result(trial)
  File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/ray/tune/ray_trial_executor.py", line 707, in fetch_result
    result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
  File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/ray/worker.py", line 1733, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(TuneError): ray::WrappedDistributedTorchTrainable.train() (pid=54757, ip=192.168.128.2, repr=<ray.tune.integration.torch.WrappedDistributedTorchTrainable object at 0x2ab4cb576480>)
  File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/ray/tune/trainable.py", line 315, in train
    result = self.step()
  File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/ray/tune/integration/torch.py", line 120, in step
    result = ray.get([w.step.remote() for w in self.workers])[0]
ray.exceptions.RayTaskError(TuneError): ray::ImplicitFunc.step() (pid=54731, ip=192.168.128.2, repr=func)
  File "/usr/workspace/landen2/prefix/toss_3_x86_64_ib/conda-2020.11/lib/python3.8/site-packages/ray/tune/function_runner.py", line 388, in step
    raise TuneError(
ray.tune.error.TuneError: Wrapped function ran until completion without reporting results or raising an exception.

Hi Matthew,
Can you compare your setup with this?
It is adapted from one of our examples, using the PBT scheduler as well as distributed training.

# Original Code here:
# https://github.com/pytorch/examples/blob/master/mnist/main.py
import argparse
import logging
import os
import torch
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel

from ray.tune.schedulers import PopulationBasedTraining

import ray
from ray import tune
from ray.tune.examples.mnist_pytorch import train, test, get_data_loaders, ConvNet
from ray.tune.integration.torch import (
    DistributedTrainableCreator,
    distributed_checkpoint_dir,
)

logger = logging.getLogger(__name__)


def train_mnist(config, checkpoint_dir=False):
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    train_loader, test_loader = get_data_loaders()
    model = ConvNet().to(device)
    optimizer = optim.SGD(model.parameters(), lr=0.1)

    if checkpoint_dir:
        with open(os.path.join(checkpoint_dir, "checkpoint")) as f:
            model_state, optimizer_state = torch.load(f)

        model.load_state_dict(model_state)
        optimizer.load_state_dict(optimizer_state)

    model = DistributedDataParallel(model)

    for epoch in range(40):
        train(model, optimizer, train_loader, device)
        acc = test(model, test_loader, device)

        if epoch % 3 == 0:
            with distributed_checkpoint_dir(step=epoch) as checkpoint_dir:
                path = os.path.join(checkpoint_dir, "checkpoint")
                torch.save((model.state_dict(), optimizer.state_dict()), path)
        tune.report(mean_accuracy=acc)


def run_ddp_tune(num_workers, num_gpus_per_worker, workers_per_node=None):
    trainable_cls = DistributedTrainableCreator(
        train_mnist,
        num_workers=num_workers,
        num_gpus_per_worker=num_gpus_per_worker,
        num_workers_per_host=workers_per_node,
    )

    pbt = PopulationBasedTraining(
        time_attr="training_iteration",
        perturbation_interval=1,
    )
    analysis = tune.run(
        trainable_cls,
        num_samples=4,
        stop={"training_iteration": 10},
        metric="mean_accuracy",
        mode="max",
        scheduler=pbt
    )

    print("Best hyperparameters found were: ", analysis.best_config)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=2,
        help="Sets number of workers for training.",
    )
    parser.add_argument(
        "--num-gpus-per-worker",
        type=int,
        default=0,
        help="Sets number of gpus each worker uses.",
    )
    parser.add_argument(
        "--cluster",
        action="store_true",
        default=False,
        help="enables multi-node tuning",
    )
    parser.add_argument(
        "--workers-per-node",
        type=int,
        help="Forces workers to be colocated on machines if set.",
    )
    parser.add_argument(
        "--server-address",
        type=str,
        default=None,
        required=False,
        help="The address of server to connect to if using " "Ray Client.",
    )

    args = parser.parse_args()

    if args.server_address is not None:
        ray.util.connect(args.server_address)
    else:
        if args.cluster:
            options = dict(address="auto")
        else:
            options = dict(num_cpus=2)
        ray.init(**options)

    run_ddp_tune(
        num_workers=args.num_workers,
        num_gpus_per_worker=args.num_gpus_per_worker,
        workers_per_node=args.workers_per_node,
    )

I was able to see the trial being restored and the checkpoint being loaded from a directory that looks like '/Users/xwjiang/ray_results/WrappedDistributedTorchTrainable_2022-02-23_11-24-34/WrappedDistributedTorchTrainable_3bbd9_00000_0_2022-02-23_11-24-35/worker_0/checkpoint_tmp382041/./'