I am having trouble integrating Ray Tune, Ray RLlib, and Weights & Biases (wandb).
What I want to accomplish
I would like to run a large tuning run using the ray.tune.Tuner
API on a custom RL environment I have, and log the entire process to Weights & Biases.
What I have tried
I have tried using the examples provided for "Tune + RLlib" and "Tune + wandb" to construct a script that accomplishes my goal; however, this has not been successful.
Problems I have run into
- Using the following allows tuning over an RL environment, but it does not log the crucial per-iteration data from the individual training runs.
tune.Tuner(
    "CartPole-v1",
    ...,
    run_config=train.RunConfig(callbacks=[WandbLoggerCallback(...)]),
)
- Next I tried creating a custom Trainable class that logs the data I need. However, this has been troublesome because allocating resources to a custom Trainable that wraps an RLlib Algorithm is somewhat opaque. I get strange behavior where Tune always reports that the trial is pending.
My main question
At this point I want to make sure I am even approaching this the correct way. There is no guide on how to use these three utilities together, and I am not well versed in the Ray ecosystem, so I'm not sure whether or not I am just doing something silly.
If this is the right path (defining a custom Trainable and logging everything), how do I ensure resources are allocated properly to this Trainable?
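The closest thing I could find in the Tune docs is wrapping the Trainable with tune.with_resources and a PlacementGroupFactory, where (as I understand it) the first bundle is reserved for the Trainable itself and the remaining bundles are for the RLlib rollout workers it spawns. I am not sure this is the intended pattern when the Trainable wraps an entire Algorithm, so the sketch below (using the CustomSACTrainable from my script) is only a guess:

from ray import tune

# My current guess (may well be wrong): reserve one bundle for the Trainable
# itself (which holds the SAC learner) plus one 1-CPU bundle for each of the
# 4 RLlib rollout workers it will spawn.
resources = tune.PlacementGroupFactory(
    [{"CPU": 1, "GPU": 1}] + [{"CPU": 1}] * 4
)
trainable_with_resources = tune.with_resources(CustomSACTrainable, resources)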
Below is a version of my script.
One last note. This is my first time posting on the Ray Forums, so I apologize if this is not the normal format/tone for a post.
import ray
import ttenv
import wandb
import wandb.integration
import numpy as np
from ray import train, tune
from ray.air.integrations.wandb import WandbLoggerCallback, setup_wandb
from ray.rllib.algorithms.sac import SAC, SACConfig
from ray.tune.registry import register_env
from ray.tune.schedulers import PopulationBasedTraining

ray.shutdown()
ray.init(ignore_reinit_error=True)

def env_creator(env_config):
    num_targets = env_config.get("num_targets", 1)
    map_name = env_config.get("map_name", "empty")
    is_training = env_config.get("is_training", True)
    known_noise = env_config.get("known_noise", True)
    return ttenv.make(
        "TargetTracking-Vanilla",
        render=False,
        record=False,
        ros=False,
        num_targets=num_targets,
        map_name=map_name,
        directory=".",
        is_training=is_training,
        im_size=28,
    )
register_env("target_tracking", env_creator)
def explore(config):
    # Ensure we collect enough timesteps to do SGD.
    if config["train_batch_size"] < config["sgd_minibatch_size"] * 2:
        config["train_batch_size"] = config["sgd_minibatch_size"] * 2
    # Ensure we run at least one SGD iteration.
    if config["num_sgd_iter"] < 1:
        config["num_sgd_iter"] = 1
    return config
hyperparam_mutations = {
    "lr": [1e-3, 5e-4, 1e-4, 5e-5, 1e-5],
    "num_sgd_iter": lambda: np.random.randint(1, 30),
    "sgd_minibatch_size": lambda: np.random.randint(128, 16384),
    "train_batch_size": lambda: np.random.randint(2000, 160000),
}
pbt = PopulationBasedTraining(
    time_attr="time_total_s",
    perturbation_interval=120,
    resample_probability=0.25,
    # Specifies the mutations of these hyperparams
    hyperparam_mutations=hyperparam_mutations,
    custom_explore_fn=explore,
)
stopping_criteria = {"training_iterations": 100}
class CustomSACTrainable(tune.Trainable):
    def setup(self, config):
        self.wandb = setup_wandb(
            config,
            trial_id=self.trial_id,
            trial_name=self.trial_name,
            project="DTL",
        )
        self.algo = SAC(config=config)

    def step(self):
        results = self.algo.train()
        self.wandb.log(results)
        return results

    def save_checkpoint(self, *args, **kwargs):
        return super().save_checkpoint(*args, **kwargs)

    def load_checkpoint(self, checkpoint):
        return super().load_checkpoint(checkpoint)
tuner = tune.Tuner(
    # "SAC",
    tune.with_resources(CustomSACTrainable, {"GPU": 1, "CPU": 12}),
    tune_config=tune.TuneConfig(
        metric="env_runners/episode_reward_mean",
        mode="max",
        num_samples=1,
        max_concurrent_trials=1,
    ),
    param_space={
        "env": "target_tracking",
        "env_config": {"num_targets": 1},  # Example configuration
        # Additional configuration options
        "framework": "torch",
        # Other relevant configurations
        "evaluation_interval": 1,  # Evaluate every training iteration
        "evaluation_duration": 1000,  # Number of timesteps per evaluation
        "evaluation_duration_unit": "timesteps",
        "evaluation_config": {
            "env_config": {"num_targets": 1},  # Example configuration for evaluation
            "explore": False,  # Disable exploration during evaluation
        },
        "kl_coeff": 1.0,
        "num_workers": 4,
        "num_cpus": 12,  # number of CPUs to use per trial
        "num_gpus": 1,  # number of GPUs to use per trial
        "model": {"free_log_std": True},
        "lambda": 0.95,
        "clip_param": 0.2,
        "lr": 1e-4,
        "num_sgd_iter": tune.choice([10, 20, 30]),
        "sgd_minibatch_size": tune.choice([128, 512, 2048]),
        "train_batch_size": tune.choice([10000, 20000, 40000]),
    },
    run_config=train.RunConfig(
        stop=stopping_criteria,
        verbose=3,
    ),
)

results = tuner.fit()