How severe does this issue affect your experience of using Ray?
- High: It blocks me to complete my task.
I’m working on building a custom pytorch model for Rllib. However, I keep running into a problem with `modelconfig.get(“_time_major”). Here 's the specific error:
I’ve tried to avoid this error with no success. My understanding of the erro is that “_time_major” in this case is a string not a dict. I could be wrong.
Below is the full code snippet of the NN model
CUSTOM_CONFIG = {
"max_seq_len": 20,
"cell_size": 512,
"time_major": True,
"use_prev_action": True,
"use_prev_reward": True,
"use_prev_obs" : True,
"rnn_num_layers": 10,
"fcnet_hiddens": [512,1024,1024, 512],
"rnn_hiddens": [512,1024,2048, 2048, 4096, 4096, 2048, 2048,1024,512],
"linear_activation": "swish",
"post_rnn_activation": "swish",
}
class CustomNet(RecurrentNetwork, nn.Module):
"""A CustomNet for Ray RLLib"""
def __init__(
self,
obs_space: gym.spaces.Space,
action_space: gym.spaces.Space,
num_outputs: int,
name: str,
model_config: dict= CUSTOM_CONFIG,
init_gru_gate_bias: int = 2,
num_frames=10,
):
nn.Module.__init__(self)
super().__init__(
obs_space, action_space, num_outputs, model_config, name
)
self.obs_space = obs_space
self.action_space =action_space
self.num_outputs = num_outputs
self.model_config=model_config
self.name=name
self.init_gru_gate_bias = init_gru_gate_bias
self.num_frames=num_frames
if self.num_outputs is None:
self.num_outputs = int(np.product(self.obs_space.shape)) # type: ignore
#self.time_major = self.model_config.get("time_major", False)
self._time_major = self.model_config["time_major"]
self.use_prev_action = self.model_config["use_prev_action"]
self.use_prev_reward = self.model_config["use_prev_reward"]
self.use_prev_reward = self.model_config["use_prev_obs"]
self.num_layers = self.model_config["rnn_num_layers"]
self.cell_size = self.model_config["cell_size"]
fcnet_hiddens = list(self.model_config.get("fcnet_hiddens"), []) # type:ignore
rnn_hiddens = list(self.model_config.get("rnn_hiddens"), []) # type:ignore
activation = self.model_config.get("linear_activation")
post_rnn_activation = self.model_config.get("post_rnn_activation")
self.action_space_struct = get_base_struct_from_space(self.action_space)
self.action_dim = sum(
int(np.product(space.shape)) if space.shape is not None else len(space) # type: ignore
for space in tree.flatten(self.action_space_struct)
if isinstance(space, (Discrete, MultiDiscrete))
)
# Add prev-action/reward nodes to input to LSTM.
if self.use_prev_action:
self.num_outputs += self.action_dim # type: ignore
if self.use_prev_reward:
self.num_outputs += 1 # type: ignore
w = torch.empty(3, 5)
global_init = (
torch.nn.init.kaiming_uniform_
) # (tensor, a=0, mode='fan_in', nonlinearity='leaky_relu')
# Construct the fully connected layers
self.fc_layers = nn.Sequential(
SlimFC(
in_size=self.num_outputs, # type: ignore
out_size=self.cell_size, # type: ignore
initializer=global_init,
activation_fn=activation,
use_bias=True,
bias_init=0,
),
*[
self._create_fc_layer(self.cell_size, size, global_init, activation)
for size in fcnet_hiddens
],
)
# Construct the RNN Layers
dropout = 0.5
self.rnn_layers = nn.Sequential(
*[
self._create_rnn_layer(size, global_init, activation, dropout)
for size in rnn_hiddens[:-1]
]
)
# Set self.num_outputs to the number of output nodes desired by the
# caller of this constructor. Restore to original outputs
self.num_outputs = num_outputs
# Postprocess RNN output with another hidden layer.
self.logits = self._create_output_layer(
self.cell_size, self.num_outputs, global_init, post_rnn_activation
)
self.values_out = self._create_output_layer(
self.cell_size, 1, global_init, None
)
# Setup trajectory views for previous action, reward and previous num_frames observations
# TODO: Should I shift the actions and previous actions further???
# Add prev-a/r to this model's view, if required.
if self.model_config["use_prev_action"]:
self.view_requirements[SampleBatch.PREV_ACTIONS] = ViewRequirement(
data_col=SampleBatch.ACTIONS,
space=self.action_space,
shift=-1, # f"-{self.num_frames -1}:0"
)
if self.model_config["use_prev_reward"]:
self.view_requirements[SampleBatch.PREV_REWARDS] = ViewRequirement(
data_col=SampleBatch.REWARDS, shift=-1 # f"-{self.num_frames -1}:0"
)
if self.model_config["use_prev_obs"]:
self.view_requirements["prev_n_obs"] = ViewRequirement(
data_col="obs",
space=self.obs_space,
shift=f"-{self.num_frames -1}:0"
)
# Holds the current "base" output (before logits layer).
self._features = None
# Print out model summary in INFO logging mode.
# if logger.isEnabledFor(logging.INFO):
# self.forward.summary()
def _create_fc_layer(self, in_size, out_size, initializer, activation):
layer = SlimFC(
in_size=in_size,
out_size=out_size,
initializer=initializer,
activation_fn=activation,
)
return layer if self.num_outputs is not None else None
def _create_rnn_layer(self, size, initializer, activation, dropout):
lstm_layer = nn.LSTM(
in_size=size,
hidden_size=size,
num_layers=self.num_layers,
bias=True,
batch_first=not self._time_major,
dropout=dropout,
bidirectional=False,
proj_size=0,
)
gru_layer = nn.GRU(
in_size=size,
hidden_size=size,
num_layers=self.num_layers,
bias=True,
batch_first=not self._time_major,
dropout=dropout,
bidirectional=False,
proj_size=0,
)
return nn.Sequential(
SkipConnection(nn.Sequential(lstm_layer, nn.Tanh())),
SkipConnection(
nn.Sequential(
torch.nn.LayerNorm(normalized_shape=size),
SlimFC(
in_size=size,
out_size=size,
initializer=initializer,
activation_fn=activation,
use_bias=True,
bias_init=0,
),
gru_layer,
SlimFC(
in_size=size,
out_size=size,
initializer=initializer,
activation_fn=activation,
use_bias=True,
bias_init=0,
),
),
fan_in_layer=GRUGate(size, self.init_gru_gate_bias),
),
)
def _create_output_layer(self, in_size, out_size, initializer, activation):
layer = SlimFC(
in_size=in_size,
out_size=out_size,
initializer=initializer,
activation_fn=activation,
)
return layer if self.num_outputs is not None else None
@override(RecurrentNetwork)
def forward(
self,
input_dict: Dict[str, TensorType],
state: List[TensorType],
seq_lens: TensorType,
) -> Tuple[TensorType, List[TensorType]]:
if seq_lens is None:
raise ValueError("seq_lens cannot be None")
"""Adds
1. trajectory view
2. time dimension to batch before sending inputs to forward_rnn().
You should implement forward_rnn() in your subclass."""
flat_inputs = input_dict["obs_flat"].float() # type:ignore
# Concat. prev-action/reward if required.
prev_a_r_o = []
# Prev actions.
if self.model_config["use_prev_action"]:
prev_a = input_dict[SampleBatch.PREV_ACTIONS]
# If actions are not processed yet (in their original form as
# have been sent to environment):
# Flatten/one-hot into 1D array.
if self.model_config["_disable_action_flattening"]:
prev_a_r_o.append(
flatten_inputs_to_1d_tensor(
prev_a, spaces_struct=self.action_space_struct, time_axis=False
)
)
# If actions are already flattened (but not one-hot'd yet!),
# one-hot discrete/multi-discrete actions here.
else:
if prev_a is not None:
if isinstance(self.action_space, (Discrete, MultiDiscrete)):
prev_a = one_hot(
prev_a.float(), self.action_space #type:ignore
) # type:ignore
else:
prev_a = prev_a.float() # type:ignore
prev_a_r_o.append(
torch.reshape(prev_a.float(), [-1, self.action_dim])#type:ignore
) # type:ignore
# Prev rewards.
if self.model_config["use_prev_reward"]:
prev_r = input_dict[SampleBatch.PREV_REWARDS]
prev_a_r_o.append(torch.reshape(prev_r.float(), [-1, self.num_frames])) #type:ignore
"""
The reason for using `unsqueeze` instead of `reshape` is that `prev_r` is a 1D tensor with shape `(batch_size,)`, and we want to concatenate it with `flat_inputs`, which has shape `(batch_size, obs_size)`.
When we use `torch.reshape(prev_r, [-1, 1])`, we are reshaping `prev_r` into a 2D tensor with shape `(batch_size, 1)`. This works fine for concatenation, but it's unnecessary because we can achieve the same result by using `unsqueeze(1)`.
`unsqueeze(1)` adds a new dimension to `prev_r` at index 1, resulting in a 2D tensor with shape `(batch_size, 1)`. This is equivalent to the shape we get from using `reshape`, but it's more efficient because it doesn't require copying the data.
In general, `unsqueeze` is preferred over `reshape` when adding a new dimension to a tensor because it's faster and more memory-efficient.
"""
# Prev observations
if self.model_config["use _prev_obs"]:
prev_obs = input_dict["prev_n_obs"]
prev_obs = torch.reshape(prev_obs, [-1, self.obs_space.shape[0] * self.num_frames]) # type: ignore
prev_a_r_o.append(prev_obs)
# Concat prev. actions + rewards to the "main" input.
if prev_a_r_o:
flat_inputs = torch.cat([flat_inputs] + prev_a_r_o, dim=1)
# Push everything through our RNN.
#input_dict["obs_flat"] = flat_inputs
# Note that max_seq_len != input_dict.max_seq_len != seq_lens.max()
# as input_dict may have extra zero-padding beyond seq_lens.max().
# Use add_time_dimension to handle this
'''inputs = add_time_dimension(
flat_inputs,
seq_lens=seq_lens,
framework="torch",
time_major=self._time_major,
)'''
output, new_state = self.forward_rnn(flat_inputs, state, seq_lens)
output = torch.reshape(output, [-1, self.num_outputs]) # type:ignore
return output, new_state
def forward_rnn(
self, inputs: TensorType, state: List[TensorType], seq_lens: TensorType
) -> Tuple[TensorType, List[TensorType]]:
"""
The forward_rnn method takes in an additional argument state_in, which represents the previous hidden state of the RNN layer. This allows the RNN layer to maintain its internal state across time steps.
"""
# Pass input through the fully connected layer
x = F.relu(self.fc_layers(inputs))
# Pass input through RNN layer
self._features, [h, c] = self.rnn_layers(
x,
[torch.unsqueeze(state[0], 0), torch.unsqueeze(state[1], 0)], # type:ignore
)
model_out = self.logits(self._features) # type:ignore
return model_out, [torch.squeeze(h, 0), torch.squeeze(c, 0)]
# TODO: (sven) Deprecate this once trajectory view API has fully matured.
@override(RecurrentNetwork)
def get_initial_state(self) -> List[np.ndarray]:
# Place hidden states on same device as model.
linear = next(self.logits._model.children()) # type:ignore
h = [
linear.weight.new(1, self.cell_size).zero_().squeeze(0), # type:ignore
linear.weight.new(1, self.cell_size).zero_().squeeze(0), # type:ignore
]
return h # type:ignore
@override(ModelV2)
def value_function(self) -> TensorType:
if self._features is None:
raise ValueError(
"self._features cannot be None. Must call forward first AND must have value branch!"
)
return torch.reshape(self.values_out(self._features), [-1]) # type: ignore # RNN
Any advice of what I could be doing wrong?