Is sample_batch[obs] the same obs returned for an env step?

I am trying to implement a centralized critic, following the centralized critic example on GitHub.
What I still can’t understand is how “obs” is passed between the environment and the centralized critic.
If I print the rewards in the environment’s step() function and the rewards in the sample batch, I get the same values, but the obs in the sample batch do not match the obs returned by step().
Isn’t the “obs” in the environment the same as the one on line 104 of the centralized critic example? If not, what writes the obs on line 104?

I don’t know this example, but I guess obs in the sample batch is already flattened and perhaps also preprocessed by RLlib by default. If so, the observation in a sample batch is just a 1-dim array.

If your model is of type ModelV2, there is the function restore_original_dimensions which restores your original observation from the flattened one.

If the problem is that you have different values in this sample batch observation (compared to the obs returned by your env), then this should be caused by observation preprocessors called by RLlib.
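
To make the preprocessing idea concrete: for a Discrete(n) observation space, the default preprocessing one-hot encodes the integer observation. Here is a minimal NumPy sketch of that idea. The helper name `one_hot` is made up for illustration and this is not RLlib's own code:

```python
import numpy as np


def one_hot(obs: int, n: int) -> np.ndarray:
    """Encode an integer observation from a Discrete(n) space as a length-n vector."""
    vec = np.zeros(n, dtype=np.float32)
    vec[obs] = 1.0
    return vec


# Observations as a multi-agent env with a Discrete(6) space might return them.
env_obs = {0: 1, 1: 4}

# What each agent's observation looks like after one-hot encoding.
encoded = {agent_id: one_hot(o, 6) for agent_id, o in env_obs.items()}
print(encoded[0])  # [0. 1. 0. 0. 0. 0.]
print(encoded[1])  # [0. 0. 0. 0. 1. 0.]
```

So an integer printed in the env and a 6-element vector in the sample batch can describe the same observation.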

Maybe my comments already help you; otherwise one of the “official maintainers” should know more.


I would like to check what my restored obs looks like, but I am not sure where I should put the restore_original_dimensions call that you recommended. Could you guide me? Where would you use it in the example linked in my post?

I would try it at the same position where you have accessed and printed out the rewards from SampleBatch, I guess somewhere in this centralized_critic_postprocessing function.

Try this (no guarantee):
restored_obs = restore_original_dimensions(sample_batch[SampleBatch.CUR_OBS], policy.observation_space, policy.config["framework"])

I am trying, but I get this:
‘CentralizedCriticModel’ object has no attribute ‘restore_original_dimensions’
The model is of type “TFModelV2”, but shouldn’t it inherit the method from ModelV2?

Did you try
from ray.rllib.models.modelv2 import restore_original_dimensions
and then call this function as previously suggested?

Yes, I was using that import already.
I don’t know why, but I still get that error related to my model (CentralizedCriticModel).

my code:

from ray.rllib.models.modelv2 import restore_original_dimensions
obs = restore_original_dimensions(sample_batch[OPPONENT_OBS], DFaaSEnv.obs_space, policy.config["framework"])

That’s inside centralized_critic_postprocessing.
(DFaaSEnv is my env.)

Hi @Luca_Pretini,

restore_original_dimensions is not a member of the ModelV2 class; it is a function defined in the same module as ModelV2.
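
As a plain-Python illustration of that difference (the names `Model` and `restore` below are hypothetical stand-ins, not RLlib code): a function that lives next to a class in the same module is not an attribute of the class, so instances and subclasses do not get it.

```python
class Model:
    """Hypothetical stand-in for a model base class such as ModelV2."""


def restore(obs):
    """Hypothetical stand-in for a helper defined at module level, next to the class."""
    return obs


class MyModel(Model):
    """A subclass inherits methods of Model, but not module-level functions."""


model = MyModel()

# The helper is not an attribute of the instance, so model.restore(...) would
# raise AttributeError.
print(hasattr(model, "restore"))  # False

# Call the function directly and pass the data in as an argument instead.
print(restore([1, 2, 3]))  # [1, 2, 3]
```

In the same way, restore_original_dimensions is imported from its module and called as a plain function, not as `model.restore_original_dimensions(...)`.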

Do you have a reproduction script and the error traceback?

Hi @mannyv, it seems I solved my issue with the restore_original_dimensions function.
Still, I think I do not fully understand how obs get processed between the environment and centralized_critic_postprocessing.

I am uploading a slightly modified version of centralized_critic.py; I am still using TwoStepGame as the env. In the env I only added two lines to print the obs returned by the step() function (lines 316/317).

In my centralized_critic_postprocessing() I added a few prints (lines 125-132). My goal is to understand how those obs relate to the ones coming out of the env.

Here’s an example of my output:

obs in twostep
{0: 1, 1: 4}
obs in twostep
{0: 1, 1: 4}
sample_batch[SampleBatch.CUR_OBS]
[[1. 0. 0. 0. 0. 0.]
[0. 1. 0. 0. 0. 0.]]
restored obs version
[[1. 0. 0. 0. 0. 0.]
[0. 1. 0. 0. 0. 0.]]
My opponent_batch[SampleBatch.CUR_OBS]
[[0. 0. 0. 1. 0. 0.]
[0. 0. 0. 0. 1. 0.]]
restored opponent version
[[1. 0. 0. 0. 0. 0.]
[0. 1. 0. 0. 0. 0.]]
sample_batch[SampleBatch.CUR_OBS]
[[0. 0. 0. 1. 0. 0.]
[0. 0. 0. 0. 1. 0.]]
restored obs version
[[0. 0. 0. 1. 0. 0.]
[0. 0. 0. 0. 1. 0.]]
My opponent_batch[SampleBatch.CUR_OBS]
[[1. 0. 0. 0. 0. 0.]
[0. 1. 0. 0. 0. 0.]]
restored opponent version
[[0. 0. 0. 1. 0. 0.]
[0. 0. 0. 0. 1. 0.]]

I see some kind of matching in the results: in every batch, one of the two obs matches either the first or the second observation from the env, but the other one seems “random” to me. Let’s take this:

sample_batch[SampleBatch.CUR_OBS]
[[1. 0. 0. 0. 0. 0.]
[0. 1. 0. 0. 0. 0.]]

The second observation ([0. 1. 0. 0. 0. 0.]) is 1, which matches my first obs, but what about [1. 0. 0. 0. 0. 0.]? Where is that coming from?

I hope I explained myself decently, and please excuse the print-based debugging :grinning:

Thanks in advance!

Attaching my whole file here:

import argparse
import numpy as np
from gym.spaces import Discrete
import os

import ray
from ray import tune
from ray.rllib.agents.maml.maml_torch_policy import KLCoeffMixin as \
    TorchKLCoeffMixin
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy, KLCoeffMixin, \
    ppo_surrogate_loss as tf_loss
from ray.rllib.agents.ppo.ppo_torch_policy import PPOTorchPolicy
from ray.rllib.evaluation.postprocessing import compute_advantages, \
    Postprocessing
from ray.rllib.examples.env.two_step_game import TwoStepGame
from ray.rllib.examples.models.centralized_critic_models import \
    CentralizedCriticModel, TorchCentralizedCriticModel
from ray.rllib.models import ModelCatalog
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.policy.tf_policy import LearningRateSchedule, \
    EntropyCoeffSchedule
from ray.rllib.policy.torch_policy import LearningRateSchedule as TorchLR, \
    EntropyCoeffSchedule as TorchEntropyCoeffSchedule
from ray.rllib.utils.annotations import override
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.rllib.utils.tf_ops import explained_variance, make_tf_callable
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from gym.spaces.multi_discrete import MultiDiscrete
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.fcnet import FullyConnectedNetwork
from ray.rllib.models.modelv2 import ModelV2

import random
import gym

tf1, tf, tfv = try_import_tf()
torch, nn = try_import_torch()

OPPONENT_OBS = "opponent_obs"
OPPONENT_ACTION = "opponent_action"

parser = argparse.ArgumentParser()
parser.add_argument(
    "--framework",
    choices=["tf", "tf2", "tfe", "torch"],
    default="tf",
    help="The DL framework specifier.")
parser.add_argument(
    "--as-test",
    action="store_true",
    help="Whether this script should be run as a test: --stop-reward must "
    "be achieved within --stop-timesteps AND --stop-iters.")
parser.add_argument(
    "--stop-iters",
    type=int,
    default=100,
    help="Number of iterations to train.")
parser.add_argument(
    "--stop-timesteps",
    type=int,
    default=100000,
    help="Number of timesteps to train.")
parser.add_argument(
    "--stop-reward",
    type=float,
    default=7.99,
    help="Reward at which we stop training.")


class CentralizedValueMixin:
    """Add method to evaluate the central value function from the model."""

    def __init__(self):
        if self.config["framework"] != "torch":
            self.compute_central_vf = make_tf_callable(self.get_session())(
                self.model.central_value_function)
        else:
            self.compute_central_vf = self.model.central_value_function


# Grabs the opponent obs/act and includes it in the experience train_batch,
# and computes GAE using the central vf predictions.
def centralized_critic_postprocessing(policy,
                                      sample_batch,
                                      other_agent_batches=None,
                                      episode=None):

   


    pytorch = policy.config["framework"] == "torch"
    if (pytorch and hasattr(policy, "compute_central_vf")) or \
            (not pytorch and policy.loss_initialized()):
        assert other_agent_batches is not None
        [(_, opponent_batch)] = list(other_agent_batches.values())

        
        # also record the opponent obs and actions in the trajectory
        sample_batch[OPPONENT_OBS] = opponent_batch[SampleBatch.CUR_OBS]
        sample_batch[OPPONENT_ACTION] = opponent_batch[SampleBatch.ACTIONS]


        from ray.rllib.models.modelv2 import restore_original_dimensions
        restored_obs = restore_original_dimensions(sample_batch[SampleBatch.CUR_OBS], Discrete(6),policy.config["framework"])   
        restored_oppobs = restore_original_dimensions(sample_batch[SampleBatch.CUR_OBS], Discrete(6),policy.config["framework"])   


        print("sample_batch[SampleBatch.CUR_OBS]")
        print(sample_batch[SampleBatch.CUR_OBS])
        print("restored obs version ")
        print(restored_obs)
        print("My opponent_batch[SampleBatch.CUR_OBS]")
        print(opponent_batch[SampleBatch.CUR_OBS])
        print("restored opponent version ")
        print(restored_oppobs)


        sample_batch[SampleBatch.VF_PREDS] = policy.compute_central_vf(
                sample_batch[SampleBatch.CUR_OBS], sample_batch[OPPONENT_OBS],
                sample_batch[OPPONENT_ACTION])
    else:
        # Policy hasn't been initialized yet, use zeros.
        sample_batch[OPPONENT_OBS] = np.zeros_like(
            sample_batch[SampleBatch.CUR_OBS])
        sample_batch[OPPONENT_ACTION] = np.zeros_like(
            sample_batch[SampleBatch.ACTIONS])
        sample_batch[SampleBatch.VF_PREDS] = np.zeros_like(
            sample_batch[SampleBatch.REWARDS], dtype=np.float32)

    completed = sample_batch["dones"][-1]
    if completed:
        last_r = 0.0
    else:
        last_r = sample_batch[SampleBatch.VF_PREDS][-1]

    train_batch = compute_advantages(
        sample_batch,
        last_r,
        policy.config["gamma"],
        policy.config["lambda"],
        use_gae=policy.config["use_gae"])
    return train_batch


# Copied from PPO but optimizing the central value function.
def loss_with_central_critic(policy, model, dist_class, train_batch):
    CentralizedValueMixin.__init__(policy)
    func = tf_loss if not policy.config["framework"] == "torch" \
        else PPOTorchPolicy.loss

    vf_saved = model.value_function
    model.value_function = lambda: policy.model.central_value_function(
        train_batch[SampleBatch.CUR_OBS], train_batch[OPPONENT_OBS],
        train_batch[OPPONENT_ACTION])

    policy._central_value_out = model.value_function()
    loss = func(policy, model, dist_class, train_batch)

    model.value_function = vf_saved

    return loss


def setup_tf_mixins(policy, obs_space, action_space, config):
    # Copied from PPOTFPolicy (w/o ValueNetworkMixin).
    KLCoeffMixin.__init__(policy, config)
    EntropyCoeffSchedule.__init__(policy, config["entropy_coeff"],
                                  config["entropy_coeff_schedule"])
    LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"])


def setup_torch_mixins(policy, obs_space, action_space, config):
    # Copied from PPOTorchPolicy  (w/o ValueNetworkMixin).
    TorchKLCoeffMixin.__init__(policy, config)
    TorchEntropyCoeffSchedule.__init__(policy, config["entropy_coeff"],
                                       config["entropy_coeff_schedule"])
    TorchLR.__init__(policy, config["lr"], config["lr_schedule"])


def central_vf_stats(policy, train_batch, grads):
    # Report the explained variance of the central value function.
    return {
        "vf_explained_var": explained_variance(
            train_batch[Postprocessing.VALUE_TARGETS],
            policy._central_value_out)
    }


CCPPOTFPolicy = PPOTFPolicy.with_updates(
    name="CCPPOTFPolicy",
    postprocess_fn=centralized_critic_postprocessing,
    loss_fn=loss_with_central_critic,
    before_loss_init=setup_tf_mixins,
    grad_stats_fn=central_vf_stats,
    mixins=[
        LearningRateSchedule, EntropyCoeffSchedule, KLCoeffMixin,
        CentralizedValueMixin
    ])



#class CCTrainer(PPOTrainer):
#    def get_default_policy_class(self, config):
#            return CCPPOTFPolicy


def get_policy_class(config):
    if config["framework"] == "tf":
        return CCPPOTFPolicy

CCTrainer = PPOTrainer.with_updates(
    name="CCPPOTrainer",
    default_policy=CCPPOTFPolicy,
    get_policy_class=get_policy_class,
)



from gym.spaces import MultiDiscrete, Dict, Discrete
import numpy as np

from ray.rllib.env.multi_agent_env import MultiAgentEnv, ENV_STATE


class MyTwoStepGame(MultiAgentEnv):
    action_space = Discrete(2)

    def __init__(self, env_config):
        self.state = None
        self.agent_1 = 0
        self.agent_2 = 1
        # MADDPG emits action logits instead of actual discrete actions
        self.actions_are_logits = env_config.get("actions_are_logits", False)
        self.one_hot_state_encoding = env_config.get("one_hot_state_encoding",
                                                     False)
        self.with_state = env_config.get("separate_state_space", False)

        if not self.one_hot_state_encoding:
            self.observation_space = Discrete(6)
            self.with_state = False
        else:
            # Each agent gets the full state (one-hot encoding of which of the
            # three states are active) as input with the receiving agent's
            # ID (1 or 2) concatenated onto the end.
            if self.with_state:
                self.observation_space = Dict({
                    "obs": MultiDiscrete([2, 2, 2, 3]),
                    ENV_STATE: MultiDiscrete([2, 2, 2])
                })
            else:
                self.observation_space = MultiDiscrete([2, 2, 2, 3])

    def seed(self, seed=None):
        if seed:
            np.random.seed(seed)

    def reset(self):
        self.state = np.array([1, 0, 0])
        return self._obs()

    def step(self, action_dict):
        if self.actions_are_logits:
            action_dict = {
                k: np.random.choice([0, 1], p=v)
                for k, v in action_dict.items()
            }

        state_index = np.flatnonzero(self.state)
        if state_index == 0:
            action = action_dict[self.agent_1]
            assert action in [0, 1], action
            if action == 0:
                self.state = np.array([0, 1, 0])
            else:
                self.state = np.array([0, 0, 1])
            global_rew = 0
            done = False
        elif state_index == 1:
            global_rew = 7
            done = True
        else:
            if action_dict[self.agent_1] == 0 and action_dict[self.
                                                              agent_2] == 0:
                global_rew = 0
            elif action_dict[self.agent_1] == 1 and action_dict[self.
                                                                agent_2] == 1:
                global_rew = 8
            else:
                global_rew = 1
            done = True

        rewards = {
            self.agent_1: global_rew / 2.0,
            self.agent_2: global_rew / 2.0
        }
        obs = self._obs()
        dones = {"__all__": done}
        infos = {}
        print("obs in twostep")
        print(obs)
        return obs, rewards, dones, infos

    def _obs(self):
        if self.with_state:
            return {
                self.agent_1: {
                    "obs": self.agent_1_obs(),
                    ENV_STATE: self.state
                },
                self.agent_2: {
                    "obs": self.agent_2_obs(),
                    ENV_STATE: self.state
                }
            }
        else:
            return {
                self.agent_1: self.agent_1_obs(),
                self.agent_2: self.agent_2_obs()
            }

    def agent_1_obs(self):
        if self.one_hot_state_encoding:
            return np.concatenate([self.state, [1]])
        else:
            return np.flatnonzero(self.state)[0]

    def agent_2_obs(self):
        if self.one_hot_state_encoding:
            return np.concatenate([self.state, [2]])
        else:
            return np.flatnonzero(self.state)[0] + 3



if __name__ == "__main__":
    ray.init()
    args = parser.parse_args()

    ModelCatalog.register_custom_model(
        "cc_model", CentralizedCriticModel)

    config = {
        "env": MyTwoStepGame,
        "batch_mode": "complete_episodes",
        # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
        "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        "num_workers": 0,
        "multiagent": {
            "policies": {
                "pol1": (None, Discrete(6), TwoStepGame.action_space, {
                    "framework": args.framework,
                }),
                "pol2": (None, Discrete(6), TwoStepGame.action_space, {
                    "framework": args.framework,
                }),
            },
            "policy_mapping_fn": (
                lambda aid, **kwargs: "pol1" if aid == 0 else "pol2"),
        },
        "model": {
            "custom_model": "cc_model",
        },
        "framework": args.framework,
    }

    stop = {
        "training_iteration": args.stop_iters,
        "timesteps_total": args.stop_timesteps,
        "episode_reward_mean": args.stop_reward,
    }

    results = tune.run(CCTrainer, config=config, stop=stop, verbose=1)

    if args.as_test:
        check_learning_achieved(results, args.stop_reward)

Hey @Luca_Pretini,

you are restoring the same batch obs both times! I guess it should be

restored_oppobs = restore_original_dimensions(sample_batch[OPPONENT_OBS], Discrete(6),policy.config["framework"])

or, equivalently,

restored_oppobs = restore_original_dimensions(opponent_batch[SampleBatch.CUR_OBS], Discrete(6),policy.config["framework"])

Please take a look at these docs to understand RLlib’s default behaviors :v:

Btw: I’m sorry, restore_original_dimensions is redundant in this case since your obs space is just a Discrete space and not a Tuple or Dict space.
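
If you just want the integer observations back from the one-hot rows, taking the argmax per row is enough. A small NumPy sketch using the arrays from your printout (the variable names are mine):

```python
import numpy as np

# One-hot rows as they appear in the printed sample batches.
cur_obs = np.array([[1., 0., 0., 0., 0., 0.],
                    [0., 1., 0., 0., 0., 0.]])
opp_obs = np.array([[0., 0., 0., 1., 0., 0.],
                    [0., 0., 0., 0., 1., 0.]])

# The position of the 1 in each row is the original Discrete(6) value.
own_ids = np.argmax(cur_obs, axis=1)
opp_ids = np.argmax(opp_obs, axis=1)

print(own_ids)  # [0 1]
print(opp_ids)  # [3 4]
```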

Yes, I actually was restoring the same obs twice, my bad.
But still, as you said, restoring is redundant in my case.

Do you happen to know what might be in my obs then?

I see some kind of matching in the results: in every batch, one of the two obs matches either the first or the second observation from the env, but the other one seems “random” to me. Let’s take this:

sample_batch[SampleBatch.CUR_OBS]
[[1. 0. 0. 0. 0. 0.]
[0. 1. 0. 0. 0. 0.]]

The second observation ([0. 1. 0. 0. 0. 0.]) is 1, which matches my first obs, but what about [1. 0. 0. 0. 0. 0.]? Where is that coming from?

I am talking about this part of my last question. If you know, it would be very helpful.

Thanks for your time :grinning:

If you print out obs in the env, you only see the most recent obs, e.g. {0: 1, 1: 4}.
In contrast, in the postprocessing function you look at a sample BATCH of a whole trajectory, which here contains only two steps (the two-step game ends after two steps :wink:). Thus, the sample batches contain the initial obs {0: 0, 1: 3} and then {0: 1, 1: 4}, one-hot encoded as [1 0 0 0 0 0], [0 0 0 1 0 0] and [0 1 0 0 0 0], [0 0 0 0 1 0], respectively.

Columns of the sample batch (for agent_1):

step t | obs                 | new_obs             | done
0      | [1. 0. 0. 0. 0. 0.] | [0. 1. 0. 0. 0. 0.] | False
1      | [0. 1. 0. 0. 0. 0.] | [0. 1. 0. 0. 0. 0.] | True

Columns of the sample batch (for agent_2):

step t | obs                 | new_obs             | done
0      | [0. 0. 0. 1. 0. 0.] | [0. 0. 0. 0. 1. 0.] | False
1      | [0. 0. 0. 0. 1. 0.] | [0. 0. 0. 0. 1. 0.] | True
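
The two tables can be rebuilt by hand from the integer observations of the episode you printed. This is only a NumPy sketch of the bookkeeping, assuming reset() returned 0 for agent_1 and both step() calls returned 1; the helpers `one_hot` and `build_columns` are made up for illustration:

```python
import numpy as np


def one_hot(index, n=6):
    """One-hot encode an integer observation from a Discrete(n) space."""
    vec = np.zeros(n, dtype=np.float32)
    vec[index] = 1.0
    return vec


def build_columns(seq):
    """Split an observation sequence into the obs and new_obs columns."""
    obs = np.stack([one_hot(o) for o in seq[:-1]])      # s_0, s_1
    new_obs = np.stack([one_hot(o) for o in seq[1:]])   # s_1, s_2
    return obs, new_obs


# Integer observations over the episode: reset() obs first, then one per step().
agent_1_seq = [0, 1, 1]
# agent_2 sees the same state index shifted by 3.
agent_2_seq = [o + 3 for o in agent_1_seq]

obs_1, new_obs_1 = build_columns(agent_1_seq)
obs_2, new_obs_2 = build_columns(agent_2_seq)

print(obs_1)      # rows [1 0 0 0 0 0] and [0 1 0 0 0 0]
print(new_obs_1)  # rows [0 1 0 0 0 0] and [0 1 0 0 0 0]
```

The first row of the obs column is the observation from reset(), which is never printed inside step(). That is where [1. 0. 0. 0. 0. 0.] comes from.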

I do not fully understand this.
In your sample batch examples, what is in “obs” (the column on the left) is clear: it is the agent’s obs at the corresponding step.
But what is the criterion behind what you put in “new_obs” (the column on the right)?

new_obs (also called next_obs) is the consecutive obs. You collect experience data of the form
obs s_t, action a_t, reward r_t, done d_t, next obs s_{t+1}, and so on, where the next obs s_{t+1} is the obs returned by the step() function.
You start with an initial obs at 0, do a step, and get a new obs at 1. Then the obs at 1 is your current obs; you step again and get a new obs at 2. And so on…
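
In code, that collection loop looks roughly like this. It is a sketch with a made-up single-agent `ToyEnv` whose obs is just a step counter, not RLlib's sampler:

```python
class ToyEnv:
    """Hypothetical two-step environment; the obs is simply the step counter."""

    def reset(self):
        self.t = 0
        return self.t

    def step(self, action):
        self.t += 1
        done = self.t >= 2
        reward = 0.0
        return self.t, reward, done, {}


env = ToyEnv()
transitions = []

obs = env.reset()  # initial obs s_0
done = False
while not done:
    action = 0  # placeholder action; a policy would choose this
    new_obs, reward, done, info = env.step(action)
    # Store (s_t, a_t, r_t, d_t, s_{t+1}).
    transitions.append((obs, action, reward, done, new_obs))
    obs = new_obs  # the new obs becomes the current obs of the next step

for row in transitions:
    print(row)
# (0, 0, 0.0, False, 1)
# (1, 0, 0.0, True, 2)
```

Each new_obs in one row shows up as obs in the next row, which is the shifted pattern in the tables above.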