In trajectory_view_API, I want to add "infos" to the model input

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Actions and rewards can be added to the model input via the trajectory view API. I now want to add "infos" to the model as well, but my attempt failed: the infos data is filled in correctly during sampling, but an error is raised during training.

My main change was to return a custom "infos" dict from the StatelessCartPole env and then register it as a view requirement:

self.view_requirements["infos"] = ViewRequirement(
    data_col="infos",
    shift=0,
    space=infos_space,
    used_for_training=True,
    used_for_compute_actions=True,
    batch_repeat_value=1,
)

and, in the model's forward() method:

T = torch.reshape(input_dict["infos"]["temp"], [-1, 1 * 1])

input_ = torch.cat([obs, actions, rewards, T], dim=-1)

Here is my script file.

from gym.spaces import Box
import numpy as np

from gym.envs.classic_control import CartPoleEnv


class StatelessCartPole(CartPoleEnv):
    """Partially observable variant of the CartPole gym environment.

    https://github.com/openai/gym/blob/master/gym/envs/classic_control/
    cartpole.py

    We delete the x- and angular velocity components of the state, so that it
    can only be solved by a memory enhanced model (policy).
    """

    def __init__(self, config=None):
        super().__init__()

        # Fix our observation-space (remove 2 velocity components).
        high = np.array(
            [
                self.x_threshold * 2,
                self.theta_threshold_radians * 2,
            ],
            dtype=np.float32,
        )

        self.observation_space = Box(low=-high, high=high, dtype=np.float32)

    def step(self, action):
        next_obs, reward, done, info = super().step(action)
        ###############################
        info = {"temp":1}
        ###############################
        # next_obs is [x-pos, x-veloc, angle, angle-veloc]
        return np.array([next_obs[0], next_obs[2]]), reward, done, info

    def reset(self):
        init_obs = super().reset()
        # init_obs is [x-pos, x-veloc, angle, angle-veloc]
        return np.array([init_obs[0], init_obs[2]])


from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.torch.misc import SlimFC
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.policy.view_requirement import ViewRequirement
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.tf_utils import one_hot
from ray.rllib.utils.torch_utils import one_hot as torch_one_hot

tf1, tf, tfv = try_import_tf()
torch, nn = try_import_torch()

# __sphinx_doc_begin__





class TorchFrameStackingCartPoleModel(TorchModelV2, nn.Module):
    """A simple FC model that takes the last n observations as input."""

    def __init__(
        self, obs_space, action_space, num_outputs, model_config, name, num_frames=3
    ):
        nn.Module.__init__(self)
        super(TorchFrameStackingCartPoleModel, self).__init__(
            obs_space, action_space, None, model_config, name
        )

        self.num_frames = num_frames
        self.num_outputs = num_outputs

        # Construct actual (very simple) FC model.
        assert len(obs_space.shape) == 1
        in_size = self.num_frames * (obs_space.shape[0] + action_space.n + 1) + 1  # +1 for the "temp" info value
        self.layer1 = SlimFC(in_size=in_size, out_size=256, activation_fn="relu")
        self.layer2 = SlimFC(in_size=256, out_size=256, activation_fn="relu")
        self.out = SlimFC(
            in_size=256, out_size=self.num_outputs, activation_fn="linear"
        )
        self.values = SlimFC(in_size=256, out_size=1, activation_fn="linear")

        self._last_value = None

        self.view_requirements["prev_n_obs"] = ViewRequirement(
            data_col="obs", shift="-{}:0".format(num_frames - 1), space=obs_space
        )
        self.view_requirements["prev_n_rewards"] = ViewRequirement(
            data_col="rewards", shift="-{}:-1".format(self.num_frames)
        )
        self.view_requirements["prev_n_actions"] = ViewRequirement(
            data_col="actions",
            shift="-{}:-1".format(self.num_frames),
            space=self.action_space,
        )
        ###########################################
        from gym.spaces import Box, Dict

        infos_space = Dict({"temp": Box(-1.0, 1.0, (1,))})
        self.view_requirements["infos"] = ViewRequirement(
            data_col="infos",
            shift=0,
            space=infos_space,
            used_for_training=True,
            used_for_compute_actions=True,
            batch_repeat_value=1,
        )
        ###########################################

    def forward(self, input_dict, states, seq_lens):
        obs = input_dict["prev_n_obs"]
        obs = torch.reshape(obs, [-1, self.obs_space.shape[0] * self.num_frames])
        rewards = torch.reshape(input_dict["prev_n_rewards"], [-1, self.num_frames])
        actions = torch_one_hot(input_dict["prev_n_actions"], self.action_space)
        actions = torch.reshape(actions, [-1, self.num_frames * actions.shape[-1]])
        print(type(input_dict["infos"]))
        print(input_dict["infos"])
        ########################################
        T = torch.reshape(input_dict["infos"]["temp"], [-1, 1 * 1])
        input_ = torch.cat([obs, actions, rewards, T], dim=-1)
        ########################################        
        features = self.layer1(input_)
        features = self.layer2(features)
        out = self.out(features)
        self._last_value = self.values(features)
        return out, []

    def value_function(self):
        return torch.squeeze(self._last_value, -1)


import argparse
import numpy as np

import ray
from ray.rllib.agents.ppo import PPOTrainer


from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.utils.framework import try_import_tf
from ray.rllib.utils.test_utils import check_learning_achieved
from ray import tune

tf1, tf, tfv = try_import_tf()

parser = argparse.ArgumentParser()
parser.add_argument(
    "--run", type=str, default="PPO", help="The RLlib-registered algorithm to use."
)
parser.add_argument(
    "--framework",
    choices=["tf", "tf2", "tfe", "torch"],
    default="torch",
    help="The DL framework specifier.",
)
parser.add_argument(
    "--as-test",
    action="store_true",
    help="Whether this script should be run as a test: --stop-reward must "
    "be achieved within --stop-timesteps AND --stop-iters.",
)
parser.add_argument(
    "--stop-iters", type=int, default=50, help="Number of iterations to train."
)
parser.add_argument(
    "--stop-timesteps", type=int, default=200000, help="Number of timesteps to train."
)
parser.add_argument(
    "--stop-reward", type=float, default=150.0, help="Reward at which we stop training."
)

if __name__ == "__main__":
    args = parser.parse_args()
    ray.init(num_cpus=3)

    num_frames = 16

    ModelCatalog.register_custom_model(
        "frame_stack_model",
        FrameStackingCartPoleModel
        if args.framework != "torch"
        else TorchFrameStackingCartPoleModel,
    )

    config = {
        "env": StatelessCartPole,
        "model": {
            "vf_share_layers": True,
            "custom_model": "frame_stack_model",
            "custom_model_config": {
                "num_frames": num_frames,
            },
            # To compare against a simple LSTM:
            # "use_lstm": True,
            # "lstm_use_prev_action": True,
            # "lstm_use_prev_reward": True,
            # To compare against a simple attention net:
            # "use_attention": True,
            # "attention_use_n_prev_actions": 1,
            # "attention_use_n_prev_rewards": 1,
        },
        #"num_sgd_iter": 5,
        #"vf_loss_coeff": 0.0001,
        "framework": args.framework,
    }

    stop = {
        "training_iteration": args.stop_iters,
        "timesteps_total": args.stop_timesteps,
        "episode_reward_mean": args.stop_reward,
    }
    results = tune.run(
        args.run, config=config, stop=stop, verbose=2, checkpoint_at_end=True
    )

    if args.as_test:
        check_learning_achieved(results, args.stop_reward)

    checkpoints = results.get_trial_checkpoints_paths(
        trial=results.get_best_trial("episode_reward_mean", mode="max"),
        metric="episode_reward_mean",
    )

    checkpoint_path = checkpoints[0][0]
    trainer = PPOTrainer(config)
    trainer.restore(checkpoint_path)

    # Inference loop.
    env = StatelessCartPole()

    # Run manual inference loop for n episodes.
    for _ in range(10):
        episode_reward = 0.0
        reward = 0.0
        action = 0
        done = False
        obs = env.reset()
        while not done:
            # Create a dummy action using the same observation n times,
            # as well as dummy prev-n-actions and prev-n-rewards.
            action, state, logits = trainer.compute_single_action(
                input_dict={
                    "obs": obs,
                    "prev_n_obs": np.stack([obs for _ in range(num_frames)]),
                    "prev_n_actions": np.stack([0 for _ in range(num_frames)]),
                    "prev_n_rewards": np.stack([1.0 for _ in range(num_frames)]),
                },
                full_fetch=True,
            )
            obs, reward, done, info = env.step(action)
            episode_reward += reward

        print(f"Episode reward={episode_reward}")

    ray.shutdown()

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\tune\ray_trial_executor.py", line 901, in get_next_executor_event
    future_result = ray.get(ready_future)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\_private\client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\worker.py", line 1809, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(IndexError): ray::PPOTrainer.train() (pid=11832, ip=127.0.0.1, repr=PPOTrainer)
  File "python\ray\_raylet.pyx", line 663, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 667, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 614, in ray._raylet.execute_task.function_executor
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\_private\function_manager.py", line 701, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\tune\trainable.py", line 349, in train
    result = self.step()
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\agents\trainer.py", line 1088, in step
    raise e
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\agents\trainer.py", line 1074, in step
    step_attempt_results = self.step_attempt()
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\agents\trainer.py", line 1155, in step_attempt
    step_results = self._exec_plan_or_training_iteration_fn()
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\agents\trainer.py", line 2174, in _exec_plan_or_training_iteration_fn
    results = next(self.train_exec_impl)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 779, in __next__
    return next(self.built_iterator)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 869, in apply_filter
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 869, in apply_filter
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  [Previous line repeated 1 more time]
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 904, in apply_flatten
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  [Previous line repeated 1 more time]
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 492, in base_iterator
    yield ray.get(futures, timeout=timeout)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\_private\client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\worker.py", line 1809, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(IndexError): ray::RolloutWorker.par_iter_next() (pid=7008, ip=127.0.0.1, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x0000020C93270C40>)
  File "python\ray\_raylet.pyx", line 663, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 667, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 614, in ray._raylet.execute_task.function_executor
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\_private\function_manager.py", line 701, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 1186, in par_iter_next
    return next(self.local_it)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 404, in gen_rollouts
    yield self.sample()
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 815, in sample
    batches = [self.input_reader.next()]
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\sampler.py", line 116, in next
    batches = [self.get_data()]
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\sampler.py", line 289, in get_data
    item = next(self._env_runner)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\sampler.py", line 702, in _env_runner
    eval_results = _do_policy_eval(
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\sampler.py", line 1162, in _do_policy_eval
    eval_results[policy_id] = policy.compute_actions_from_input_dict(
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\policy\torch_policy.py", line 328, in compute_actions_from_input_dict
    return self._compute_action_helper(
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\utils\threading.py", line 21, in wrapper
    return func(self, *a, **k)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\policy\torch_policy.py", line 986, in _compute_action_helper
    dist_inputs, state_out = self.model(input_dict, state_batches, seq_lens)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\models\modelv2.py", line 251, in __call__
    res = self.forward(restored, state or [], seq_lens)
  File "C:\Users\XXXX\Downloads\test.py", line 117, in forward
    T = torch.reshape(input_dict["infos"]["temp"], [-1, 1 * 1])
IndexError: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices

Hi @robot-xyh,

The issue you are running into is that during training RLlib converts all the data in the sample batch into numpy arrays. This means that input_dict["infos"] is not a dictionary but a numpy array of objects, and you are trying to use "temp" as an index into that numpy array, which is of course not a valid index.

The issue is made more confusing because some of the trajectory-view initialization methods still treat infos as dictionaries, so it works on the first few calls while everything is being set up and then fails when training actually starts. I have written about this in the past if you are not familiar with it; you can check out one of these: Search results for 'initialize_loss_from @mannyv' - Ray

There are ways to handle this in your forward method, but I think it could require changes to other parts of the code base. For example, if you wanted to use an LSTM you would need to pad the infos numpy objects to max_seq_len; that might work out of the box or it might not, I am not sure.
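
For reference, here is a minimal sketch of what such handling could look like inside forward(), assuming the batch arrives as a numpy object array (or list) of per-timestep info dicts. The helper name and the 0.0 fallback are my own choices, not RLlib API:

import numpy as np
import torch

def infos_to_tensor(infos, device):
    """Convert a numpy object array (or list) of info dicts to a float tensor.

    During sampling the entries are the dicts returned by the env; during
    initialization RLlib may insert dummy values, so anything that is not a
    dict falls back to 0.0.
    """
    temps = [
        float(i.get("temp", 0.0)) if isinstance(i, dict) else 0.0
        for i in infos
    ]
    return torch.tensor(temps, dtype=torch.float32, device=device).reshape(-1, 1)

# Inside forward(), instead of indexing input_dict["infos"]["temp"] directly:
# T = infos_to_tensor(input_dict["infos"], obs.device)
# input_ = torch.cat([obs, actions, rewards, T], dim=-1)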

The simplest way to handle this would be to add the value as part of the observation, either by appending one more element to the existing observation array:

np.array([next_obs[0], next_obs[2], temp_value])

or by using a dictionary observation space:

{
    "obs": np.array([next_obs[0], next_obs[2], temp_value]),
    "temp": np.ones(1),
}
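
For completeness, here is a minimal sketch of the observation space that would go with the dictionary option, assuming gym.spaces.Dict and reusing the same `high` array computed in StatelessCartPole.__init__ above; the bounds for "temp" are placeholders:

from gym.spaces import Box, Dict
import numpy as np

# Inside StatelessCartPole.__init__, after `high` has been computed:
self.observation_space = Dict(
    {
        "obs": Box(low=-high, high=high, dtype=np.float32),
        "temp": Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32),
    }
)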

Is there a reason one of these options would not work for you?

Thank you @mannyv
Putting the infos into the obs would work, but my obs space will probably be images, which would mean using complex_input_net for processing, and I'm not sure I could handle the combination of the trajectory view API and complex_input_net very well.

My original idea was to add episode_timesteps to the program, to let the agent know how many steps have been taken in the current episode, as in [1712.00378] Time Limits in Reinforcement Learning.
I noticed that SampleBatch contains "T", but this seems to be the step count of the whole rollout, not the step within each episode. I want to implement episode_timesteps in an easy and efficient way with the trajectory view API.
Can you give me some advice? Thank you

@robot-xyh,

Are you saying that if you have 3 episodes of lengths [5, 3, 2] you want the timesteps to be:

a. [0,1,2,3,4,0,1,2,0,1]

or

b. [0,1,2,3,4,5,6,7,8,9]

If you include it in your observation, I think you have two options.

The first would be to use a Dict space with two keys, one for the image and the other for the time. You could then have a custom model that uses a conv net for the image and an MLP for the time.

The other option would be to add one more feature layer that encodes the timestep. For example, if your image shape is [32, 32, 3] you could expand it to [32, 32, 4] and set obs[:, :, 3] = temp.
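
A minimal numpy sketch of that second option (the function name and the fixed 32x32 shape are only for illustration):

import numpy as np

def add_timestep_layer(image, t):
    """Append a constant feature layer holding the episode timestep.

    image: np.ndarray of shape (H, W, C), e.g. (32, 32, 3)
    t: current step within the episode (could also be scaled, see below)
    """
    h, w, _ = image.shape
    time_layer = np.full((h, w, 1), t, dtype=image.dtype)
    return np.concatenate([image, time_layer], axis=-1)  # shape (H, W, C + 1)

# A 32x32x3 observation becomes 32x32x4, with obs[:, :, 3] == t everywhere.
obs = add_timestep_layer(np.zeros((32, 32, 3), dtype=np.float32), t=5)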

Thank you @mannyv
The behavior I intend is:
a. [0,1,2,3,4,0,1,2,0,1]

I will first try the dictionary observation space. Thank you.

@robot-xyh,

I probably would have gone with the extra feature layer first, but either could be a good option.

You do not need to use the trajectory view API in your case. In your environment you can set self.t = 0 in reset() and self.t += 1 in step().

One last thing to consider, if you have a fixed maximum number of timesteps, is that you could scale your timestep value as t / t_max.
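
Putting those two points together, here is a minimal sketch of how the StatelessCartPole from above could track a per-episode timestep and append it, scaled by t_max, to the observation; the 200-step limit and the class name are placeholders:

import numpy as np
from gym.spaces import Box

class TimeAwareStatelessCartPole(StatelessCartPole):
    """StatelessCartPole variant that appends a scaled episode timestep to obs."""

    def __init__(self, config=None, t_max=200):
        super().__init__(config)
        self.t_max = t_max
        self.t = 0
        # One extra observation dimension for the scaled timestep in [0, 1].
        low = np.append(self.observation_space.low, 0.0).astype(np.float32)
        high = np.append(self.observation_space.high, 1.0).astype(np.float32)
        self.observation_space = Box(low=low, high=high, dtype=np.float32)

    def reset(self):
        self.t = 0
        obs = super().reset()
        return np.append(obs, self.t / self.t_max).astype(np.float32)

    def step(self, action):
        self.t += 1
        obs, reward, done, info = super().step(action)
        obs = np.append(obs, self.t / self.t_max).astype(np.float32)
        return obs, reward, done, info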

@mannyv
Thank you for your help. I will do as you suggest.