Below is the function I use for the Tune trainable:
import os

def tune_train(config, conf, evalOb, checkpoint_dir=None):
    # Map the hyperparameters sampled by Tune onto my experiment config.
    trainer_type = conf["learn"]["trainer"].lower()
    conf["learn"]["gamma"][0] = config["operator_gamma"]
    conf["learn"]["training_batch_size"][0] = config["operator_batch_size"]
    conf["model"][f"{trainer_type}_hidden_sizes"] = [config["hidden1"], config["hidden2"]]
    # Cap dropout rates at 1 (min, not max, so the sampled value is kept).
    conf["model"][f"{trainer_type}_dropout_rate"] = [min(config["dropout1"], 1), min(config["dropout2"], 1)]
    conf["model"][f"{trainer_type}_activation"] = config["activation"]
    if "n_steps_update" in config:
        conf["learn"]["n_steps_update"] = config["n_steps_update"]
    # Resume from the checkpoint directory Tune hands us (needed for PBT).
    if checkpoint_dir is not None:
        conf["directories"]["checkpoint"] = os.path.join(checkpoint_dir, "checkpoint")
        conf["learn"]["load_checkpoint"] = True
    problem = define_problem(conf)
    trainer = define_trainer(conf, problem, evalOb)
    trainer.train()
Inside train(), I wrap the model with DistributedDataParallel and train for a number of steps. Periodically, I run a validation episode in a child process on each rank and have rank 0 compute the mean steps/reward over all the ranks. I want only rank 0 to call tune.report with those mean values. This works when rank 0 is the only rank, but the report call does not work when each trial has multiple ranks.
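For context, this is roughly what the periodic validation/report step inside train() looks like. It is a simplified sketch: run_validation_episode is a stand-in for my actual child-process validation, and the aggregation plus the rank-0-only tune.report call are the relevant parts:

import torch
import torch.distributed as dist
from ray import tune

def validate_and_report(model, rank, world_size):
    # Each rank runs its own validation episode (in my real code this
    # happens in a child process) and gets a local reward/steps pair.
    reward, steps = run_validation_episode(model)

    # Sum the per-rank results on rank 0 so it can compute the means.
    stats = torch.tensor([reward, steps], dtype=torch.float64)
    dist.reduce(stats, dst=0, op=dist.ReduceOp.SUM)

    if rank == 0:
        # Only rank 0 reports to Tune; this is the call that stops
        # working once a trial has more than one worker.
        tune.report(
            reward=(stats[0] / world_size).item(),
            steps=(stats[1] / world_size).item(),
        )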
I create the trainable like this:
from functools import partial
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import PopulationBasedTraining
from ray.tune.integration.torch import DistributedTrainableCreator

trainer = DistributedTrainableCreator(
    partial(tune_train, conf=conf, evalOb=evalOb),
    use_gpu=False,
    num_workers=conf["learn"]["world_size"],
    num_cpus_per_worker=conf["learn"]["n_agents"] + 1,
    backend="gloo",
    timeout_s=3600 * 6,
)

hyperparam_mutations = {
    "operator_gamma": tune.uniform(0.9, 1),
    "hidden1": [2 ** i for i in range(2, 10)],
    "hidden2": [2 ** i for i in range(2, 10)],
    "dropout1": tune.uniform(0.2, 0.7),
    "dropout2": tune.uniform(0.3, 0.9),
    "operator_batch_size": [2 ** i for i in range(5, 12)],
    "activation": ["relu", "leakyRelu", "tanh"],
}
if conf["learn"]["trainer"] in ["DQN", "DRQN"]:
    hyperparam_mutations["n_steps_update"] = [25 * i for i in range(2, 25)]

scheduler = PopulationBasedTraining(
    hyperparam_mutations=hyperparam_mutations,
)

reporter = CLIReporter(
    # parameter_columns=["operator_gamma", "hidden1", "hidden2", "dropout1",
    #                    "dropout2", "operator_batch_size", "activation"],
    metric_columns=["reward", "steps"],
    print_intermediate_tables=True,
    max_report_frequency=60,
    # metric="reward",
    # mode="max",
)

analysis = tune.run(
    trainer,
    metric="reward",
    mode="max",
    name=conf["learn"]["experiment_name"] + "_tune",
    local_dir="./results",
    num_samples=conf["learn"]["tune_population_size"],
    scheduler=scheduler,
    resources_per_trial=None,
    progress_reporter=reporter,
)
I would rather not use Ray Train because there is a lot of custom logic in my distributed train function.