Pytorch DistributedTrainable Tune Report on Rank 0 Only

I have a deep reinforcement learning setup where multiple processes work together to train a model using data from child processes. I am trying to call ray tune.report to run hyperparameter optimization. The metrics are computed in a distributed manner and than pushed to rank 0. Ideally, I would do

if rank == 0:
    tunee.report(...)

However, when running with multiple workers per job, the tables printed by tune do not reflex these updates. Is there a way to do this or do all ranks have to report the metrics?

Hey @Matthew could you provide a little bit more details here? What do you mean by the tables reflected by tune do not reflect these updates?

It would help if you could provide the training function that you are passing into tune. If you are calling tune.report on the rank 0 worker, then it must be run in the same process as the training function/Tune trainable and not in a separate actor. Let me know if this makes sense!

Also, if you are doing data parallel training with parallel hyperparameter tuning, I would recommend using Ray Train for this (Ray Train: Distributed Deep Learning — Ray v1.10.0). It has a 2 line integration for Tune and makes it very easy to aggregate metrics from all workers and report just the rank 0 worker stats to Tune.

Below is the function i use for the tune trainable:

def tune_train(config, conf, evalOb, checkpoint_dir = None):
    trainer_type = conf["learn"]["trainer"].lower()
    conf["learn"]["gamma"][0] = config["operator_gamma"]
    conf["learn"]["training_batch_size"][0] = config["operator_batch_size"]

    conf["model"][f"{trainer_type}_hidden_sizes"] = [config["hidden1"], config["hidden2"]]
    conf['model'][f"{trainer_type}_dropout_rate"] = [max(config["dropout1"], 1), max(config["dropout2"], 1)]
    conf["model"][f"{trainer_type}_activation"] = config["activation"]
    
    if "n_steps_update" in config:
        conf["learn"]["n_steps_update"] = config["n_steps_update"]
    
    if checkpoint_dir is not None:
        conf["directories"]["checkpoint"] = os.path.join(checkpoint_dir, "checkpoint")
        conf["learn"]["load_checkpoint"] = True

    problem = define_problem(conf)
    trainer = define_trainer(conf, problem, evalOb)
    trainer.train()

Inside train, I wrap a model with distributedDataParallel and train for a number of steps. Periodically, I ran a validation episode in a child process on each rank and have rank 0 compute the mean steps/reward over all the ranks. I want only rank 0 to tune.report the mean values. This works when rank 0 is the only rank, but the report call does not work when there are multiple ranks for each job.

I create the trainable like this:

 trainer = DistributedTrainableCreator(
        partial(tune_train, conf = conf, evalOb = evalOb),
        use_gpu = False,
        num_workers = conf["learn"]["world_size"],
        num_cpus_per_worker = conf["learn"]["n_agents"] + 1,
        backend = "gloo",
        timeout_s = 3600 * 6
    )

    hyperparam_mutations = {
        "operator_gamma": tune.uniform(0.9, 1),
        "hidden1": [2 ** i for i in range(2, 10)],
        "hidden2": [2 ** i for i in range(2, 10)],
        "dropout1": tune.uniform(0.2, 0.7),
        "dropout2": tune.uniform(0.3, 0.9),
        "operator_batch_size": [2 ** i for i in range(5, 12)],
        "activation": ["relu", "leakyRelu", "tanh"]
    }

    if conf["learn"]["trainer"] in ["DQN", "DRQN"]:
        hyperparam_mutations["n_steps_update"] = [25 * i for i in range(2, 25)]

    scheduler = PopulationBasedTraining(
        hyperparam_mutations = hyperparam_mutations,
    )

    reporter = CLIReporter(
        # parameter_columns = ["operator_gamma", "hidden1", "hidden2", "dropout1",
        # "dropout2", "operator_batch_size", "activation"],
        metric_columns = ["reward", "steps"],
        print_intermediate_tables = True,
        max_report_frequency = 60,
        # metric = "reward",
        # mode = "max",
        )

    analysis = tune.run(
        trainer,
        metric = "reward",
        mode = "max",
        name = conf["learn"]["experiment_name"] + "_tune",
        local_dir = "./results",
        num_samples = conf["learn"]["tune_population_size"],
        scheduler = scheduler,
        resources_per_trial = None,
        progress_reporter  = reporter
    )

I would rather not use ray train because there is a lot of custom logic In my distributed train function.

I was able to fix this by running the tune.report call on all distributed ranks.

Ah I see you are using the DistributedTrainables. In that case, yes calling tune.report on each rank works since internally it will report only from a single rank. But for the general case if you are not using DistributedTrainableCreator, you should report only from a single rank.

Also, I would still recommend using Ray Train, which is the replacement to DistributedTrainableCreator. If your code works with DistributedTrainableCreator then it will work with Ray Train. If this isn’t the case, then can you describe more about the custom logic that you have?

We are running deep reinforcement learning so our data comes from simulations running in child processes spawned by each rank. Also, we have custom distributed communication points to save learning curves and observations from the simulations. My goal is to wrap ray tune around that without changing a lot.

I have things working other than the checkpointing. See Checkpoint Discussion.

Even with Ray Train you can do distributed training with Ray Tune for distributed hyperparameter tuning without changing your code. Ray Train is the newer, better version of DistributedTrainables, and the DistributedTrainables are going to be deprecated soon in favor of Ray Train.

But again, even with Ray Train there is very little code change.