How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
With the trajectory view API, I want to add "infos" to the model input.

Actions and rewards can already be fed to the model through the trajectory view API. I now want to feed "infos" to the model as well, but my attempt failed: the infos data is filled in correctly during sampling, but an error is raised during training.

My main change was to make the StatelessCartPole env emit an "infos" dict, and then to register an "infos" view requirement in the model:
```python
self.view_requirements["infos"] = ViewRequirement(
    data_col="infos",
    shift=0,
    space=infos_space,
    used_for_training=True,
    used_for_compute_actions=True,
    batch_repeat_value=1,
)
```
and, in the model's forward(), to concatenate the info value into the input:
```python
T = torch.reshape(input_dict["infos"]["temp"], [-1, 1 * 1])
input_ = torch.cat([obs, actions, rewards, T], dim=-1)
```
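During sampling each step's info is a plain Python dict, so I suspect that at training time the batched "infos" column looks different from the dict-of-tensors I am indexing into above. Below is a defensive sketch of the extraction I would try instead; it assumes "infos" can arrive either as a dict of batched tensors or as a list/array of raw per-step dicts (the helper name and the fallback default are my own, not RLlib API):

```python
import torch


def extract_temp(infos):
    # Hypothetical helper: normalize the "infos" view into a [B, 1] float tensor.
    if isinstance(infos, dict):
        # Already batched into a dict of tensors, e.g. {"temp": tensor of shape [B, 1]}.
        return torch.reshape(infos["temp"], [-1, 1])
    # Otherwise, assume a list/array of per-step info dicts (or non-dict placeholders).
    temps = [float(i.get("temp", 0.0)) if isinstance(i, dict) else 0.0 for i in infos]
    return torch.tensor(temps, dtype=torch.float32).reshape(-1, 1)
```

with `T = extract_temp(input_dict["infos"])` replacing the reshape above.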
Here is my full script:
```python
from gym.spaces import Box
import numpy as np
from gym.envs.classic_control import CartPoleEnv


class StatelessCartPole(CartPoleEnv):
    """Partially observable variant of the CartPole gym environment.

    https://github.com/openai/gym/blob/master/gym/envs/classic_control/
    cartpole.py

    We delete the x- and angular velocity components of the state, so that it
    can only be solved by a memory-enhanced model (policy).
    """

    def __init__(self, config=None):
        super().__init__()

        # Fix our observation-space (remove 2 velocity components).
        high = np.array(
            [
                self.x_threshold * 2,
                self.theta_threshold_radians * 2,
            ],
            dtype=np.float32,
        )
        self.observation_space = Box(low=-high, high=high, dtype=np.float32)

    def step(self, action):
        next_obs, reward, done, info = super().step(action)
        ###############################
        info = {"temp": 1}
        ###############################
        # next_obs is [x-pos, x-veloc, angle, angle-veloc]
        return np.array([next_obs[0], next_obs[2]]), reward, done, info

    def reset(self):
        init_obs = super().reset()
        # init_obs is [x-pos, x-veloc, angle, angle-veloc]
        return np.array([init_obs[0], init_obs[2]])


from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.torch.misc import SlimFC
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.policy.view_requirement import ViewRequirement
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.tf_utils import one_hot
from ray.rllib.utils.torch_utils import one_hot as torch_one_hot

tf1, tf, tfv = try_import_tf()
torch, nn = try_import_torch()


# __sphinx_doc_begin__
class TorchFrameStackingCartPoleModel(TorchModelV2, nn.Module):
    """A simple FC model that takes the last n observations as input."""

    def __init__(
        self, obs_space, action_space, num_outputs, model_config, name, num_frames=3
    ):
        nn.Module.__init__(self)
        super(TorchFrameStackingCartPoleModel, self).__init__(
            obs_space, action_space, None, model_config, name
        )

        self.num_frames = num_frames
        self.num_outputs = num_outputs

        # Construct actual (very simple) FC model.
        assert len(obs_space.shape) == 1
        # The trailing +1 accounts for the extra "temp" info feature.
        in_size = self.num_frames * (obs_space.shape[0] + action_space.n + 1) + 1
        self.layer1 = SlimFC(in_size=in_size, out_size=256, activation_fn="relu")
        self.layer2 = SlimFC(in_size=256, out_size=256, activation_fn="relu")
        self.out = SlimFC(
            in_size=256, out_size=self.num_outputs, activation_fn="linear"
        )
        self.values = SlimFC(in_size=256, out_size=1, activation_fn="linear")

        self._last_value = None

        self.view_requirements["prev_n_obs"] = ViewRequirement(
            data_col="obs", shift="-{}:0".format(num_frames - 1), space=obs_space
        )
        self.view_requirements["prev_n_rewards"] = ViewRequirement(
            data_col="rewards", shift="-{}:-1".format(self.num_frames)
        )
        self.view_requirements["prev_n_actions"] = ViewRequirement(
            data_col="actions",
            shift="-{}:-1".format(self.num_frames),
            space=self.action_space,
        )
        ###########################################
        from gym.spaces import Box, Dict, Discrete, Tuple

        infos_space = Dict({"temp": Box(-1.0, 1.0, (1,))})
        self.view_requirements["infos"] = ViewRequirement(
            data_col="infos",
            shift=0,
            space=infos_space,
            used_for_training=True,
            used_for_compute_actions=True,
            batch_repeat_value=1,
        )
        ###########################################

    def forward(self, input_dict, states, seq_lens):
        obs = input_dict["prev_n_obs"]
        obs = torch.reshape(obs, [-1, self.obs_space.shape[0] * self.num_frames])
        rewards = torch.reshape(input_dict["prev_n_rewards"], [-1, self.num_frames])
        actions = torch_one_hot(input_dict["prev_n_actions"], self.action_space)
        actions = torch.reshape(actions, [-1, self.num_frames * actions.shape[-1]])
        # Debug output: inspect what the "infos" view actually contains.
        print(type(input_dict["infos"]))
        print(input_dict["infos"])
        ########################################
        T = torch.reshape(input_dict["infos"]["temp"], [-1, 1 * 1])
        input_ = torch.cat([obs, actions, rewards, T], dim=-1)
        ########################################
        features = self.layer1(input_)
        features = self.layer2(features)
        out = self.out(features)
        self._last_value = self.values(features)
        return out, []

    def value_function(self):
        return torch.squeeze(self._last_value, -1)


import argparse

import ray
from ray.rllib.agents.ppo import PPOTrainer
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.utils.test_utils import check_learning_achieved
from ray import tune

parser = argparse.ArgumentParser()
parser.add_argument(
    "--run", type=str, default="PPO", help="The RLlib-registered algorithm to use."
)
parser.add_argument(
    "--framework",
    choices=["tf", "tf2", "tfe", "torch"],
    default="torch",
    help="The DL framework specifier.",
)
parser.add_argument(
    "--as-test",
    action="store_true",
    help="Whether this script should be run as a test: --stop-reward must "
    "be achieved within --stop-timesteps AND --stop-iters.",
)
parser.add_argument(
    "--stop-iters", type=int, default=50, help="Number of iterations to train."
)
parser.add_argument(
    "--stop-timesteps", type=int, default=200000, help="Number of timesteps to train."
)
parser.add_argument(
    "--stop-reward", type=float, default=150.0, help="Reward at which we stop training."
)

if __name__ == "__main__":
    args = parser.parse_args()

    ray.init(num_cpus=3)

    num_frames = 16

    ModelCatalog.register_custom_model(
        "frame_stack_model",
        # NOTE: The TF FrameStackingCartPoleModel is not defined in this
        # script, so only --framework=torch actually works here.
        FrameStackingCartPoleModel
        if args.framework != "torch"
        else TorchFrameStackingCartPoleModel,
    )

    config = {
        "env": StatelessCartPole,
        "model": {
            "vf_share_layers": True,
            "custom_model": "frame_stack_model",
            "custom_model_config": {
                "num_frames": num_frames,
            },
            # To compare against a simple LSTM:
            # "use_lstm": True,
            # "lstm_use_prev_action": True,
            # "lstm_use_prev_reward": True,
            # To compare against a simple attention net:
            # "use_attention": True,
            # "attention_use_n_prev_actions": 1,
            # "attention_use_n_prev_rewards": 1,
        },
        # "num_sgd_iter": 5,
        # "vf_loss_coeff": 0.0001,
        "framework": args.framework,
    }

    stop = {
        "training_iteration": args.stop_iters,
        "timesteps_total": args.stop_timesteps,
        "episode_reward_mean": args.stop_reward,
    }

    results = tune.run(
        args.run, config=config, stop=stop, verbose=2, checkpoint_at_end=True
    )

    if args.as_test:
        check_learning_achieved(results, args.stop_reward)

    checkpoints = results.get_trial_checkpoints_paths(
        trial=results.get_best_trial("episode_reward_mean", mode="max"),
        metric="episode_reward_mean",
    )
    checkpoint_path = checkpoints[0][0]
    trainer = PPOTrainer(config)
    trainer.restore(checkpoint_path)

    # Inference loop.
    env = StatelessCartPole()

    # Run manual inference loop for n episodes.
    for _ in range(10):
        episode_reward = 0.0
        reward = 0.0
        action = 0
        done = False
        obs = env.reset()
        while not done:
            # Create a dummy action using the same observation n times,
            # as well as dummy prev-n-actions and prev-n-rewards.
            action, state, logits = trainer.compute_single_action(
                input_dict={
                    "obs": obs,
                    "prev_n_obs": np.stack([obs for _ in range(num_frames)]),
                    "prev_n_actions": np.stack([0 for _ in range(num_frames)]),
                    "prev_n_rewards": np.stack([1.0 for _ in range(num_frames)]),
                },
                full_fetch=True,
            )
            obs, reward, done, info = env.step(action)
            episode_reward += reward

        print(f"Episode reward={episode_reward}")

    ray.shutdown()
```
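As an aside: since the model's forward() reads input_dict["infos"], I assume the manual inference loop at the end would also need to pass a dummy "infos" entry once training works. Something like this (the dummy value and its shape are my guess, untested):

```python
action, state, logits = trainer.compute_single_action(
    input_dict={
        "obs": obs,
        "prev_n_obs": np.stack([obs for _ in range(num_frames)]),
        "prev_n_actions": np.stack([0 for _ in range(num_frames)]),
        "prev_n_rewards": np.stack([1.0 for _ in range(num_frames)]),
        # Hypothetical dummy info matching infos_space.
        "infos": {"temp": np.array([0.0], dtype=np.float32)},
    },
    full_fetch=True,
)
```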
When training starts, the run fails with the following traceback:

```
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\tune\ray_trial_executor.py", line 901, in get_next_executor_event
    future_result = ray.get(ready_future)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\_private\client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\worker.py", line 1809, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(IndexError): ray::PPOTrainer.train() (pid=11832, ip=127.0.0.1, repr=PPOTrainer)
  File "python\ray\_raylet.pyx", line 663, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 667, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 614, in ray._raylet.execute_task.function_executor
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\_private\function_manager.py", line 701, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\tune\trainable.py", line 349, in train
    result = self.step()
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\agents\trainer.py", line 1088, in step
    raise e
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\agents\trainer.py", line 1074, in step
    step_attempt_results = self.step_attempt()
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\agents\trainer.py", line 1155, in step_attempt
    step_results = self._exec_plan_or_training_iteration_fn()
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\agents\trainer.py", line 2174, in _exec_plan_or_training_iteration_fn
    results = next(self.train_exec_impl)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 779, in __next__
    return next(self.built_iterator)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 869, in apply_filter
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 869, in apply_filter
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  [Previous line repeated 1 more time]
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 904, in apply_flatten
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 807, in apply_foreach
    for item in it:
  [Previous line repeated 1 more time]
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 492, in base_iterator
    yield ray.get(futures, timeout=timeout)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\_private\client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\worker.py", line 1809, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(IndexError): ray::RolloutWorker.par_iter_next() (pid=7008, ip=127.0.0.1, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x0000020C93270C40>)
  File "python\ray\_raylet.pyx", line 663, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 667, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 614, in ray._raylet.execute_task.function_executor
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\_private\function_manager.py", line 701, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\iter.py", line 1186, in par_iter_next
    return next(self.local_it)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 404, in gen_rollouts
    yield self.sample()
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\util\tracing\tracing_helper.py", line 462, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 815, in sample
    batches = [self.input_reader.next()]
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\sampler.py", line 116, in next
    batches = [self.get_data()]
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\sampler.py", line 289, in get_data
    item = next(self._env_runner)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\sampler.py", line 702, in _env_runner
    eval_results = _do_policy_eval(
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\evaluation\sampler.py", line 1162, in _do_policy_eval
    eval_results[policy_id] = policy.compute_actions_from_input_dict(
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\policy\torch_policy.py", line 328, in compute_actions_from_input_dict
    return self._compute_action_helper(
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\utils\threading.py", line 21, in wrapper
    return func(self, *a, **k)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\policy\torch_policy.py", line 986, in _compute_action_helper
    dist_inputs, state_out = self.model(input_dict, state_batches, seq_lens)
  File "C:\ProgramData\Anaconda3\lib\site-packages\ray\rllib\models\modelv2.py", line 251, in __call__
    res = self.forward(restored, state or [], seq_lens)
  File "C:\Users\XXXX\Downloads\test.py", line 117, in forward
    T = torch.reshape(input_dict["infos"]["temp"], [-1, 1 * 1])
IndexError: only integers, slices (`:`), ellipsis (`...`), numpy.newaxis (`None`) and integer or boolean arrays are valid indices
```
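For what it's worth, the failing index looks like the classic symptom of the "infos" column being batched as a numpy object array of per-step dicts rather than as a dict of tensors; string-indexing such an array raises exactly this IndexError. A minimal sketch reproducing the message (my assumption about what the batch contains, not verified against RLlib internals):

```python
import numpy as np

# If the "infos" column arrives as an object array of raw info dicts ...
infos = np.array([{"temp": 1}, {"temp": 1}])

# ... then string-indexing it fails with the same message as above:
infos["temp"]  # IndexError: only integers, slices (`:`), ...
```

If that is what is happening, the defensive extraction sketched near the top of this issue might work around it.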