I wanted to follow up on this with a related question:
When I run the following script, it hangs at the very end, at the point where it should be uploading the data to wandb.
import os
import gym
import numpy as np
from gym import spaces
from rank import compute_centered_ranks

# money distribution env where the agent gives the other n agents all money at once
class MoneyDistributionEnv(gym.Env):
    def __init__(self, config_dict):
        self.n = config_dict.get('n', 4)
        self.e = config_dict.get('e', 1)
        self.k = config_dict.get('k', 5)
        self.money_limit = config_dict.get('money_limit', 100)
        self.action_space = spaces.MultiDiscrete([self.money_limit for _ in range(self.n)])
        self.observation_space = spaces.Box(low=0, high=self.money_limit, shape=(self.n,), dtype=float)
        # archive of every money distribution proposed so far
        self.previous_money_distributions = [np.array([0, 0, 0, 0])]
        self.ranking = config_dict.get('rank', False)
        self.reset()

    def step(self, action: np.ndarray):
        info = {}
        reward, done = self._get_reward(action)
        # return to the agent the mean of the archive;
        # this should let the agent pick something new?
        if self.previous_money_distributions:
            obs = np.mean(np.array(self.previous_money_distributions), axis=0)
        else:
            obs = np.zeros(self.n)
        return obs, reward, done, info

    def reset(self, **kwargs):
        if self.previous_money_distributions:
            obs = np.mean(np.array(self.previous_money_distributions), axis=0)
        else:
            obs = np.zeros(self.n)
        return obs

    def _get_reward(self, raw_obs):
        obs = raw_obs if not self.ranking else compute_centered_ranks(raw_obs)
        if not self.previous_money_distributions:
            # empty archive: nothing to compare against yet
            reward = 1
        else:
            # novelty-style reward: mean distance to the k nearest archived distributions
            previous_money_distributions = np.array(self.previous_money_distributions)
            current_money_distribution = np.array(obs)
            dists = np.linalg.norm(previous_money_distributions - current_money_distribution, axis=1)
            sorted_dists = np.argsort(dists)
            reward = np.mean(dists[sorted_dists][:self.k])
        if np.sum(raw_obs) > self.money_limit:
            # handing out more money than the limit is penalized and archived as the zero vector
            reward = -1
            obs = np.array([0, 0, 0, 0])
        self.previous_money_distributions.append(obs)
        return reward, True  # single-step episodes

if __name__ == '__main__':
    import sys

    import ray
    from ray import tune
    from ray.rllib.algorithms.ppo import PPOConfig
    from ray.rllib.models import ModelCatalog
    from ray.tune.registry import register_env
    from ray.air.callbacks.wandb import WandbLoggerCallback

    from mlp import MLP
    from callbacks import ActionTracker

    sep = os.pathsep
    os.environ['PYTHONPATH'] = sep.join(sys.path)

    def make_env(config):
        return MoneyDistributionEnv(config)

    env_name = 'distribute_resources_reward_mean_5nn'
    env_config = {
        'n': 4,
        'e': 1,
        'money_limit': 100,
        'rank': False
    }

    ModelCatalog.register_custom_model('mlp', MLP)
    register_env(env_name, make_env)
    ray.init()  # local_mode=True

    config = PPOConfig()\
        .rollouts(num_rollout_workers=1,
                  num_envs_per_worker=128)\
        .callbacks(ActionTracker)\
        .training(train_batch_size=1024,
                  model={
                      "custom_model": 'mlp'
                  },
                  lr=3e-4,
                  gamma=0.99,
                  lambda_=0.95,
                  use_gae=True,
                  clip_param=0.3,
                  grad_clip=None,
                  entropy_coeff=0.15,
                  vf_loss_coeff=0.6,
                  sgd_minibatch_size=128,
                  num_sgd_iter=4)\
        .environment(env_config=env_config,
                     env=env_name,
                     clip_actions=True,
                     disable_env_checking=True)\
        .framework(framework="torch")\
        .resources(num_gpus=0)\
        .evaluation(evaluation_interval=100,
                    evaluation_num_workers=1,
                    always_attach_evaluation_results=True)\
        .checkpointing(export_native_model_files=True)

    # env = make_env(env_config)
    test_dir = f"./results/{env_name}"
    result = tune.run(
        "PPO",
        name="PPO",
        stop={"timesteps_total": 10000},
        local_dir=test_dir,
        config=config.to_dict(),
        callbacks=[
            WandbLoggerCallback(project="mapo", entity="aadharna",
                                save_checkpoints=True,
                                api_key="key")
        ]
    )

    ray.shutdown()
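
For completeness, the environment itself can be stepped by hand outside of RLlib. This is just a minimal sketch that imports nothing beyond the class above and uses the same env_config values as in the script:

# Manual rollout of the environment, independent of RLlib.
env = MoneyDistributionEnv({'n': 4, 'e': 1, 'money_limit': 100, 'rank': False})
obs = env.reset()
for _ in range(5):
    action = env.action_space.sample()
    obs, reward, done, info = env.step(action)
    print(action, reward, done)
    if done:
        obs = env.reset()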
Any help here would be greatly appreciated.
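
In case it helps narrow things down, a bare wandb run along these lines (minimal sketch; the API key is a placeholder, "dummy_metric" is made up, and project/entity match the script above) can be used to check whether plain wandb uploads complete outside of Ray:

import wandb

# Minimal wandb check, independent of Ray/RLlib.
wandb.login(key="key")
run = wandb.init(project="mapo", entity="aadharna")
for step in range(10):
    wandb.log({"dummy_metric": step}, step=step)
wandb.finish()  # blocks until all logged data has been synced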
ActionTracker callback:
from ray.rllib.algorithms.callbacks import DefaultCallbacks


class ActionTracker(DefaultCallbacks):
    def on_episode_start(self, *, worker, base_env, policies, episode, env_index, **kwargs):
        episode.custom_metrics['s1'] = 0
        episode.custom_metrics['s2'] = 0
        episode.custom_metrics['s3'] = 0
        episode.custom_metrics['s4'] = 0

    def on_episode_end(self, *, worker, base_env, policies, episode, env_index, **kwargs) -> None:
        # last_action = episode.last_action_for('default_policy')
        # read the most recently archived distribution from the sub-env that just finished
        envs = base_env.get_sub_environments()
        last_action = envs[env_index].previous_money_distributions[-1]
        episode.custom_metrics['s1'] = last_action[0]
        episode.custom_metrics['s2'] = last_action[1]
        episode.custom_metrics['s3'] = last_action[2]
        episode.custom_metrics['s4'] = last_action[3]
MLP model:
import gym
import torch
import torch.nn as nn
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.utils.annotations import override
from ray.rllib.utils.typing import ModelConfigDict


class MLP(TorchModelV2, nn.Module):
    """Two-hidden-layer MLP with a separate linear value head."""

    def __init__(self, obs_space: gym.spaces.Space,
                 action_space: gym.spaces.Space, num_outputs: int,
                 model_config: ModelConfigDict, name: str):
        TorchModelV2.__init__(self,
                              obs_space,
                              action_space,
                              num_outputs,
                              model_config,
                              name)
        nn.Module.__init__(self)
        self.fc1 = nn.Linear(obs_space.shape[0], 128)
        self.relu1 = nn.ReLU()
        self.fc2 = nn.Linear(128, 128)
        self.relu2 = nn.ReLU()
        self.fc3 = nn.Linear(128, num_outputs)
        self.value_fn = nn.Linear(128, 1)

    @override(TorchModelV2)
    def forward(self, input_dict, state, seq_lens):
        x = input_dict['obs']
        x = self.fc1(x)
        x = self.relu1(x)
        x = self.fc2(x)
        x = self.relu2(x)
        # cache the value estimate for value_function()
        self.value = self.value_fn(x)
        x = self.fc3(x)
        return x, state

    @override(TorchModelV2)
    def value_function(self):
        return self.value.squeeze(1)
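
And a standalone forward-pass check of the model (minimal sketch; the spaces mirror the env above, the dummy input is made up, model_config is left empty, and num_outputs=400 is my assumption of what RLlib would pass for MultiDiscrete([100] * 4), i.e. the summed number of logits):

import torch
from gym import spaces

# Build the model with the same spaces the env above exposes.
obs_space = spaces.Box(low=0, high=100, shape=(4,), dtype=float)
act_space = spaces.MultiDiscrete([100] * 4)
model = MLP(obs_space, act_space, num_outputs=400, model_config={}, name='mlp')

# One dummy forward pass: action logits plus the cached value estimate.
logits, _ = model.forward({'obs': torch.zeros(1, 4)}, [], None)
print(logits.shape, model.value_function().shape)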