Tune + RLlib + Wandb integration

I am having trouble integrating Ray Tune, Ray RLlib, and Wandb (Weights and Biases).

What I want to accomplish

I would like to run a large tuning run with the ray.tune.Tuner API over a custom RL environment I have, and log the entire process to Weights and Biases.

What I have tried

I have tried combining the examples provided for “Tune + RLlib” and “Tune + Wandb” into a script that accomplishes my goal, but so far without success.

Problems I have run into

  • Using the following allows tuning over an RL environment, but it does not log crucial data from the individual training runs (a fuller sketch of this attempt appears after this list):
tune.Tuner(
    "CartPole-v1",
    ...,
    run_config=train.RunConfig(callbacks=[WandbLoggerCallback(...)]),
)
  • Next I tried creating a custom Trainable class that logs the data I need. However, this has been troublesome because allocating resources to a custom Trainable that wraps an RLlib Algorithm is somewhat opaque. I get strange behavior where Tune always reports that the trial is pending.
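For reference, here is roughly what my first attempt looked like. This is only a sketch; the project name and callback arguments are placeholders rather than my exact values:

from ray import train, tune
from ray.air.integrations.wandb import WandbLoggerCallback

# Rough sketch of the first attempt: let Tune drive a built-in RLlib trainable
# directly and attach the W&B logger callback at the RunConfig level.
tuner = tune.Tuner(
    "SAC",
    param_space={"env": "CartPole-v1", "framework": "torch"},
    run_config=train.RunConfig(
        callbacks=[
            WandbLoggerCallback(
                project="DTL",    # placeholder W&B project name
                log_config=True,  # also log each trial's config
            )
        ],
    ),
)
# tuner.fit()  # runs, but the per-iteration training data I need never shows up in W&B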

My main question

At this point I want to make sure I am even approaching this the right way. There is no guide on how to use these three tools together, and I am not well versed in the Ray ecosystem, so I am not sure whether I am just doing something silly.

If this is the right path (defining a custom Trainable and logging everything myself), how do I ensure resources are allocated properly to that Trainable? My best guess so far is sketched below.
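This guess is untested and may be completely wrong; in particular I am unsure whether the rollout workers spawned by the wrapped algorithm need their own bundles. It uses the CustomSACTrainable class from my script below:

from ray import tune

# Untested guess: one bundle for the Trainable (driver) process, plus one
# bundle per RLlib rollout worker spawned internally by the wrapped algorithm.
pg_factory = tune.PlacementGroupFactory(
    [{"CPU": 4, "GPU": 1}] + [{"CPU": 2}] * 4  # driver + 4 workers
)
trainable_with_resources = tune.with_resources(CustomSACTrainable, pg_factory)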

Below is a version of my script.

One last note. This is my first time posting on the Ray Forums, so I apologize if this is not the normal format/tone for a post.

import ray
import ttenv
import wandb
import wandb.integration
import numpy as np
from ray import train, tune
from ray.air.integrations.wandb import WandbLoggerCallback, setup_wandb
from ray.rllib.algorithms.sac import SAC, SACConfig
from ray.tune.registry import register_env
from ray.tune.schedulers import PopulationBasedTraining


ray.shutdown()
ray.init(ignore_reinit_error=True)


def env_creator(env_config):
    num_targets = env_config.get("num_targets", 1)
    map_name = env_config.get("map_name", "empty")
    is_training = env_config.get("is_training", True)
    known_noise = env_config.get("known_noise", True)
    return ttenv.make(
        "TargetTracking-Vanilla",
        render=False,
        record=False,
        ros=False,
        num_targets=num_targets,
        map_name=map_name,
        directory=".",
        is_training=is_training,
        im_size=28,
    )


register_env("target_tracking", env_creator)

def explore(config):
    # ensure we collect enough timesteps to do sgd
    if config["train_batch_size"] < config["sgd_minibatch_size"] * 2:
        config["train_batch_size"] = config["sgd_minibatch_size"] * 2
    # ensure we run at least one sgd iter
    if config["num_sgd_iter"] < 1:
        config["num_sgd_iter"] = 1
    return config


hyperparam_mutations = {
    "lr": [1e-3, 5e-4, 1e-4, 5e-5, 1e-5],
    "num_sgd_iter": lambda: np.random.randint(1, 30),
    "sgd_minibatch_size": lambda: np.random.randint(128, 16384),
    "train_batch_size": lambda: np.random.randint(2000, 160000),
}
pbt = PopulationBasedTraining(
    time_attr="time_total_s",
    perturbation_interval=120,
    resample_probability=0.25,
    # Specifies the mutations of these hyperparams
    hyperparam_mutations=hyperparam_mutations,
    custom_explore_fn=explore,
)
stopping_criteria = {"training_iteration": 100}


class CustomSACTrainable(tune.Trainable):
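    # Wraps an RLlib SAC Algorithm so that each train() result can also be
    # logged to W&B manually via the run created by setup_wandb().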
    def setup(self, config):
        self.wandb = setup_wandb(
            config,
            trial_id=self.trial_id,
            trial_name=self.trial_name,
            project="DTL",
        )
        # Build the wrapped RLlib SAC algorithm from the trial config.
        self.algo = SAC(config=config)

    def step(self):
        results = self.algo.train()
        self.wandb.log(results)
        return results

    def save_checkpoint(self, checkpoint_dir):
        # Delegate checkpointing to the wrapped RLlib algorithm.
        return self.algo.save_checkpoint(checkpoint_dir)

    def load_checkpoint(self, checkpoint):
        self.algo.load_checkpoint(checkpoint)


tuner = tune.Tuner(
    # "SAC",
    # Resources for the Trainable process itself; I am not sure whether this
    # also needs to account for the rollout workers the algorithm spawns.
    tune.with_resources(CustomSACTrainable, {"cpu": 12, "gpu": 1}),
    tune_config=tune.TuneConfig(
        metric="env_runners/episode_reward_mean",
        mode="max",
        scheduler=pbt,  # use the PBT scheduler defined above
        num_samples=1,
        max_concurrent_trials=1,
    ),
    param_space={
        "env": "target_tracking",
        "env_config": {"num_targets": 1},  # Example configuration
        # Additional configuration options
        "framework": "torch", 
        # Other relevant configurations
        "evaluation_interval": 1,  # Evaluate every epoch
        "evaluation_duration": 1000,  # Number of timesteps for evaluation
        "evaluation_duration_unit": "timesteps",
        "evaluation_config": {
            "env_config": {"num_targets": 1},  # Example configuration for evaluation
            "explore": False,  # Disable exploration during evaluation
        },
        "kl_coeff": 1.0,
        "num_workers": 4,
        "num_cpus": 12,  # number of CPUs to use per trial
        "num_gpus": 1,  # number of GPUs to use per trial
        "model": {"free_log_std": True},
        "lambda": 0.95,
        "clip_param": 0.2,
        "lr": 1e-4,
        "num_sgd_iter": tune.choice([10, 20, 30]),
        "sgd_minibatch_size": tune.choice([128, 512, 2048]),
        "train_batch_size": tune.choice([10000, 20000, 40000]),
    },
    run_config=train.RunConfig(
        stop=stopping_criteria,
        verbose=3,
    ),
)

results = tuner.fit()