Hi!
I am trying to make a centralised critic PPO for the Waterworld environment from PettingZoo[sisl] (https://www.pettingzoo.ml/sisl/waterworld), which is a continuous multi-agent environment with:
- Observation space: Box(low=np.float32(-np.sqrt(2)), high=np.float32(2 * np.sqrt(2)), shape=(self._obs_dim,), dtype=np.float32)
- Action space: Box(low=np.float32(-self._max_accel), high=np.float32(self._max_accel), shape=(2,), dtype=np.float32)
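For reference, this is roughly how I inspect what the RLlib wrapper reports for these spaces (same wrapper and constructor arguments as in the full code below; attribute names may differ slightly between versions):

from ray.rllib.env.pettingzoo_env import PettingZooEnv
from pettingzoo.sisl import waterworld_v3

env = PettingZooEnv(waterworld_v3.env(n_pursuers=5, n_evaders=5))
print(env.observation_space)  # per-agent observation Box
print(env.action_space)       # per-agent action Box, shape (2,)
print(env.agents)             # the 5 pursuer agent ids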
I used code from the centralized_critic.py example and this GitHub issue: https://github.com/ray-project/ray/issues/12851.
The error I get is:
raise ValueError('Input ' + str(input_index) +
ValueError: Input 0 is incompatible with layer model_1: expected shape=(None, 162), found shape=(None, 242)
The observation space shape for 5 agents is 162. I am new to RLlib, so this probably is not a bug but something I don't understand, and I would appreciate any help. Also, I am not using one_hot since the environment is continuous, but I am not sure about that; I would be happy if someone could clarify it or point out other things I should change.
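Concretely, this is the change I made there (it also appears, commented out, in the code below): the original example one-hot encodes the opponents' Discrete actions, while I pass the continuous Box actions through unchanged:

# original example (Discrete opponent actions):
# tf.one_hot(tf.cast(opponent_actions, tf.int32), 2)
# my version (continuous Box actions, passed as-is):
self.central_vf([obs, opponent_obs, opponent_actions])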
I ran it with all the latest versions on Ubuntu 18.04 (I also got the same error when testing on Windows 10).
Best regards,
George
My code:
import argparse
import numpy as np
import os
import ray
from ray import tune
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy, KLCoeffMixin, \
    ppo_surrogate_loss as tf_loss
from ray.rllib.agents.ppo.ppo_torch_policy import PPOTorchPolicy, \
    KLCoeffMixin as TorchKLCoeffMixin, ppo_surrogate_loss as torch_loss
from ray.rllib.evaluation.postprocessing import compute_advantages, \
    Postprocessing
from ray.rllib.examples.models.centralized_critic_models import \
    CentralizedCriticModel, TorchCentralizedCriticModel
from ray.rllib.models import ModelCatalog
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.policy.tf_policy import LearningRateSchedule, \
    EntropyCoeffSchedule
from ray.rllib.policy.torch_policy import LearningRateSchedule as TorchLR, \
    EntropyCoeffSchedule as TorchEntropyCoeffSchedule
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.rllib.utils.tf_ops import explained_variance, make_tf_callable
from ray.rllib.utils.torch_ops import convert_to_torch_tensor
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.fcnet import FullyConnectedNetwork
from ray.rllib.models.torch.misc import SlimFC
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.utils.annotations import override
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.tune.registry import register_env
from ray.rllib.env.pettingzoo_env import PettingZooEnv
from pettingzoo.sisl import waterworld_v3
tf1, tf, tfv = try_import_tf()
torch, nn = try_import_torch()
################### environment initialisation #############################
n_pursuers = 5
n_sensors = 20
obs_coord = n_sensors * (5 + 3) # 3 for speed features enabled (default)
obs_dim = obs_coord + 2 # obs_dim = 162 for 5 pursuers (agents)
act_dim = 2
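# For reference, with the values above:
#   obs_dim = 20 * (5 + 3) + 2 = 162
#   central VF input (built below) = obs_dim + (n_pursuers - 1) * (obs_dim + act_dim)
#                                  = 162 + 4 * (162 + 2) = 818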
################## TF model #################################################
class CentralizedCriticModel(TFModelV2):
    """Multi-agent model that implements a centralized value function."""

    def __init__(self, obs_space, action_space, num_outputs, model_config, name):
        super(CentralizedCriticModel, self).__init__(obs_space, action_space, num_outputs, model_config, name)
        # Base of the model
        self.model = FullyConnectedNetwork(obs_space, action_space, num_outputs, model_config, name)
        self.register_variables(self.model.variables())

        n_agents = n_pursuers  # ---> opp_obs and opp_acts now consist of 4 (n_pursuers - 1)
        # obs = obs_dim
        # act = 2
        opp_obs_dim = obs_dim * (n_agents - 1)
        opp_acts_dim = act_dim * (n_agents - 1)

        # Central VF maps (obs, opp_obs, opp_act) -> vf_pred
        obs = tf.keras.layers.Input(shape=(obs_dim, ), name="obs")
        opp_obs = tf.keras.layers.Input(shape=(opp_obs_dim, ), name="opp_obs")
        opp_act = tf.keras.layers.Input(shape=(opp_acts_dim, ), name="opp_act")
        concat_obs = tf.keras.layers.Concatenate(axis=1)([obs, opp_obs, opp_act])
        central_vf_dense = tf.keras.layers.Dense(16, activation=tf.nn.tanh, name="c_vf_dense")(concat_obs)
        central_vf_out = tf.keras.layers.Dense(1, activation=None, name="c_vf_out")(central_vf_dense)
        self.central_vf = tf.keras.Model(inputs=[obs, opp_obs, opp_act], outputs=central_vf_out)
        self.register_variables(self.central_vf.variables)

    @override(ModelV2)
    def forward(self, input_dict, state, seq_lens):
        return self.model.forward(input_dict, state, seq_lens)

    # def central_value_function(self, obs, opponent_obs, opponent_actions):
    #     return tf.reshape(
    #         self.central_vf([
    #             obs, opponent_obs,
    #             tf.one_hot(tf.cast(opponent_actions, tf.int32), 2)  # waterworld has 2 actions
    #         ]), [-1])

    def central_value_function(self, obs, opponent_obs, opponent_actions):
        return tf.reshape(
            self.central_vf([obs, opponent_obs, opponent_actions]), [-1])

    @override(ModelV2)
    def value_function(self):
        return self.model.value_function()  # not used
################## Torch model #################################################
class TorchCentralizedCriticModel(TorchModelV2, nn.Module):
    """Multi-agent model that implements a centralized VF."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        n_agents = n_pursuers  # ---> opp_obs and opp_acts now consist of 4 (n_pursuers - 1) different agents' information
        # obs = obs_dim
        # act = 2
        opp_obs_dim = obs_dim * (n_agents - 1)
        opp_acts_dim = act_dim * (n_agents - 1)

        # Base of the model
        self.model = TorchFC(obs_space, action_space, num_outputs,
                             model_config, name)

        # Central VF maps (obs, opp_obs, opp_act) -> vf_pred
        input_size = obs_dim + opp_obs_dim + opp_acts_dim  # obs + opp_obs + opp_act
        self.central_vf = nn.Sequential(
            SlimFC(input_size, 16, activation_fn=nn.Tanh),
            SlimFC(16, 1),
        )

    @override(ModelV2)
    def forward(self, input_dict, state, seq_lens):
        model_out, _ = self.model(input_dict, state, seq_lens)
        return model_out, []

    # def central_value_function(self, obs, opponent_obs, opponent_actions):
    #     input_ = torch.cat([
    #         obs, opponent_obs,
    #         torch.nn.functional.one_hot(opponent_actions.long(), 2).float()
    #     ], 1)
    #     return torch.reshape(self.central_vf(input_), [-1])

    def central_value_function(self, obs, opponent_obs, opponent_actions):
        input_ = torch.cat([obs, opponent_obs, opponent_actions], 1)
        return torch.reshape(self.central_vf(input_), [-1])

    @override(ModelV2)
    def value_function(self):
        return self.model.value_function()  # not used
##################################################################################
OPPONENT_OBS = "opponent_obs"
OPPONENT_ACTION = "opponent_action"
parser = argparse.ArgumentParser()
parser.add_argument("--torch", action="store_true")
parser.add_argument("--as-test", action="store_true")
parser.add_argument("--stop-iters", type=int, default=100)
parser.add_argument("--stop-timesteps", type=int, default=100000)
parser.add_argument("--stop-reward", type=float, default=7.99)
class CentralizedValueMixin:
    """Add method to evaluate the central value function from the model."""

    def __init__(self):
        if self.config["framework"] != "torch":
            self.compute_central_vf = make_tf_callable(self.get_session())(
                self.model.central_value_function)
        else:
            self.compute_central_vf = self.model.central_value_function
# Grabs the opponent obs/act and includes it in the experience train_batch,
# and computes GAE using the central vf predictions.
def centralized_critic_postprocessing(policy,
                                      sample_batch,
                                      other_agent_batches=None,
                                      episode=None):
    pytorch = policy.config["framework"] == "torch"
    if (pytorch and hasattr(policy, "compute_central_vf")) or \
            (not pytorch and policy.loss_initialized()):
        assert other_agent_batches is not None
        # [(_, opponent_batch)] = list(other_agent_batches.values())
        # ---> opponent batch now consists of 4 SampleBatches, so I concatenate them
        concat_opponent_batch = SampleBatch.concat_samples(
            [opponent_n_batch for _, opponent_n_batch in other_agent_batches.values()])
        opponent_batch = concat_opponent_batch

        # also record the opponent obs and actions in the trajectory
        sample_batch[OPPONENT_OBS] = opponent_batch[SampleBatch.CUR_OBS]
        sample_batch[OPPONENT_ACTION] = opponent_batch[SampleBatch.ACTIONS]

        # overwrite default VF prediction with the central VF
        if args.torch:
            sample_batch[SampleBatch.VF_PREDS] = policy.compute_central_vf(
                convert_to_torch_tensor(
                    sample_batch[SampleBatch.CUR_OBS], policy.device),
                convert_to_torch_tensor(
                    sample_batch[OPPONENT_OBS], policy.device),
                convert_to_torch_tensor(
                    sample_batch[OPPONENT_ACTION], policy.device)) \
                .cpu().detach().numpy()
        else:
            sample_batch[SampleBatch.VF_PREDS] = policy.compute_central_vf(
                sample_batch[SampleBatch.CUR_OBS], sample_batch[OPPONENT_OBS],
                sample_batch[OPPONENT_ACTION])
    else:
        # Policy hasn't been initialized yet, use zeros.
        sample_batch[OPPONENT_OBS] = np.zeros_like([np.zeros(obs_dim * (n_pursuers - 1))])
        sample_batch[OPPONENT_ACTION] = np.zeros_like([np.zeros(act_dim * (n_pursuers - 1))])
        ### I think I don't have to change this
        sample_batch[SampleBatch.VF_PREDS] = np.zeros_like(sample_batch[SampleBatch.REWARDS], dtype=np.float32)

    completed = sample_batch["dones"][-1]
    if completed:
        last_r = 0.0
    else:
        last_r = sample_batch[SampleBatch.VF_PREDS][-1]

    train_batch = compute_advantages(
        sample_batch,
        last_r,
        policy.config["gamma"],
        policy.config["lambda"],
        use_gae=policy.config["use_gae"])
    return train_batch
# Copied from PPO but optimizing the central value function.
def loss_with_central_critic(policy, model, dist_class, train_batch):
    CentralizedValueMixin.__init__(policy)
    func = tf_loss if not policy.config["framework"] == "torch" else torch_loss

    vf_saved = model.value_function
    model.value_function = lambda: policy.model.central_value_function(
        train_batch[SampleBatch.CUR_OBS], train_batch[OPPONENT_OBS],
        train_batch[OPPONENT_ACTION])

    policy._central_value_out = model.value_function()
    loss = func(policy, model, dist_class, train_batch)

    model.value_function = vf_saved

    return loss
def setup_tf_mixins(policy, obs_space, action_space, config):
    # Copied from PPOTFPolicy (w/o ValueNetworkMixin).
    KLCoeffMixin.__init__(policy, config)
    EntropyCoeffSchedule.__init__(policy, config["entropy_coeff"],
                                  config["entropy_coeff_schedule"])
    LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"])


def setup_torch_mixins(policy, obs_space, action_space, config):
    # Copied from PPOTorchPolicy (w/o ValueNetworkMixin).
    TorchKLCoeffMixin.__init__(policy, config)
    TorchEntropyCoeffSchedule.__init__(policy, config["entropy_coeff"],
                                       config["entropy_coeff_schedule"])
    TorchLR.__init__(policy, config["lr"], config["lr_schedule"])


def central_vf_stats(policy, train_batch, grads):
    # Report the explained variance of the central value function.
    return {
        "vf_explained_var": explained_variance(
            train_batch[Postprocessing.VALUE_TARGETS],
            policy._central_value_out),
    }
CCPPOTFPolicy = PPOTFPolicy.with_updates(
    name="CCPPOTFPolicy",
    postprocess_fn=centralized_critic_postprocessing,
    loss_fn=loss_with_central_critic,
    before_loss_init=setup_tf_mixins,
    grad_stats_fn=central_vf_stats,
    mixins=[
        LearningRateSchedule, EntropyCoeffSchedule, KLCoeffMixin,
        CentralizedValueMixin
    ])

CCPPOTorchPolicy = PPOTorchPolicy.with_updates(
    name="CCPPOTorchPolicy",
    postprocess_fn=centralized_critic_postprocessing,
    loss_fn=loss_with_central_critic,
    before_init=setup_torch_mixins,
    mixins=[
        TorchLR, TorchEntropyCoeffSchedule, TorchKLCoeffMixin,
        CentralizedValueMixin
    ])


def get_policy_class(config):
    if config["framework"] == "torch":
        return CCPPOTorchPolicy


CCTrainer = PPOTrainer.with_updates(
    name="CCPPOTrainer",
    default_policy=CCPPOTFPolicy,
    get_policy_class=get_policy_class,
)
if __name__ == "__main__":
    ray.init()
    args = parser.parse_args()

    def env_creator(args):
        return PettingZooEnv(waterworld_v3.env(n_pursuers=5, n_evaders=5))

    env = env_creator({})
    register_env("waterworld", env_creator)

    obs_space = env.observation_space
    action_space = env.action_space
    policies = {agent: (None, obs_space, action_space, {}) for agent in env.agents}

    ModelCatalog.register_custom_model(
        "cc_model", TorchCentralizedCriticModel
        if args.torch else CentralizedCriticModel)

    config = {
        "env": "waterworld",
        "batch_mode": "complete_episodes",
        # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
        "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        "num_workers": 1,
        "multiagent": {
            "policies": policies,
            "policy_mapping_fn": (lambda agent_id: agent_id),
        },
        "model": {
            "custom_model": "cc_model",
        },
        "framework": "torch" if args.torch else "tf",
    }

    stop = {
        "training_iteration": args.stop_iters,
        "timesteps_total": args.stop_timesteps,
        "episode_reward_mean": args.stop_reward,
    }

    results = tune.run(CCTrainer, config=config, stop=stop, verbose=1)

    if args.as_test:
        check_learning_achieved(results, args.stop_reward)
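For completeness, I launch it without any flags (i.e. the TF code path), e.g.:

python centralised_critic.py

and --torch would switch to the torch model/policy instead.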
Full error message:
/home/george/anaconda3/envs/rllib_latest/bin/python /home/george/PycharmProjects/rllib_latest/centralised_critic.py
WARNING:tensorflow:From /home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/tensorflow/python/compat/v2_compat.py:96: disable_resource_variables (from tensorflow.python.ops.variable_scope) is deprecated and will be removed in a future version.
Instructions for updating:
non-resource variables are not supported in the long term
2021-03-06 14:41:09,529 WARNING deprecation.py:33 -- DeprecationWarning: `ray.rllib.env.pettingzoo_env.PettingZooEnv` has been deprecated. Use `ray.rllib.env.wrappers.pettingzoo_env.PettingZooEnv` instead. This will raise an error in the future!
2021-03-06 14:41:10,136 INFO services.py:1172 -- View the Ray dashboard at http://127.0.0.1:8265
== Status ==
Memory usage on this node: 10.3/15.6 GiB
Using FIFO scheduling algorithm.
Resources requested: 2/8 CPUs, 0/0 GPUs, 0.0/4.0 GiB heap, 0.0/1.37 GiB objects
Result logdir: /home/george/ray_results/CCPPOTrainer_2021-03-06_14-41-11
Number of trials: 1/1 (1 RUNNING)
(pid=16757) WARNING:tensorflow:From /home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/tensorflow/python/compat/v2_compat.py:96: disable_resource_variables (from tensorflow.python.ops.variable_scope) is deprecated and will be removed in a future version.
(pid=16757) Instructions for updating:
(pid=16757) non-resource variables are not supported in the long term
(pid=16757) 2021-03-06 14:41:13,801 INFO trainer.py:616 -- Tip: set framework=tfe or the --eager flag to enable TensorFlow eager execution
(pid=16757) 2021-03-06 14:41:13,801 INFO trainer.py:641 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
(pid=16756) WARNING:tensorflow:From /home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/tensorflow/python/compat/v2_compat.py:96: disable_resource_variables (from tensorflow.python.ops.variable_scope) is deprecated and will be removed in a future version.
(pid=16756) Instructions for updating:
(pid=16756) non-resource variables are not supported in the long term
(pid=16756) 2021-03-06 14:41:15,828 WARNING deprecation.py:33 -- DeprecationWarning: `framestack` has been deprecated. Use `num_framestacks (int)` instead. This will raise an error in the future!
(pid=16756) 2021-03-06 14:41:15,965 WARNING deprecation.py:33 -- DeprecationWarning: `TFModelV2.register_variables` has been deprecated. This will raise an error in the future!
2021-03-06 14:41:16,254 ERROR trial_runner.py:616 -- Trial CCPPOTrainer_waterworld_3b5fe_00000: Error processing event.
Traceback (most recent call last):
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/tune/trial_runner.py", line 586, in _process_trial
results = self.trial_executor.fetch_result(trial)
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/tune/ray_trial_executor.py", line 609, in fetch_result
result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
return func(*args, **kwargs)
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/worker.py", line 1456, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::CCPPOTrainer.train_buffered() (pid=16757, ip=192.168.1.6)
File "python/ray/_raylet.pyx", line 439, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 473, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 476, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/agents/trainer_template.py", line 107, in __init__
Trainer.__init__(self, config, env, logger_creator)
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/agents/trainer.py", line 486, in __init__
super().__init__(config, logger_creator)
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/tune/trainable.py", line 97, in __init__
self.setup(copy.deepcopy(self.config))
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/agents/trainer.py", line 654, in setup
self._init(self.config, self.env_creator)
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/agents/trainer_template.py", line 134, in _init
self.workers = self._make_workers(
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/agents/trainer.py", line 725, in _make_workers
return WorkerSet(
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 79, in __init__
remote_spaces = ray.get(self.remote_workers(
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
return func(*args, **kwargs)
ray.exceptions.RayTaskError(ValueError): ray::RolloutWorker.foreach_policy() (pid=16756, ip=192.168.1.6)
File "python/ray/_raylet.pyx", line 439, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 473, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 476, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 480, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 432, in ray._raylet.execute_task.function_executor
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 477, in __init__
self._build_policy_map(policy_dict, policy_config)
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1108, in _build_policy_map
policy_map[name] = cls(obs_space, act_space, merged_conf)
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/policy/tf_policy_template.py", line 214, in __init__
DynamicTFPolicy.__init__(
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/policy/dynamic_tf_policy.py", line 332, in __init__
self._initialize_loss_from_dummy_batch(
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/policy/dynamic_tf_policy.py", line 557, in _initialize_loss_from_dummy_batch
loss = self._do_loss_init(train_batch)
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/rllib/policy/dynamic_tf_policy.py", line 635, in _do_loss_init
loss = self._loss_fn(self, self.model, self.dist_class, train_batch)
File "/home/george/PycharmProjects/rllib_latest/centralised_critic.py", line 253, in loss_with_central_critic
policy._central_value_out = model.value_function()
File "/home/george/PycharmProjects/rllib_latest/centralised_critic.py", line 249, in <lambda>
model.value_function = lambda: policy.model.central_value_function(
File "/home/george/PycharmProjects/rllib_latest/centralised_critic.py", line 103, in central_value_function
self.central_vf([
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/tensorflow/python/keras/engine/base_layer_v1.py", line 760, in __call__
input_spec.assert_input_compatibility(self.input_spec, inputs,
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/tensorflow/python/keras/engine/input_spec.py", line 271, in assert_input_compatibility
raise ValueError('Input ' + str(input_index) +
ValueError: Input 0 is incompatible with layer model_1: expected shape=(None, 162), found shape=(None, 242)
Traceback (most recent call last):
File "/home/george/PycharmProjects/rllib_latest/centralised_critic.py", line 361, in <module>
results = tune.run(CCTrainer, config=config, stop=stop, verbose=1)
File "/home/george/anaconda3/envs/rllib_latest/lib/python3.8/site-packages/ray/tune/tune.py", line 444, in run
raise TuneError("Trials did not complete", incomplete_trials)
ray.tune.error.TuneError: ('Trials did not complete', [CCPPOTrainer_waterworld_3b5fe_00000])
== Status ==
Memory usage on this node: 10.7/15.6 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/8 CPUs, 0/0 GPUs, 0.0/4.0 GiB heap, 0.0/1.37 GiB objects
Result logdir: /home/george/ray_results/CCPPOTrainer_2021-03-06_14-41-11
Number of trials: 1/1 (1 ERROR)
Number of errored trials: 1
+-------------------------------------+--------------+-------------------------------------------------------------------------------------------------------------------------------+
| Trial name | # failures | error file |
|-------------------------------------+--------------+-------------------------------------------------------------------------------------------------------------------------------|
| CCPPOTrainer_waterworld_3b5fe_00000 | 1 | /home/george/ray_results/CCPPOTrainer_2021-03-06_14-41-11/CCPPOTrainer_waterworld_3b5fe_00000_0_2021-03-06_14-41-11/error.txt |
+-------------------------------------+--------------+-------------------------------------------------------------------------------------------------------------------------------+
Process finished with exit code 1