`training_step()` fails with custom environment

High: It blocks me from completing my task.
Ray 2.5.0
Gymnasium 0.26.3

Background
I am using a custom environment and a custom action-masking model (based on the Ray example). The model itself appears to be unrelated to the error, but I’m including it because the custom environment inherently carries the action mask in its observation space, which does appear to be the source of the error.

Problem
When I run `ppo.training_step()` with a `RandomEnv` that has the same observation space as the custom environment, it runs normally.
However, when I run `ppo.training_step()` with the custom environment, it errors in the `preprocessors.py` module with an array broadcast shape error. I’ve tried disabling preprocessors by setting `config["param_space"]["model"]["_disable_preprocessor_api"] = True`, but that doesn’t resolve or change the error.

The environment builds correctly (responds normally to getting observations and inputting actions).

Here is pseudo-code of a near-MWE. The fully functional script is on GitHub here.

# %% Imports
# Third Party Imports
from gymnasium.spaces import Box, Dict, MultiDiscrete
from ray.air import RunConfig
from ray.rllib.algorithms import ppo
from ray.rllib.examples.env.random_env import RandomEnv
from ray.rllib.models import ModelCatalog
from ray.tune.registry import register_env

# Custom Imports
from utilities import loadJSONFile
from action_mask_model import MyActionMaskModel
from build_tuner import buildEnv

# %% Load config
# config has parameters to build algo
config = loadJSONFile("issues/iss7/iss7_config.json")

# %% Register env and model
register_env("my_env", buildEnv)
ModelCatalog.register_custom_model("action_mask_model", MyActionMaskModel)

# Disable preprocessor
# NOTE: This does not have any effect
# config["param_space"]["model"]["_disable_preprocessor_api"] = True

# %% Random Env -- this works
env_random = RandomEnv(
    {
        "observation_space": Dict(
            {
                "observations": Box(0, 1, shape=(32,), dtype=float),
                "action_mask": Box(0, 1, shape=(10,), dtype=int),
            }
        ),
        "action_space": MultiDiscrete([10]),
    }
)

algo_config_rand = (
    ppo.PPOConfig()
    .training(
        model={**config["param_space"]["model"]},
    )
    .environment(
        env=RandomEnv,
        env_config={
            "observation_space": env_random.observation_space,
            "action_space": env_random.action_space,
        },
    )
    .framework("torch")
)

algo_random = algo_config_rand.build()
results = algo_random.training_step()
print(f"random env results : \n{results}")
# random env results : 
# {'default_policy': {'learner_stats': {'allreduce_latency': 0.0, 'grad_gnorm': 0.8361867665763825, 'cur_kl_coeff': 0.20000000000000004, 'cur_lr': 5.0000000000000016e-05, 'total_loss': 1.9762821364146408, 'policy_loss': -0.009080082304795744, 'vf_loss': 1.9849226349143572, 'vf_explained_var': 0.00012015822113201183, 'kl': 0.0021979112644896492, 'entropy': 2.3002702866831135, 'entropy_coeff': 0.0}, 'model': {}, 'custom_metrics': {}, 'num_agent_steps_trained': 128.0, 'num_grad_updates_lifetime': 465.5, 'diff_num_grad_updates_vs_sampler_policy': 464.5}}

# %% Custom Env -- this section DOES NOT work
algo_config_customenv = (
    ppo.PPOConfig()
    .training(model={**config["param_space"]["model"]})
    .environment(
        env="my_env",
        env_config=config["param_space"]["env_config"],
    )
    .framework("torch")
)
algo_customenv = algo_config_customenv.build()

try:
    results = algo_customenv.training_step()
    print(f"custom env results : \n{results}")
except Exception as e:
    print(e)
# prints a long error message that ends in:
#   File ".../ray/rllib/models/preprocessors.py", line 211, in write
#    array[offset : offset + self._size] = np.array(observation, copy=False).ravel()
# ValueError: could not broadcast input array from shape (32,) into shape (10,)

Here is the method that the error message points to:

@PublicAPI
class NoPreprocessor(Preprocessor):
...
    @override(Preprocessor)
    def write(self, observation: TensorType, array: np.ndarray, offset: int) -> None:
        array[offset : offset + self._size] = np.array(observation, copy=False).ravel()
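
To confirm my reading of the failure, here is a toy numpy reproduction of the same broadcast error (my own sketch, not the actual RLlib code path): if the flat buffer reserves a 10-wide slot where the 32-wide "observations" entry ends up being written, the same ValueError appears.

import numpy as np

# Toy buffer: a 10-wide "action_mask" slot followed by a 32-wide "observations" slot
array = np.zeros(10 + 32)
observation = np.zeros(32)  # the 32-wide "observations" entry

# Writing the 32-element array into the 10-element slot reproduces the error
array[0 : 0 + 10] = np.array(observation, copy=False).ravel()
# ValueError: could not broadcast input array from shape (32,) into shape (10,)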

I’ve tried setting breakpoints in write(), but Ray’s error handling appears to be circumventing them, so I am unable to check whether the array sizes are actually correct. If anyone can tell me how to get breakpoints to trigger inside the NoPreprocessor class, I would appreciate it.
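
One thing I’m considering to make breakpoints reachable (this assumes the preprocessing happens inside the rollout workers rather than in the driver, which I haven’t confirmed) is forcing everything into a single local process, reusing the config and registered "my_env" from the script above:

import ray
from ray.rllib.algorithms import ppo

ray.init(local_mode=True)  # run Ray tasks serially in the driver process

algo_config_debug = (
    ppo.PPOConfig()
    .training(model={**config["param_space"]["model"]})
    .environment(env="my_env", env_config=config["param_space"]["env_config"])
    .rollouts(num_rollout_workers=0)  # sample in the driver so pdb breakpoints can trigger
    .framework("torch")
)
algo_debug = algo_config_debug.build()

I’m not sure these are the right knobs, so corrections are welcome.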

If anyone can offer any guidance I would very much appreciate it, as this error is in my critical path.

Update: I resolved the issue; posting here for anyone who may come across the same problem. It turns out that the environment’s observation space (a Dict) did not match the key order of the observation returned by env.step().

The base environment’s observation space is a Dict with many entries, but an action-mask wrapper turns the top-level observation space into a Dict with two entries (“action_mask” and “observations”). The wrapper listed “observations” first in both __init__() and observation(). For a reason I haven’t figured out, the wrapped environment reports its observation space with “action_mask” first (see the script output below), while step() kept returning observations with “observations” first, so the key orders did not match.

I swapped the order to put “action_mask” first and that fixed the issue.

Original:

# wrapper
from collections import OrderedDict

import gymnasium as gym
from gymnasium.spaces.utils import flatten_space


class ActionMask(gym.ObservationWrapper):
    """Put the base obs into "observations"; copy the flattened obs["mask"] into "action_mask"."""

    def __init__(
        self,
        env: gym.Env,
    ):
        super().__init__(env)
        self.mask_space = flatten_space(self.action_space)
        self.observation_space = gym.spaces.Dict(
            {
                "observations": env.observation_space,
                "action_mask": self.mask_space,
            }
        )

    def observation(self, obs: OrderedDict) -> OrderedDict:
        mask = obs["mask"]
        mask_flat = gym.spaces.flatten(self.mask_space, mask.transpose())

        obs_new = OrderedDict(
            {
                "observations": obs,
                "action_mask": mask_flat,
            }
        )

        return obs_new

Fixed:

# wrapper
from collections import OrderedDict

import gymnasium as gym
from gymnasium.spaces.utils import flatten_space


class ActionMask(gym.ObservationWrapper):
    def __init__(
        self,
        env: gym.Env,
    ):
        super().__init__(env)
        self.mask_space = flatten_space(self.action_space)
        self.observation_space = gym.spaces.Dict(
            {
                "action_mask": self.mask_space,  # swapped order
                "observations": env.observation_space,  # swapped order
            }
        )

    def observation(self, obs: OrderedDict) -> OrderedDict:
        mask = obs["mask"]
        mask_flat = gym.spaces.flatten(self.mask_space, mask.transpose())

        obs_new = OrderedDict(
            {
                "action_mask": mask_flat,  # swapped order
                "observations": obs,  # swapped order
            }
        )

        return obs_new

Script:

env = buildEnv(env_config)
check_env(env)
env.reset()
[obs, _, _, _, _] = env.step(env.action_space.sample())
print(f"\nobs space = {env.observation_space}")
print(f"obs = {obs}")

# For original mask, prints:
# obs space = Dict('action_mask': Box(0, 1, (10,), int64), 'observations': Box(0, 1, (8,), int64))
# obs = OrderedDict([('observations', array([0, 0, 0, 0, 0, 0, 0, 0])), ('action_mask', array([0, 0, 0, 0, 1, 0, 0, 0, 0, 1]))])

# For fixed mask, prints:
# obs space = Dict('action_mask': Box(0, 1, (10,), int64), 'observations': Box(0, 1, (8,), int64))
# obs = OrderedDict([('action_mask', array([0, 0, 0, 0, 1, 0, 0, 0, 0, 1])), ('observations', array([0, 0, 0, 0, 0, 0, 0, 0]))])
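
As a guard going forward, I now assert right after building the environment that the key order of the returned observation matches the key order of the observation space (a rough sketch; buildEnv and env_config are my own helpers from above):

env = buildEnv(env_config)
obs, _ = env.reset()

# Assumption: both the Dict space and the returned observation expose keys in a stable order
space_keys = list(env.observation_space.spaces.keys())
obs_keys = list(obs.keys())
assert space_keys == obs_keys, f"Key order mismatch: space={space_keys}, obs={obs_keys}"

With the original wrapper this assertion fails, matching the mismatch visible in the printed output above.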

If anyone has any ideas on the root of the original problem (why did the observation returned by the original wrapper end up ordered differently from the observation space?) or any suggestions for best practices to prevent this type of error from going unnoticed as it did for me, I’d love to hear them.