How to use the wandb logger callback when using the Algorithm API

I have a multi-trainer setup similar to the multi_agent_two_trainers.py example in the ray-project/ray GitHub repository.

When using the Tune API, it’s clear how to use the wandb callback, but when using the Algorithm API directly, e.g.,

        checkpoints = {'hider': {}, 'seekers': {}}
        for k in tqdm(range(800)):
            # sync the latest hider weights into each seeker trainer
            w = hider_trainer.get_weights(["hider_1"])
            for j in range(n):
                seekers[j].set_weights({'2': w["hider_1"]})

            for i in range(12):
                # print(f'Iteration {i}')
                for j in range(n):
                    results = seekers[j].train()
                    seekers[j].log_result(results)

            # print('Training Hider')
            hider_trainer.set_weights({'seeker_1': seekers[0].get_weights()['1'],
                                       'seeker_2': seekers[1].get_weights()['1'],
                                       'seeker_3': seekers[2].get_weights()['1'],
                                       'seeker_4': seekers[3].get_weights()['1']})

            for i in range(9):
                w = hider_trainer.get_weights(['hider_1'])
                hider_trainer.set_weights({'hider_2': w['hider_1'],
                                           'hider_3': w['hider_1'],
                                           'hider_4': w['hider_1']})

                results = hider_trainer.train()
                hider_trainer.log_result(results)

            if k % 250 == 0:
                hider_checkpoint = hider_trainer.save()
                seeker_chkpoints = [seekers[i].save() for i in range(n)]
                checkpoints['hider'][k] = hider_checkpoint
                checkpoints['seekers'][k] = seeker_chkpoints

        # save checkpoint dict
        import pickle
        with open(os.path.join('.',
                               'results', f'{env_name}',
                               'checkpoints.pkl'), 'wb+') as f:
            pickle.dump(checkpoints, f)

what’s the correct way to send all the results to wandb?

cc @avnishn: Shall we add documentation (examples using RLlib + wandb) for this?

Ah, that’s a good ask.

I’m not sure if it’s possible to use the wandb callback without using Ray Tune, since our wandb support is through Ray Tune only.

That’s unfortunate because if I can get wandb integrated then I’d feel more comfortable using a cluster setup to run my experiments.

Could you provide some pointers on how Tune handles the wandb integration? Maybe I can just recreate a localized version that I handle in my runner file.
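
For concreteness, here’s a rough sketch of the kind of localized version I have in mind: just the plain wandb Python client dropped into the runner loop above. This is only a sketch, not Ray’s built-in integration; wandb.init, wandb.log, and wandb.finish are the standard client calls, hider_trainer/seekers/n are the objects from my loop above, and episode_reward_mean is one example of a scalar key in the dict returned by train().

import wandb

# hand-rolled logging for the Algorithm-API loop above
run = wandb.init(project="mapo", entity="aadharna")

global_step = 0
for k in range(800):
    # ... weight syncing exactly as in the loop above ...

    for i in range(12):
        for j in range(n):
            results = seekers[j].train()
            seekers[j].log_result(results)
            # pull whatever scalars you care about out of the RLlib result dict
            run.log({f"seeker_{j}/episode_reward_mean": results["episode_reward_mean"]},
                    step=global_step)
        global_step += 1

    for i in range(9):
        results = hider_trainer.train()
        hider_trainer.log_result(results)
        run.log({"hider/episode_reward_mean": results["episode_reward_mean"]},
                step=global_step)
        global_step += 1

run.finish()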

Thanks!

I wanted to follow up on this with a related question:

When I run the following script, the code hangs at the end when it should be uploading the data to wandb.

import os
import gym
import numpy as np
from gym import spaces

from rank import compute_centered_ranks


# money distributed env where the agent gives the other n agents all money at once
class MoneyDistributionEnv(gym.Env):
    def __init__(self, config_dict):
        self.n = config_dict.get('n', 4)
        self.e = config_dict.get('e', 1)
        self.k = config_dict.get('k', 5)
        self.money_limit = config_dict.get('money_limit', 100)
        self.action_space = spaces.MultiDiscrete([self.money_limit for _ in range(self.n)])
        self.observation_space = spaces.Box(low=0, high=self.money_limit, shape=(self.n,), dtype=float)
        self.previous_money_distributions = [np.array([0, 0, 0, 0])]
        self.ranking = config_dict.get('rank', False)
        self.reset()

    def step(self, action: np.ndarray):
        info = {}
        reward, done = self._get_reward(action)
        # return to the agent the mean of the archive
        # this should let the agent pick something new?
        if self.previous_money_distributions:
            obs = np.mean(np.array(self.previous_money_distributions), axis=0)
        else:
            obs = np.zeros(self.n)
        return obs, reward, done, info

    def reset(self, **kwargs):
        if self.previous_money_distributions:
            obs = np.mean(np.array(self.previous_money_distributions), axis=0)
        else:
            obs = np.zeros(self.n)
        return obs

    def _get_reward(self, raw_obs):
        obs = raw_obs if not self.ranking else compute_centered_ranks(raw_obs)
        if not bool(self.previous_money_distributions):
            reward = 1
        else:
            previous_money_distributions = np.array(self.previous_money_distributions)
            current_money_distribution = np.array(obs)
            dists = np.linalg.norm(previous_money_distributions - current_money_distribution, axis=1)
            sorted_dists = np.argsort(dists)
            reward = np.mean(dists[sorted_dists][:self.k])
        if np.sum(raw_obs) > self.money_limit:
            reward = -1
            obs = np.array([0, 0, 0, 0])
        self.previous_money_distributions.append(obs)
        return reward, True


if __name__ == '__main__':
    
    import sys
    import ray
    from ray.rllib.algorithms.ppo import PPOConfig
    from ray.rllib.models import ModelCatalog
    from ray.tune.registry import register_env
    from ray import tune
    from ray.air.callbacks.wandb import WandbLoggerCallback
    from mlp import MLP
    from callbacks import ActionTracker

    # expose the driver's sys.path to spawned worker processes via PYTHONPATH
    sep = os.pathsep
    os.environ['PYTHONPATH'] = sep.join(sys.path)

    def make_env(config):
        return MoneyDistributionEnv(config)

    env_name = 'distribute_resources_reward_mean_5nn'
    env_config = {
        'n': 4,
        'e': 1,
        'money_limit': 100,
        'rank': False
    }
    ModelCatalog.register_custom_model('mlp', MLP)
    register_env(env_name, make_env)

    ray.init()  # local_mode=True
    config = (
        PPOConfig()
        .rollouts(num_rollout_workers=1,
                  num_envs_per_worker=128)
        .callbacks(ActionTracker)
        .training(train_batch_size=1024,
                  model={"custom_model": 'mlp'},
                  lr=3e-4,
                  gamma=0.99,
                  lambda_=0.95,
                  use_gae=True,
                  clip_param=0.3,
                  grad_clip=None,
                  entropy_coeff=0.15,
                  vf_loss_coeff=0.6,
                  sgd_minibatch_size=128,
                  num_sgd_iter=4)
        .environment(env_config=env_config,
                     env=env_name,
                     clip_actions=True,
                     disable_env_checking=True)
        .framework(framework="torch")
        .resources(num_gpus=0)
        .evaluation(evaluation_interval=100,
                    evaluation_num_workers=1,
                    always_attach_evaluation_results=True)
        .checkpointing(export_native_model_files=True)
    )

    # env = make_env(env_config)
    test_dir = f"./results/{env_name}"
    result = tune.run(
        "PPO",
        name="PPO",
        stop={"timesteps_total": 10000},
        local_dir=test_dir,
        config=config.to_dict(),
        callbacks=[
            WandbLoggerCallback(project="mapo", entity="aadharna",
                                save_checkpoints=True,
                                api_key="key")
        ]
    )
    ray.shutdown()

Any help here would be greatly appreciated.

ActionTracker callback:

from ray.rllib.algorithms.callbacks import DefaultCallbacks


class ActionTracker(DefaultCallbacks):

    def on_episode_start(self, *, worker, base_env, policies, episode, env_index, **kwargs):
        episode.custom_metrics['s1'] = 0
        episode.custom_metrics['s2'] = 0
        episode.custom_metrics['s3'] = 0
        episode.custom_metrics['s4'] = 0

    def on_episode_end(self, *, worker, base_env, policies, episode, env_index, **kwargs) -> None:
        # last_action = episode.last_action_for('default_policy')
        # grab the last action archived by the sub-environment that just finished
        envs = base_env.get_sub_environments()
        last_action = envs[env_index].previous_money_distributions[-1]
        episode.custom_metrics['s1'] = last_action[0]
        episode.custom_metrics['s2'] = last_action[1]
        episode.custom_metrics['s3'] = last_action[2]
        episode.custom_metrics['s4'] = last_action[3]

MLP model:

import gym
import torch
import torch.nn as nn
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.utils.typing import ModelConfigDict
from ray.rllib.utils.annotations import override


class MLP(TorchModelV2, nn.Module):
    """MLP"""
    def __init__(self, obs_space: gym.spaces.Space,
                 action_space: gym.spaces.Space, num_outputs: int,
                 model_config: ModelConfigDict, name: str):

        TorchModelV2.__init__(self,
                              obs_space,
                              action_space,
                              num_outputs,
                              model_config,
                              name)
        nn.Module.__init__(self)

        self.fc1 = nn.Linear(obs_space.shape[0], 128)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(128, 128)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(128, num_outputs)
        self.value_fn = nn.Linear(128, 1)

    @override(TorchModelV2)
    def forward(self, input_dict, state, seq_lens):
        x = input_dict['obs']
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        self.value = self.value_fn(x)
        x = self.fc3(x)
        return x, state

    @override(TorchModelV2)
    def value_function(self):
        return self.value.squeeze(1)

When I removed the save_checkpoints flag from the WandbLoggerCallback, the graphs successfully pushed up to W&B! (Granted, this is a slightly different use case than the original one in the post, so this isn’t a solution to the full post, but it is a solution to the additional comment I made the other day.)
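
In other words, the callback configuration that ended up working is the same call as above, just without save_checkpoints=True:

        callbacks=[
            WandbLoggerCallback(project="mapo", entity="aadharna",
                                api_key="key")
        ]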