Preprocessor error on batches of observations

  • High: It blocks me to complete my task.

I am using a custom environment with PPO. When I run compute_single_action(), the algo performs normally. However, when I run algo.train(), I get an array shape mismatch error in the preprocessor.

from ray.tune.registry import register_env
from ray.rllib.algorithms import ppo

register_env("my_env", buildEnv)
test_env = buildEnv(env_config=config["param_space"]["env_config"])
print(f"observations space = {test_env.observation_space}")
print(f"action space = {test_env.action_space}")
# prints:
# observations space = Dict(state_a:Box(0, 9223372036854775805, (1, 4), int64), state_b:Box(0, 1, (4, 2), int64))
# action space = MultiDiscrete([5 5])

# Set algo config, then build algo
algo_config = (
    ppo.PPOConfig()
    .training()
    .environment(
        env="my_env",
        env_config=config["param_space"]["env_config"],
    )
    .framework("torch")
)

algo = algo_config.build()

# check single observation/action
raw_ob = test_env.observation_space.sample()
algo_action = algo.compute_single_action(raw_ob)
print(f"algo_action = {algo_action}")
# prints:
# algo_action = [1 0] # <- works

result = algo.train()  # <- errors

Relevant excerpt from error message (this is printed twice because there are two workers, so I’m omitting the second print):

2023-01-30 14:29:55,334 ERROR actor_manager.py:486 -- Ray error, taking actor 1 out of service. ray::RolloutWorker.apply() (pid=15510, ip=172.19.5.122, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7fa378d72d90>)
  File ".../ray/rllib/utils/actor_manager.py", line 183, in apply
    raise e
  File ".../ray/rllib/utils/actor_manager.py", line 174, in apply
    return func(self, *args, **kwargs)
  File ".../ray/rllib/execution/rollout_ops.py", line 86, in <lambda>
    lambda w: w.sample(), local_worker=False, healthy_only=True
  File ".../ray/rllib/evaluation/rollout_worker.py", line 949, in sample
    batches = [self.input_reader.next()]
  File ".../ray/rllib/evaluation/sampler.py", line 92, in next
    batches = [self.get_data()]
  File ".../ray/rllib/evaluation/sampler.py", line 285, in get_data
    item = next(self._env_runner)
  File ".../ray/rllib/evaluation/sampler.py", line 671, in _env_runner
    active_envs, to_eval, outputs = _process_observations(
  File ".../ray/rllib/evaluation/sampler.py", line 922, in _process_observations
    prep_obs = preprocessor.transform(raw_obs)
  File ".../ray/rllib/models/preprocessors.py", line 287, in transform
    self.write(observation, array, 0)
  File ".../ray/rllib/models/preprocessors.py", line 299, in write
    p.write(o, array, offset)
  File ".../ray/rllib/models/preprocessors.py", line 211, in write
    array[offset : offset + self._size] = np.array(observation, copy=False).ravel()
ValueError: could not broadcast input array from shape (8,) into shape (4,)

My intuition says that the error has something to do with my observation and/or action space configuration, but the error message doesn’t jibe with the dimensions of my spaces. The flattened observation_space and action_space are 12- and 10-long, respectively; given this, I’m not sure where the array shapes (8,) and (4,) are coming into play. Most confusing, though, is that compute_single_action() works correctly, so there is something specific to train() (I’m guessing batches of data?) that is causing the error.
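For reference, the two shapes in the error match the per-key flattened sizes of the printed observation space: state_a has shape (1, 4), so 4 elements, and state_b has shape (4, 2), so 8 elements. The NumPy-only sketch below mimics the slot-by-slot write seen in the traceback and shows one way such a message can arise: a sub-observation landing in the slot reserved for a different key. It is an illustration under that assumption, not a diagnosis of this environment, and flatten_obs is a hypothetical helper, not RLlib code.

```python
import numpy as np

# Per-key shapes copied from the printed observation space.
shapes = {"state_a": (1, 4), "state_b": (4, 2)}
sizes = {key: int(np.prod(shape)) for key, shape in shapes.items()}
# sizes["state_a"] is 4, sizes["state_b"] is 8, and together they give 12.


def flatten_obs(obs_values, slot_sizes):
    """Write each sub-observation into its own slot of one flat array."""
    flat = np.zeros(sum(slot_sizes), dtype=np.float32)
    offset = 0
    for value, size in zip(obs_values, slot_sizes):
        # Same kind of slice assignment as in the traceback.
        flat[offset : offset + size] = np.asarray(value).ravel()
        offset += size
    return flat


slot_sizes = [sizes["state_a"], sizes["state_b"]]

# Values in the same order as the slots: works, result has 12 elements.
in_order = flatten_obs([np.zeros((1, 4)), np.ones((4, 2))], slot_sizes)

# Values in the opposite order: the 8-element array hits the 4-element slot.
try:
    flatten_obs([np.ones((4, 2)), np.zeros((1, 4))], slot_sizes)
    mismatch_message = None
except ValueError as err:
    mismatch_message = str(err)
```

If something like this is happening, comparing the key order and per-key shapes of the observations returned by reset() and step() against the declared space would be a reasonable first check.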

If anyone has any ideas on the source of the problem, I’d much appreciate them.

Slight update: disabling the preprocessor still results in the same error. However, there appear to be conflicting attributes in algo when checking whether the preprocessor is disabled (algo.config vs algo.evaluation_config).

config["param_space"]["model"]["_disable_preprocessor_api"] = True

algo_config = (
    ppo.PPOConfig()
    .training(
        model={**config["param_space"]["model"]},
    )
    .environment(
        env="my_env",
        env_config=config["param_space"]["env_config"],
    )
    .framework("torch")
)

algo = algo_config.build()

print(algo.config.model["_disable_preprocessor_api"])
# prints True
print(algo.evaluation_config._disable_preprocessor_api)
# prints False

I’m not sure if the conflict in _disable_preprocessor_api values is relevant. In any case, the error occurs in the same class of preprocessors, NoPreprocessor. It appears that, regardless of what the algo config is set to, Ray assigns NoPreprocessor to the algo during the build call. This seems like strange behavior, given that my environment has a Dict observation space and MultiDiscrete action space, and that there are other preprocessors that seem like they might be more appropriate (e.g. DictFlatteningPreprocessor).

Another update: Based on other posts/issues (link1, link2, link3, and link4), I think the problem I’m having has something to do with my environment’s observation space being a dict, with one item being “observations” and the other being “action_mask”.

Based on some of the comments at the above links, here’s what I’ve tried so far, none of which work:

  0. (baseline) The bare observation space is:
observation_space = Dict({
    "observations": Dict({
        "ob_a": Box(..., dtype=int),  # length = 24
        "ob_b": Box(..., dtype=float),  # length = 8
    }),
    "action_mask": Box(..., dtype=int),  # length = 10
})
  1. Flatten “observations” so that the observation space looks like:
observation_space = Dict({
    "observations": Box(..., dtype=float), # length = 32
    "action_mask": Box(..., dtype=int)})
  2. Make all entries in “observations” and “action_mask” floats (instead of “observations” being floats and “action_mask” being ints):
observation_space = Dict({
    "observations": Box(..., dtype=float),
    "action_mask": Box(..., dtype=float)})
  3. Use a custom model that is a modified version of Ray’s action mask example. I’ve tried using this model with both options #1 and #2 above.
"""Model with action masking."""
# %% Imports
from __future__ import annotations

# Standard Library Imports
from typing import Any

# Third Party Imports
import torch
import torch.nn as nn
from gym.spaces import Dict, Space
from numpy import stack
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.utils.torch_utils import FLOAT_MIN
from torch import Tensor, tensor


# %% Class


class MyActionMaskModel(TorchModelV2, nn.Module):
    """Model that handles simple discrete action masking.

    Include all regular model configuration parameters (fcnet_hiddens,
    fcnet_activation, etc.) in model_config.

    Custom model parameters are set in model_config["custom_model_config"]. The
    only custom model parameter is "no_masking" (default False).
    """

    def __init__(
        self,
        obs_space: Space,
        action_space: Space,
        num_outputs: int,
        model_config: dict,
        name: str,
        **kwargs,
    ):
        """Initialize action masking model.

        Args:
            obs_space (`Space`): A gym space.
            action_space (`Space`): A gym space.
            num_outputs (`int`): Number of outputs of neural net. Should be the
                size of the flattened action space.
            model_config (`dict`): Model configuration. Required inputs are:
                {
                    "fcnet_hiddens" (`list[int]`): Fully connected hidden layers.
                }
            name (`str`): Name of model.

        To disable action masking, set:
            model_config["custom_model_config"]["no_masking"] = True.
        """
        # Check that the observation space is a dict that contains "action_mask"
        # and "observations" as keys.
        orig_space = getattr(obs_space, "original_space", obs_space)
        assert isinstance(orig_space, Dict)
        assert "action_mask" in orig_space.spaces
        assert "observations" in orig_space.spaces

        # Boilerplate Torch stuff.
        TorchModelV2.__init__(
            self,
            obs_space,
            action_space,
            num_outputs,
            model_config,
            name,
            **kwargs,
        )
        nn.Module.__init__(self)

        # Build feed-forward layers
        self.internal_model = TorchFC(
            orig_space["observations"],
            action_space,
            num_outputs,
            model_config,
            name + "_internal",
        )

        last_layer_size = model_config["fcnet_hiddens"][-1]
        self.action_head = nn.Linear(last_layer_size, num_outputs)
        self.value_head = nn.Linear(last_layer_size, 1)

        # disable action masking --> will likely lead to invalid actions
        custom_config = model_config.get("custom_model_config", {})
        self.no_masking = False
        if "no_masking" in custom_config:
            self.no_masking = custom_config["no_masking"]

    def forward(
        self,
        input_dict: dict[dict],
        state: Any,
        seq_lens: Any,
    ) -> tuple[Tensor, Any]:
        """Forward propagate observations through the model.

        Takes a `dict` as an argument with the only key being "obs", which is either
        a sample from the observation space or a list of samples from the observation
        space.

        Can input either a single observation or multiple observations. If using
        a single observation, the input is a dict[dict[dict]]. If using
        multiple observations, the input is a dict[dict[list_of_dicts]].

        Args:
            input_dict (`dict`[`dict`]):
                {
                    "obs": {
                        "action_mask": `Tensor`,
                        "observations": `Tensor`,
                    }
                }
                or
                {
                    "obs": list[
                        {
                        "action_mask": `Tensor`,
                        "observations": `Tensor`
                        },
                        ...]
                }
            state (`Any`): _description_
            seq_lens (`Any`): _description_

        Returns:
            logits (`Tensor`): Logits in shape of (num_outputs, ).
            state (`Any`): _description_
        """
        # Extract the action mask and observations from the input dict and convert
        # to tensor, if necessary. Stack action masks and observations into larger
        # tensor if multiple obs are passed in. The action mask and observation
        # are different sizes depending on if multiple or single observations are
        # passed in. Convert tensors to floats if not already to input to torch
        # Linear layers
        # (https://pytorch.org/docs/stable/generated/torch.nn.Linear.html#torch.nn.Linear).
        if type(input_dict["obs"]) is list:
            # For multiple observations
            # action_mask is a [num_observations, len_mask] tensor
            # observation is a [num_observations, len_obs] tensor
            array_of_masks = stack(
                [a["action_mask"] for a in input_dict["obs"]], axis=0
            )
            action_mask = tensor(array_of_masks)
            array_of_obs = stack(
                [a["observations"] for a in input_dict["obs"]], axis=0
            )
            observation = tensor(array_of_obs).float()
        else:
            action_mask = input_dict["obs"]["action_mask"]
            observation = input_dict["obs"]["observations"].float()

        # Compute the unmasked logits.
        self.internal_model._features = (
            self.internal_model._hidden_layers.forward(observation)
        )
        # print(
        #     f"internal_model._features.size() = {self.internal_model._features.size()}"
        # )
        logits = self.action_head(self.internal_model._features)

        # If action masking is disabled, skip masking and return unmasked logits.
        # Otherwise, step into masking block.
        if self.no_masking is False:
            # Convert action_mask into a [0.0 || -inf]-type mask.
            inf_mask = torch.clamp(torch.log(action_mask), min=FLOAT_MIN)
            # print(f"logits.size() = {logits.size()}")
            # print(f"inf_mask.size() = {inf_mask.size()}")
            masked_logits = logits + inf_mask
            logits = masked_logits

        return logits, state

    def value_function(self) -> Tensor:
        """Get current value of value function.

        Returns:
            `Tensor[torch.float32]`: Value function value.
        """
        # get features and squeeze extra dimensions out.
        y = self.value_head(self.internal_model._features)
        y = y.squeeze(-1)
        return y
  4. Fully flatten the observation space, just to make sure the environment and constructor are not broken. This works, but I lose the ability to do action masking, which is critical to the environment. So this test was more of a sanity check than a reasonable path forward. Note that this test did not include the action masking model above, because that model relies on interfacing with a dict observation space.
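The masking step in the custom model's forward() can be checked in isolation, without RLlib or an environment. Below is a minimal NumPy sketch of the same arithmetic (log of the 0/1 mask, clamped, then added to the logits). It is not the model's torch code: mask_logits is a hypothetical helper, and FLOAT_MIN here is an assumed stand-in for the constant imported from ray.rllib.utils.torch_utils.

```python
import numpy as np

# Assumed stand-in for RLlib's FLOAT_MIN: the most negative float32 value.
FLOAT_MIN = float(np.finfo(np.float32).min)


def mask_logits(logits, action_mask):
    """Add a (0 or very negative) mask to the logits.

    log(1) is 0, so valid actions keep their logit. log(0) is -inf, which is
    clamped to FLOAT_MIN so that invalid actions get a huge negative logit.
    """
    logits = np.asarray(logits, dtype=np.float64)
    action_mask = np.asarray(action_mask, dtype=np.float64)
    with np.errstate(divide="ignore"):  # log(0) is expected here
        inf_mask = np.maximum(np.log(action_mask), FLOAT_MIN)
    return logits + inf_mask


logits = np.array([0.5, 2.0, -1.0, 0.1])
mask = np.array([1, 0, 1, 0])  # actions 1 and 3 are invalid
masked = mask_logits(logits, mask)

# The largest raw logit (action 1) is masked out, so the best valid action
# is chosen among actions 0 and 2 only.
best_valid_action = int(np.argmax(masked))
```

Note that the mask and the logits must have the same trailing dimension for the addition to work, which is worth keeping in mind when the action space is MultiDiscrete.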

The fact that I get the same errors regardless of whether I use the action masking model makes me think the error lies solely in the interface of the environment.

Notably, the error for the bare environment differs from the error for the other cases. For case #0, the error is “ValueError: could not broadcast input array from shape (8,) into shape (4,)”.
But for the other cases, the error is “ValueError: could not broadcast input array from shape (32,) into shape (10,)”.

Hi @dylan906,

It is hard for us to help without seeing the majority of the logic. If you can share that we could have a better idea of what the issue might be.

Is there a reason you are not just using the FCNet as is rather than only using the _hidden_layers and then adding your own logits layer on top?

The way I would approach this is in two parts.

  1. Verify the environment is meeting RLlib’s expectations.
    For this I would use one of the built-in models with the custom environment. This will obviously not have the action masking, but if it runs cleanly you will know your environment is behaving as expected.

  2. I would use the custom model with RLlib’s RandomEnv. This environment allows you to specify an observation and action space and then generates random values that match the spaces you provided. Using this environment would allow you to determine whether there are issues with the custom model.
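Before either part, a quick structural check of the raw observations can narrow things down. The sketch below compares one observation against a hand-written description of the expected keys, shapes, and dtype kinds. Everything in it is hypothetical (find_mismatches, SPEC, and the sizes, which are taken from the lengths quoted earlier in the thread); with gym installed, the space's own contains() method is the more direct check.

```python
import numpy as np

# Hypothetical description of the flattened space from case #1:
# key -> (expected shape, NumPy dtype kind: "f" for float, "i" for signed int)
SPEC = {
    "observations": ((32,), "f"),
    "action_mask": ((10,), "i"),
}


def find_mismatches(obs, spec):
    """Return a list of problems; an empty list means obs matches spec."""
    problems = []
    for key, (shape, kind) in spec.items():
        if key not in obs:
            problems.append(f"{key}: missing")
            continue
        value = np.asarray(obs[key])
        if value.shape != tuple(shape):
            problems.append(f"{key}: shape {value.shape}, expected {tuple(shape)}")
        if value.dtype.kind != kind:
            problems.append(f"{key}: dtype {value.dtype}, expected kind '{kind}'")
    for key in obs:
        if key not in spec:
            problems.append(f"{key}: unexpected key")
    return problems


# An observation that matches the flattened space.
flat_obs = {
    "observations": np.zeros(32, dtype=np.float32),
    "action_mask": np.ones(10, dtype=np.int64),
}

# An observation whose "observations" entry is still a nested dict.
nested_obs = {
    "observations": {"ob_a": np.zeros(24), "ob_b": np.zeros(8)},
    "action_mask": np.ones(10, dtype=np.int64),
}

flat_problems = find_mismatches(flat_obs, SPEC)
nested_problems = find_mismatches(nested_obs, SPEC)
```

Running a check like this on the output of both reset() and step() would show whether a wrapper is actually applied to every observation the environment emits.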

@mannyv,

Thanks for the reply and recommendations. I had already been doing #2 (testing my custom model with RandomEnv), but I was not doing a good job of #1 (testing my custom environment against an RLlib model). When I tested my custom environment more thoroughly with a default RLlib model, I discovered that I was not wrapping my environment appropriately. I wasn’t able to figure out the exact source of the problem, but when I flattened the “observations” value of my obs space, the environment worked with the model correctly. I had originally done a sloppy job in case #1; I had thought that my wrapper was flattening “observations”, but evidently it was not. Thanks for the advice!
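As an illustration of the kind of flattening described here, the sketch below turns the nested “observations” dict into a single float vector while leaving “action_mask” as a separate key. It is a guess at the general shape of such a step, not the wrapper actually used: flatten_observations is a hypothetical helper, the key names and lengths come from the baseline space shown earlier, and sorting the inner keys is an assumption made to keep the element order deterministic.

```python
import numpy as np


def flatten_observations(obs):
    """Flatten obs["observations"] into one 1-D float32 vector.

    The "action_mask" entry is kept as its own key so that an action
    masking model can still read it.
    """
    inner = obs["observations"]
    flat = np.concatenate(
        [np.asarray(inner[key], dtype=np.float32).ravel() for key in sorted(inner)]
    )
    return {
        "observations": flat,
        "action_mask": np.asarray(obs["action_mask"], dtype=np.float32),
    }


# Nested observation with the lengths from the baseline space:
# ob_a has 24 elements, ob_b has 8, and the mask has 10.
raw_obs = {
    "observations": {
        "ob_a": np.arange(24).reshape(4, 6),
        "ob_b": np.linspace(0.0, 1.0, 8),
    },
    "action_mask": np.ones(10, dtype=np.int64),
}

flat_obs = flatten_observations(raw_obs)
```

In a real environment the declared observation_space would need to change in the same way (a 32-long Box for “observations”), and the function would have to be applied to the output of both reset() and step().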

Regarding your question about my action mask class: I am using internal_model._hidden_layers.forward(observation) instead of internal_model({"obs": observation}) because 1) _hidden_layers is a PyTorch Sequential, which I am more familiar with, and 2) using the latter command gives me dimension errors that were not immediately easy to resolve. I’m sure that if I spent more time on those errors I could resolve them and use the more appropriate command, but I don’t see a reason why simply using _hidden_layers.forward() won’t work. Having said that, if you know of a reason why internal_model() would be better, I’d love to hear it.