Compute_single_action randomly errors without changing input

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.
    Ray 2.5.0

I’m using a custom LSTM/action masking model based off of this Ray example:

My custom model

"""Custom LSTM + Action Mask model."""
# %% Imports
# Standard Library Imports
from typing import Dict, List, Tuple

# Third Party Imports
import gymnasium as gym
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.models.torch.misc import SlimFC
from ray.rllib.models.torch.recurrent_net import RecurrentNetwork as TorchRNN
from ray.rllib.policy.rnn_sequencing import add_time_dimension
from ray.rllib.utils.annotations import override
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.torch_utils import FLOAT_MIN
from ray.rllib.utils.typing import TensorType

torch, nn = try_import_torch()


# %% Class
class MaskedLSTM(TorchRNN, nn.Module):
    """Fully-connected layers feed into an LSTM layer."""

    def __init__(
        self,
        obs_space: gym.spaces.Space,
        action_space: gym.spaces.Space,
        num_outputs: int,
        model_config: dict = None,
        name: str = None,
        **custom_model_kwargs,
    ):
        """Initialize MaskedLSTM model.

        Args:
            obs_space (gym.spaces.Space): Environment observation space.
            action_space (gym.spaces.Space): Environment action space.
            num_outputs (int): Number of outputs of model. Should be equal to size
                of flattened action_space.
            model_config (dict, optional): Used for Ray defaults. Defaults to {}.
            name (str, optional): Used for inheritance. Defaults to "MaskedLSTM".
            custom_model_kwargs: Configure size of FC net and LSTM layer. Required.

        Expected items in custom_model_kwargs:
            fcnet_hiddens (list[int]): Number and size of FC layers.
            fcnet_activation (str): Activation function for FC layers. See Ray
                SlimFC documentation for recognized args.
            lstm_state_size (int): Size of LSTM layer.
        """
        # Convert space to proper gym space if handed is as a different type
        orig_space = getattr(obs_space, "original_space", obs_space)
        # Size of observations must include only "observations", not "action_mask".
        # Action mask must be 1d and same len as num_outputs.
        # custom_model_kwargs must include "lstm_state_size", "fcnet_hiddens",
        # and "fcnet_activation".
        assert "observations" in orig_space.spaces
        assert "action_mask" in orig_space.spaces
        assert len(orig_space.spaces) == 2
        assert len(orig_space["action_mask"].shape) == 1
        assert orig_space["action_mask"].shape[0] == num_outputs
        assert "lstm_state_size" in custom_model_kwargs
        assert "fcnet_hiddens" in custom_model_kwargs
        assert "fcnet_activation" in custom_model_kwargs

        lstm_state_size = custom_model_kwargs.get("lstm_state_size")

        # Defaults
        if model_config is None:
            model_config = {}
        if name is None:
            name = "MaskedLSTM"

        # Inheritance
        nn.Module.__init__(self)
        super().__init__(
            obs_space, action_space, num_outputs, model_config, name
        )

        self.obs_size = orig_space["observations"].shape[0]
        # transition layer size: size of output of final hidden layer
        self.trans_layer_size = custom_model_kwargs["fcnet_hiddens"][-1]
        self.lstm_state_size = lstm_state_size

        self.fc_layers = self.makeFCLayers(
            model_config=custom_model_kwargs,
            input_size=self.obs_size,
        )

        self.lstm = nn.LSTM(
            input_size=self.trans_layer_size,
            hidden_size=self.lstm_state_size,
            batch_first=True,
        )
        self.action_branch = nn.Linear(self.lstm_state_size, num_outputs)
        self.value_branch = nn.Linear(self.lstm_state_size, 1)
        # Holds the current "base" output (before logits layer).
        self._features = None

    @override(ModelV2)
    def get_initial_state(self):
        """Initial states of hidden layers are initial states of final FC layer."""
        h = [
            self.fc_layers[-1]
            ._model[0]
            .weight.new(1, self.lstm_state_size)
            .zero_()
            .squeeze(0),
            self.fc_layers[-1]
            ._model[0]
            .weight.new(1, self.lstm_state_size)
            .zero_()
            .squeeze(0),
        ]
        return h

    @override(ModelV2)
    def value_function(self):  # noqa
        assert self._features is not None, "must call forward() first"
        return torch.reshape(self.value_branch(self._features), [-1])

    # Override forward() to add an action mask step
    @override(TorchRNN)
    def forward(
        self,
        input_dict: Dict[str, TensorType],
        state: List[TensorType],
        seq_lens: TensorType,
    ) -> Tuple[TensorType, List[TensorType]]:
        """Adds time dimension to batch before sending inputs to forward_rnn().

        You should implement forward_rnn() in your subclass.
        """
        # When training, input_dict is handed in with an extra nested level from
        # the environment (input_dict["obs"]).
        # Get observations from obs; not observations+action_mask
        flat_inputs = input_dict["obs"]["observations"].float()
        action_mask = input_dict["obs"]["action_mask"]

        # Note that max_seq_len != input_dict.max_seq_len != seq_lens.max()
        # as input_dict may have extra zero-padding beyond seq_lens.max().
        # Use add_time_dimension to handle this
        self.time_major = self.model_config.get("_time_major", False)
        inputs = add_time_dimension(
            flat_inputs,
            seq_lens=seq_lens,
            framework="torch",
            time_major=self.time_major,
        )
        output, new_state = self.forward_rnn(inputs, state, seq_lens)
        output = torch.reshape(output, [-1, self.num_outputs])
        # Mask raw logits here! Then return masked values
        output = self.maskLogits(logits=output, mask=action_mask)
        return output, new_state

    def maskLogits(self, logits: TensorType, mask: TensorType):
        """Apply mask over raw logits."""
        inf_mask = torch.clamp(torch.log(mask), min=FLOAT_MIN)
        masked_logits = logits + inf_mask
        return masked_logits

    @override(TorchRNN)
    def forward_rnn(self, inputs, state, seq_lens):
        """Feeds `inputs` (B x T x ..) through the Gru Unit.

        Returns the resulting outputs as a sequence (B x T x ...).
        Values are stored in self._cur_value in simple (B) shape (where B
        contains both the B and T dims!).

        Returns:
            NN Outputs (B x T x ...) as sequence.
            The state batches as a List of two items (c- and h-states).
        """
        x = nn.functional.relu(self.fc_layers(inputs))
        self._features, [h, c] = self.lstm(
            x, [torch.unsqueeze(state[0], 0), torch.unsqueeze(state[1], 0)]
        )
        action_out = self.action_branch(self._features)
        return action_out, [torch.squeeze(h, 0), torch.squeeze(c, 0)]

    def makeFCLayers(
        self, model_config: dict, input_size: int
    ) -> nn.Sequential:
        """Make fully-connected layers.

        See Ray SlimFC for details.

        Args:
            model_config (dict): {
                "fcnet_hiddens": (list[int]) Numer of hidden layers is number of
                    entries; size of hidden layers is values of entries,
                "fcnet_activation": (str) Recognized activation function
            }
            input_size (int): Input layer size.

        Returns:
            nn.Sequential: Has N layers, where N = len(model_config["fcnet_hiddens"]).
        """
        hiddens = list(model_config.get("fcnet_hiddens", []))
        activation = model_config.get("fcnet_activation")

        self.fc_hiddens = hiddens
        self.fc_activation = activation

        layers = []
        prev_layer_size = input_size

        # Create hidden layers.
        for size in hiddens:
            layers.append(
                SlimFC(
                    in_size=prev_layer_size,
                    out_size=size,
                    activation_fn=activation,
                )
            )
            prev_layer_size = size

        fc_layers = nn.Sequential(*layers)

        return fc_layers

I am having issues when I use policy.compute_single_action() on a policy that uses my custom model, using the script below. The script is a close copy of the example above. I get the below shape error seemingly randomly, but more often than not. Sometimes script works correctly, but I cannot pin down the conditions on when it works correctly. I can literally run it fine, then run it immediately afterwards, with no changes, and the script will error out.

Relevant portion of script:

ModelCatalog.register_custom_model("MaskedLSTM", MaskedLSTM)

lstm_state_size = 10
config = (
    ppo.PPOConfig()
    .environment(RandomEnv, env_config=env_config)
    .framework("torch")
    .training(
        model={
            # Specify our custom model from above.
            "custom_model": "MaskedLSTM",
            # Extra kwargs to be passed to your model's c'tor.
            "custom_model_config": {
                "fcnet_hiddens": [6, 6],
                "fcnet_activation": "relu",
                "lstm_state_size": lstm_state_size,
            },
        }
    )
)
algo = config.build()
policy = algo.get_policy()
print(f"policy = {policy}")

# 2 ways of getting initial state, both should give the same list of zeros.
state = [zeros([lstm_state_size], float32) for _ in range(2)]
print(f"state = {state}")

action, state_out, _ = policy.compute_single_action(
    obs=policy.observation_space.sample(),
    state=state,
)

Error:

Traceback (most recent call last):
  File "/tests/nets/test_lstm_mask.py", line 137, in <module>
    action, state_out, _ = policy.compute_single_action(
  File "/anaconda3/envs/punch/lib/python3.10/site-packages/ray/rllib/policy/policy.py", line 545, in compute_single_action
    out = self.compute_actions_from_input_dict(
  File "/anaconda3/envs/punch/lib/python3.10/site-packages/ray/rllib/policy/torch_policy_v2.py", line 526, in compute_actions_from_input_dict
    return self._compute_action_helper(
  File "/anaconda3/envs/punch/lib/python3.10/site-packages/ray/rllib/utils/threading.py", line 24, in wrapper
    return func(self, *a, **k)
  File "/anaconda3/envs/punch/lib/python3.10/site-packages/ray/rllib/policy/torch_policy_v2.py", line 1171, in _compute_action_helper
    action_dist = dist_class(dist_inputs, self.model)
  File "/anaconda3/envs/punch/lib/python3.10/site-packages/ray/rllib/models/torch/torch_action_dist.py", line 114, in __init__
    self.cats = [
  File "/anaconda3/envs/punch/lib/python3.10/site-packages/ray/rllib/models/torch/torch_action_dist.py", line 115, in <listcomp>
    torch.distributions.categorical.Categorical(logits=input_)
  File "/anaconda3/envs/punch/lib/python3.10/site-packages/torch/distributions/categorical.py", line 66, in __init__
    super(Categorical, self).__init__(batch_shape, validate_args=validate_args)
  File "/anaconda3/envs/punch/lib/python3.10/site-packages/torch/distributions/distribution.py", line 56, in __init__
    raise ValueError(
ValueError: Expected parameter logits (Tensor of shape (1, 2)) of distribution Categorical(logits: torch.Size([1, 2])) to satisfy the constraint IndependentConstraint(Real(), 1), but found invalid values:
tensor([[nan, nan]])

Given that the script does work sometimes, I think my model is probably implemented correctly, and I’m just writing my test poorly. But I could be wrong. Any help would be much appreciated.