`modelconfig.get("_time_major")` error

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I'm working on building a custom PyTorch model for RLlib. However, I keep running into a problem with `modelconfig.get("_time_major")`. Here's the specific error:

I've tried to avoid this error with no success. My understanding of the error is that the object `.get("_time_major")` is being called on is a string, not a dict. I could be wrong.
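For reference, here's a minimal sketch of the failure mode I suspect (the values are made up for illustration, not my actual traceback): calling the dict method `.get()` on something that is actually a string fails.

model_config = "custom_net"              # a plain string where a dict was expected
model_config.get("_time_major", False)   # AttributeError: 'str' object has no attribute 'get'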

Below is the full code snippet of the NN model

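(The snippet assumes the following imports; the module paths below follow the Ray 2.x RLlib layout and may differ slightly between Ray versions.)

import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import tree  # dm-tree
from gym.spaces import Discrete, MultiDiscrete
from typing import Dict, List, Tuple

from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.models.torch.misc import SlimFC
from ray.rllib.models.torch.modules import GRUGate, SkipConnection
from ray.rllib.models.torch.recurrent_net import RecurrentNetwork
from ray.rllib.policy.rnn_sequencing import add_time_dimension
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.policy.view_requirement import ViewRequirement
from ray.rllib.utils.annotations import override
from ray.rllib.utils.spaces.space_utils import get_base_struct_from_space
from ray.rllib.utils.torch_utils import flatten_inputs_to_1d_tensor, one_hot
from ray.rllib.utils.typing import ModelConfigDict, TensorType
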
CUSTOM_CONFIG = {
    "max_seq_len": 20,
    "cell_size": 512,
    "time_major": True,
    "use_prev_action": True,
    "use_prev_reward": True,
    "use_prev_obs" : True,
    "rnn_num_layers": 10,
    "fcnet_hiddens": [512,1024,1024, 512],
    "rnn_hiddens": [512,1024,2048, 2048, 4096, 4096, 2048, 2048,1024,512],
    "linear_activation": "swish",
    "post_rnn_activation": "swish",
}

class CustomNet(RecurrentNetwork, nn.Module):
    """A CustomNet for Ray RLLib"""

    def __init__(
        self,
        obs_space: gym.spaces.Space,
        action_space: gym.spaces.Space,
        num_outputs: int,
        name: str,
        model_config: dict= CUSTOM_CONFIG,
        init_gru_gate_bias: int = 2,
        num_frames=10,
    ):
        nn.Module.__init__(self)
        super().__init__(
            obs_space, action_space, num_outputs, model_config, name
        )
        self.obs_space = obs_space
        self.action_space = action_space
        self.num_outputs = num_outputs
        self.model_config = model_config
        self.name = name

        self.init_gru_gate_bias = init_gru_gate_bias
        self.num_frames = num_frames

        if self.num_outputs is None:
            self.num_outputs = int(np.product(self.obs_space.shape))  # type: ignore

        
        #self.time_major = self.model_config.get("time_major", False)
        self._time_major = self.model_config["time_major"]
        self.use_prev_action = self.model_config["use_prev_action"]
        self.use_prev_reward = self.model_config["use_prev_reward"]
        self.use_prev_reward = self.model_config["use_prev_obs"]
        self.num_layers = self.model_config["rnn_num_layers"]
        self.cell_size = self.model_config["cell_size"]

        fcnet_hiddens = list(self.model_config.get("fcnet_hiddens", []))  # type:ignore
        rnn_hiddens = list(self.model_config.get("rnn_hiddens", []))  # type:ignore

        activation = self.model_config.get("linear_activation")
        post_rnn_activation = self.model_config.get("post_rnn_activation")

        self.action_space_struct = get_base_struct_from_space(self.action_space)
        self.action_dim = sum(
            int(np.product(space.shape)) if space.shape is not None else len(space)  # type: ignore
            for space in tree.flatten(self.action_space_struct)
            if isinstance(space, (Discrete, MultiDiscrete))
        )

        # Add prev-action/reward nodes to input to LSTM.
        if self.use_prev_action:
            self.num_outputs += self.action_dim  # type: ignore
        if self.use_prev_reward:
            self.num_outputs += 1  # type: ignore

        w = torch.empty(3, 5)
        global_init = (
            torch.nn.init.kaiming_uniform_
        )  # (tensor, a=0, mode='fan_in', nonlinearity='leaky_relu')

        # Construct the fully connected layers
        self.fc_layers = nn.Sequential(
            SlimFC(
                in_size=self.num_outputs,  # type: ignore
                out_size=self.cell_size,  # type: ignore
                initializer=global_init,
                activation_fn=activation,
                use_bias=True,
                bias_init=0,
            ),
            *[
                self._create_fc_layer(self.cell_size, size, global_init, activation)
                for size in fcnet_hiddens
            ],
        )

        # Construct the RNN Layers
        dropout = 0.5
        self.rnn_layers = nn.Sequential(
            *[
                self._create_rnn_layer(size, global_init, activation, dropout)
                for size in rnn_hiddens[:-1]
            ]
        )

        # Set self.num_outputs to the number of output nodes desired by the
        # caller of this constructor. Restore to original outputs
        self.num_outputs = num_outputs

        # Postprocess RNN output with another hidden layer.
        self.logits = self._create_output_layer(
            self.cell_size, self.num_outputs, global_init, post_rnn_activation
        )
        self.values_out = self._create_output_layer(
            self.cell_size, 1, global_init, None
        )

        # Setup trajectory views for previous action, reward and previous num_frames observations
        # TODO: Should I shift the actions and previous actions further???

        # Add prev-a/r to this model's view, if required.
        if self.model_config["use_prev_action"]:
            self.view_requirements[SampleBatch.PREV_ACTIONS] = ViewRequirement(
                data_col=SampleBatch.ACTIONS,
                space=self.action_space,
                shift=-1,  # f"-{self.num_frames -1}:0"
            )
        if self.model_config["use_prev_reward"]:
            self.view_requirements[SampleBatch.PREV_REWARDS] = ViewRequirement(
                data_col=SampleBatch.REWARDS, shift=-1  # f"-{self.num_frames -1}:0"
            )
        if self.model_config["use_prev_obs"]:
            self.view_requirements["prev_n_obs"] = ViewRequirement(
                data_col="obs", 
                space=self.obs_space, 
                shift=f"-{self.num_frames -1}:0"
            )
        
        # Holds the current "base" output (before logits layer).
        self._features = None

        # Print out model summary in INFO logging mode.
        # if logger.isEnabledFor(logging.INFO):
        #    self.forward.summary()

    def _create_fc_layer(self, in_size, out_size, initializer, activation):
        layer = SlimFC(
            in_size=in_size,
            out_size=out_size,
            initializer=initializer,
            activation_fn=activation,
        )
        return layer if self.num_outputs is not None else None

    def _create_rnn_layer(self, size, initializer, activation, dropout):
        # Note: torch.nn.LSTM/GRU take `input_size`, not `in_size`, and only
        # nn.LSTM supports the `proj_size` argument.
        lstm_layer = nn.LSTM(
            input_size=size,
            hidden_size=size,
            num_layers=self.num_layers,
            bias=True,
            batch_first=not self._time_major,
            dropout=dropout,
            bidirectional=False,
            proj_size=0,
        )
        gru_layer = nn.GRU(
            input_size=size,
            hidden_size=size,
            num_layers=self.num_layers,
            bias=True,
            batch_first=not self._time_major,
            dropout=dropout,
            bidirectional=False,
        )
        return nn.Sequential(
            SkipConnection(nn.Sequential(lstm_layer, nn.Tanh())),
            SkipConnection(
                nn.Sequential(
                    torch.nn.LayerNorm(normalized_shape=size),
                    SlimFC(
                        in_size=size,
                        out_size=size,
                        initializer=initializer,
                        activation_fn=activation,
                        use_bias=True,
                        bias_init=0,
                    ),
                    gru_layer,
                    SlimFC(
                        in_size=size,
                        out_size=size,
                        initializer=initializer,
                        activation_fn=activation,
                        use_bias=True,
                        bias_init=0,
                    ),
                ),
                fan_in_layer=GRUGate(size, self.init_gru_gate_bias),
            ),
        )

    def _create_output_layer(self, in_size, out_size, initializer, activation):
        layer = SlimFC(
            in_size=in_size,
            out_size=out_size,
            initializer=initializer,
            activation_fn=activation,
        )
        return layer if self.num_outputs is not None else None

    @override(RecurrentNetwork)
    def forward(
        self,
        input_dict: Dict[str, TensorType],
        state: List[TensorType],
        seq_lens: TensorType,
    ) -> Tuple[TensorType, List[TensorType]]:
        """Adds
        1. the trajectory view, and
        2. the time dimension to the batch before sending inputs to forward_rnn().

        You should implement forward_rnn() in your subclass."""
        if seq_lens is None:
            raise ValueError("seq_lens cannot be None")
        flat_inputs = input_dict["obs_flat"].float()  # type:ignore

        # Concat. prev-action/reward if required.
        prev_a_r_o = []

        # Prev actions.
        if self.model_config["use_prev_action"]:
            prev_a = input_dict[SampleBatch.PREV_ACTIONS]
            # If actions are not processed yet (in their original form as
            # have been sent to environment):
            # Flatten/one-hot into 1D array.
            if self.model_config["_disable_action_flattening"]:
                prev_a_r_o.append(
                    flatten_inputs_to_1d_tensor(
                        prev_a, spaces_struct=self.action_space_struct, time_axis=False
                    )
                )
            # If actions are already flattened (but not one-hot'd yet!),
            # one-hot discrete/multi-discrete actions here.
            else:
                if prev_a is not None:
                    if isinstance(self.action_space, (Discrete, MultiDiscrete)):
                        prev_a = one_hot(
                            prev_a.float(), self.action_space #type:ignore
                        )  # type:ignore
                    else:
                        prev_a = prev_a.float()  # type:ignore
                    prev_a_r_o.append(
                        torch.reshape(prev_a.float(), [-1, self.action_dim])#type:ignore
                    )  # type:ignore
        # Prev rewards.
        if self.model_config["use_prev_reward"]:
            prev_r = input_dict[SampleBatch.PREV_REWARDS]
            # prev_r is a 1D tensor of shape (batch,); add a trailing dim of size 1
            # so it can be concatenated with flat_inputs of shape (batch, obs_size).
            # prev_r.float().unsqueeze(1) would do the same thing.
            prev_a_r_o.append(torch.reshape(prev_r.float(), [-1, 1]))  # type:ignore

        # Prev observations
        if self.model_config["use _prev_obs"]:
            prev_obs = input_dict["prev_n_obs"]
            prev_obs = torch.reshape(prev_obs, [-1, self.obs_space.shape[0] * self.num_frames])  # type: ignore
            prev_a_r_o.append(prev_obs)

        # Concat prev. actions + rewards to the "main" input.
        if prev_a_r_o:
            flat_inputs = torch.cat([flat_inputs] + prev_a_r_o, dim=1)

        # Push everything through our RNN.
        #input_dict["obs_flat"] = flat_inputs

        # Note that max_seq_len != input_dict.max_seq_len != seq_lens.max()
        # as input_dict may have extra zero-padding beyond seq_lens.max().
        # Use add_time_dimension to handle this
        
        '''inputs = add_time_dimension(
            flat_inputs,
            seq_lens=seq_lens,
            framework="torch",
            time_major=self._time_major,
        )'''
        output, new_state = self.forward_rnn(flat_inputs, state, seq_lens)
        output = torch.reshape(output, [-1, self.num_outputs])  # type:ignore
        return output, new_state

    def forward_rnn(
        self, inputs: TensorType, state: List[TensorType], seq_lens: TensorType
    ) -> Tuple[TensorType, List[TensorType]]:
        """
        The forward_rnn method takes in an additional argument state_in, which represents the previous hidden state of the RNN layer. This allows the RNN layer to maintain its internal state across time steps.
        """
        # Pass input through the fully connected layer
        x = F.relu(self.fc_layers(inputs))

        # Pass input through RNN layer
        self._features, [h, c] = self.rnn_layers(
            x,
            [torch.unsqueeze(state[0], 0), torch.unsqueeze(state[1], 0)],  # type:ignore
        )
        model_out = self.logits(self._features)  # type:ignore
        return model_out, [torch.squeeze(h, 0), torch.squeeze(c, 0)]

    # TODO: (sven) Deprecate this once trajectory view API has fully matured.
    @override(RecurrentNetwork)
    def get_initial_state(self) -> List[np.ndarray]:
        # Place hidden states on same device as model.
        linear = next(self.logits._model.children())  # type:ignore
        h = [
            linear.weight.new(1, self.cell_size).zero_().squeeze(0),  # type:ignore
            linear.weight.new(1, self.cell_size).zero_().squeeze(0),  # type:ignore
        ]
        return h  # type:ignore

    @override(ModelV2)
    def value_function(self) -> TensorType:
        if self._features is None:
            raise ValueError(
                "self._features cannot be None. Must call forward first AND must have value branch!"
            )
        return torch.reshape(self.values_out(self._features), [-1])  # type: ignore # RNN

Any advice on what I could be doing wrong?

Hi @grizzlybearg,

You have swapped the order of the arguments in `__init__`.

Your signature has `name` followed by `model_config`, but RLlib calls your custom model with `model_config` followed by `name`, so the name string ends up bound to `model_config` (and that is what `.get("_time_major")` is then called on).
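As a minimal sketch of the corrected signature (the argument order follows the standard `ModelV2`/`RecurrentNetwork` constructor; extra kwargs such as `init_gru_gate_bias` are assumed to come in via `custom_model_config`):

# RLlib builds custom models positionally as:
#   model_cls(obs_space, action_space, num_outputs, model_config, name, **custom_kwargs)
def __init__(
    self,
    obs_space: gym.spaces.Space,
    action_space: gym.spaces.Space,
    num_outputs: int,
    model_config: ModelConfigDict,
    name: str,
    init_gru_gate_bias: int = 2,
    num_frames: int = 10,
):
    nn.Module.__init__(self)
    super().__init__(obs_space, action_space, num_outputs, model_config, name)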

Thanks. It worked. I changed it to `self, obs_space: gym.spaces.Space, action_space: gym.spaces.Space, num_outputs: int, model_config: ModelConfigDict, name: str,`.