Model doesn't recognize ObservationWrapper and keeps using orig_observation

According to the preprocessing docs, Ray deprecates custom preprocessors and recommends using gym.ObservationWrapper instead. However, when I try to run the custom model example script, the processed observations appear to be discarded and the model keeps receiving the original observation from before the ObservationWrapper.

Here is the ObservationWrapper I used; it essentially pads one of the observations to a fixed length of 15:

 import gym
 import numpy as np
 class PaddedEnv(gym.ObservationWrapper):
     def __init__(self, env):
         super().__init__(env)
     # Override `observation` to custom process the original observation
     # coming from the env.
     def observation(self, observation):
         # E.g. padding to fixed len
         pad_len = 15 - len(observation['obs1'])
         observation['obs1'] = np.pad(observation['obs1'],(0,pad_len))
         observation = {
             'observations': np.concatenate([i for i in observation.values()]),\
             'action_mask': observation['obs1'] > 0
             }
         return observation
 
 env = PaddedEnv(gym.make('myenv'))

However, it fails because input_dict contains none of the observations processed by PaddedEnv (neither observations nor action_mask). It still contains obs1, though.

     res = self.forward(restored, state or [], seq_lens)
   File "play_ray.py", line 53, in forward
     action_mask = input_dict["obs"]["action_mask"]
 KeyError: 'action_mask'

I tried digging into input_dict, but none of its fields contain what PaddedEnv produced:

input_dict
(RolloutWorker pid=97305) SampleBatch(32: ['obs', 'new_obs', 'actions', 'prev_actions', 'rewards', 'prev_rewards', 'dones', 'infos', 't', 'eps_id', 'unroll_id', 'agent_index', 'obs_flat'])

So is writing a custom preprocessor the only way to address this?

Hi @zhh210,

Do you have a reproduction script and the full error printout you can share?

Hi @mannyv, I've created a dummy custom env just to show the behavior:

from gym.spaces import Dict

from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.torch_utils import FLOAT_MIN

torch, nn = try_import_torch()
### Customized Policy NN model
class TorchActionMaskModel(TorchModelV2, nn.Module):
    """PyTorch version of above ActionMaskingModel."""
    def __init__(
        self,
        obs_space,
        action_space,
        num_outputs,
        model_config,
        name,
        **kwargs,
    ):
        print('original_space: ',obs_space)
        orig_space = getattr(obs_space, "original_space", obs_space)
        # assert (
        #     isinstance(orig_space, Dict)
        #     and "action_mask" in orig_space.spaces
        #     and "observations" in orig_space.spaces
        # )

        TorchModelV2.__init__(
            self, obs_space, action_space, num_outputs, model_config, name, **kwargs
        )
        nn.Module.__init__(self)
        # print(orig_space)
        self.internal_model = TorchFC(
            obs_space,
            # orig_space["observations"],
            action_space,
            num_outputs,
            model_config,
            name + "_internal",
        )

        # disable action masking --> will likely lead to invalid actions
        self.no_masking = False
        if "no_masking" in model_config["custom_model_config"]:
            self.no_masking = model_config["custom_model_config"]["no_masking"]

    def forward(self, input_dict, state, seq_lens):
        # Extract the available actions tensor from the observation.
        # print('input_dict', input_dict)
        # import pdb; pdb.set_trace()
        action_mask = input_dict["obs"]["action_mask"]
        
        # Compute the unmasked logits.
        logits, _ = self.internal_model({"obs": input_dict["obs"]})

        # If action masking is disabled, directly return unmasked logits
        if self.no_masking:
            return logits, state

        # Convert action_mask into a [0.0 || -inf]-type mask.
        inf_mask = torch.clamp(torch.log(action_mask), min=FLOAT_MIN)
        masked_logits = logits + inf_mask

        # Return masked logits.
        return masked_logits, state

    def value_function(self):
        return self.internal_model.value_function()

from ray.rllib.models import ModelCatalog
ModelCatalog.register_custom_model("my_torch_model", TorchActionMaskModel)

from ray.rllib.algorithms.ppo import PPO
from ray import tune
import gym
import numpy as np
from gym import Env, spaces
class DummyEnv(Env):
    def __init__(self,*arg,**kwargs):
        super(DummyEnv).__init__(*arg, **kwargs)
        self.obs1 = None
        self.reset()
        
    def reset(self):
        self._get_obs()
        self.observation_space = spaces.Dict({
            'obs1': spaces.Box(low = -1, high = 100, dtype = np.float16, shape = (10,)),\
            'obs2': spaces.Box(low = -1, high = 100, dtype = np.float16, shape = (1,)),\
            'obs3': spaces.Box(low = -1, high = 100, dtype = np.float16, shape = (1,)),\
        })
    @property
    def action_space(self):
        # Action space size is random
        self.obs1 = self.obs1 if self.obs1 is not None else self._get_obs()['obs1']
        return spaces.Discrete(len(self.obs1))

    def _get_obs(self):
        import random
        self.obs1 = np.random.rand(random.randint(1,10))
        return {
            'obs1': self.obs1,
            'obs2': [np.random.rand()],
            'obs3': [np.random.rand()]
        }

    def step(self, action):
        # Generate random values with random len
        
        return self._get_obs(), 1, np.random.rand() < .1, {}

import numpy as np
class PaddedEnv(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
    # Override `observation` to custom process the original observation
    # coming from the env.
    def observation(self, observation):
        # E.g. padding to fixed len
        pad_len = 15 - len(observation['obs1'])
        observation['obs1'] = np.pad(observation['obs1'],(0,pad_len))
        observation = {
            'observations': np.concatenate([i for i in observation.values()]),\
            'action_mask': observation['obs1'] > 0
            }
        return observation
 
env = PaddedEnv(DummyEnv())

tune.register_env('dummyEnv', lambda config: env)


# Configure the algorithm.
config = {
    # Environment (RLlib understands openAI gym registered strings).
    "env": "dummyEnv",
    # Use 2 environment workers (aka "rollout workers") that parallelly
    # collect samples from their own environment clone(s).
    "num_workers": 1,
    # Change this to "framework: torch", if you are using PyTorch.
    # Also, use "framework: tf2" for tf2.x eager execution.
    "framework": "torch",
    # Tweak the default model provided automatically by RLlib,
    # given the environment's observation- and action spaces.
    # "model": {
    #     "fcnet_hiddens": [64, 64],
    #     "fcnet_activation": "relu",
    #     "framestack": True
    # },
    "model": {
        "custom_model": "my_torch_model",
        # Extra kwargs to be passed to your model's c'tor.
        # "custom_model_config": {'no_masking':False},
    },
    # Set up a separate evaluation worker set for the
    # `algo.evaluate()` call after training (see below).
    # "evaluation_num_workers": 1,
    # 'evaluation_interval':1,
    # Only for evaluation runs, render the env.
    "evaluation_config": {
        "render_env": False,
    },
    "disable_env_checking": True,
    #"recreate_failed_workers":True
    "log_level": 'WARN',
}


tune.run(
    run_or_experiment="PPO",
    config = config,
    stop = {"training_iteration": 200},
    num_samples = 1,
    checkpoint_at_end=True,\
    checkpoint_freq=1,
)

Note that I've made minor revisions to the TorchActionMaskModel initialization, otherwise it fails on the assert and in TorchFC. Running the script produces this error:

2022-10-06 20:54:24,115	INFO worker.py:1518 -- Started a local Ray instance.
(PPO pid=51191) 2022-10-06 20:54:34,493	INFO ppo.py:378 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True if this doesn't work for you.
(PPO pid=51191) 2022-10-06 20:54:34,495	INFO algorithm.py:351 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
(RolloutWorker pid=51255) original_space:  Box([-1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1. -1.], [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.], (12,), float32)
(RolloutWorker pid=51255) 2022-10-06 20:54:42,557	ERROR worker.py:756 -- Exception raised in creation task: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=51255, ip=127.0.0.1, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7fcf388b17c0>)
(RolloutWorker pid=51255)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 613, in __init__
(RolloutWorker pid=51255)     self._build_policy_map(
(RolloutWorker pid=51255)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1784, in _build_policy_map
(RolloutWorker pid=51255)     self.policy_map.create_policy(
(RolloutWorker pid=51255)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/policy_map.py", line 123, in create_policy
(RolloutWorker pid=51255)     self[policy_id] = create_policy_for_framework(
(RolloutWorker pid=51255)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/utils/policy.py", line 80, in create_policy_for_framework
(RolloutWorker pid=51255)     return policy_class(observation_space, action_space, merged_config)
(RolloutWorker pid=51255)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/algorithms/ppo/ppo_torch_policy.py", line 66, in __init__
(RolloutWorker pid=51255)     self._initialize_loss_from_dummy_batch()
(RolloutWorker pid=51255)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/policy.py", line 1050, in _initialize_loss_from_dummy_batch
(RolloutWorker pid=51255)     actions, state_outs, extra_outs = self.compute_actions_from_input_dict(
(RolloutWorker pid=51255)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/torch_policy_v2.py", line 483, in compute_actions_from_input_dict
(RolloutWorker pid=51255)     return self._compute_action_helper(
(RolloutWorker pid=51255)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/utils/threading.py", line 24, in wrapper
(RolloutWorker pid=51255)     return func(self, *a, **k)
(RolloutWorker pid=51255)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/torch_policy_v2.py", line 1016, in _compute_action_helper
(RolloutWorker pid=51255)     dist_inputs, state_out = self.model(input_dict, state_batches, seq_lens)
(RolloutWorker pid=51255)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/models/modelv2.py", line 259, in __call__
(RolloutWorker pid=51255)     res = self.forward(restored, state or [], seq_lens)
(RolloutWorker pid=51255)   File "tmp.py", line 53, in forward
(RolloutWorker pid=51255)     action_mask = input_dict["obs"]["action_mask"]
(RolloutWorker pid=51255) KeyError: 'action_mask'
== Status ==
Current time: 2022-10-06 20:54:42 (running for 00:00:16.66)
Memory usage on this node: 10.3/16.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 2.0/12 CPUs, 0/0 GPUs, 0.0/4.7 GiB heap, 0.0/2.0 GiB objects
Result logdir: /Users/zhan/ray_results/PPO
Number of trials: 1/1 (1 RUNNING)
+--------------------------+----------+-------+
| Trial name               | status   | loc   |
|--------------------------+----------+-------|
| PPO_dummyEnv_971fc_00000 | RUNNING  |       |
+--------------------------+----------+-------+


2022-10-06 20:54:42,593	ERROR trial_runner.py:980 -- Trial PPO_dummyEnv_971fc_00000: Error processing event.
ray.tune.error._TuneNoNextExecutorEventError: Traceback (most recent call last):
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/tune/execution/ray_trial_executor.py", line 989, in get_next_executor_event
    future_result = ray.get(ready_future)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/_private/worker.py", line 2277, in get
    raise value
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::PPO.__init__() (pid=51191, ip=127.0.0.1, repr=PPO)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 125, in __init__
    self.add_workers(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 269, in add_workers
    self.foreach_worker(lambda w: w.assert_healthy())
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 391, in foreach_worker
    remote_results = ray.get([w.apply.remote(func) for w in self.remote_workers()])
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=51255, ip=127.0.0.1, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7fcf388b17c0>)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 613, in __init__
    self._build_policy_map(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1784, in _build_policy_map
    self.policy_map.create_policy(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/policy_map.py", line 123, in create_policy
    self[policy_id] = create_policy_for_framework(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/utils/policy.py", line 80, in create_policy_for_framework
    return policy_class(observation_space, action_space, merged_config)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/algorithms/ppo/ppo_torch_policy.py", line 66, in __init__
    self._initialize_loss_from_dummy_batch()
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/policy.py", line 1050, in _initialize_loss_from_dummy_batch
    actions, state_outs, extra_outs = self.compute_actions_from_input_dict(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/torch_policy_v2.py", line 483, in compute_actions_from_input_dict
    return self._compute_action_helper(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/utils/threading.py", line 24, in wrapper
    return func(self, *a, **k)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/torch_policy_v2.py", line 1016, in _compute_action_helper
    dist_inputs, state_out = self.model(input_dict, state_batches, seq_lens)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/models/modelv2.py", line 259, in __call__
    res = self.forward(restored, state or [], seq_lens)
  File "tmp.py", line 53, in forward
    action_mask = input_dict["obs"]["action_mask"]
KeyError: 'action_mask'

During handling of the above exception, another exception occurred:

ray::PPO.__init__() (pid=51191, ip=127.0.0.1, repr=PPO)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/algorithms/algorithm.py", line 308, in __init__
    super().__init__(config=config, logger_creator=logger_creator, **kwargs)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/tune/trainable/trainable.py", line 157, in __init__
    self.setup(copy.deepcopy(self.config))
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/algorithms/algorithm.py", line 443, in setup
    raise e.args[0].args[2]
KeyError: 'action_mask'

Result for PPO_dummyEnv_971fc_00000:
  trial_id: 971fc_00000
  
== Status ==
Current time: 2022-10-06 20:54:42 (running for 00:00:16.69)
Memory usage on this node: 10.3/16.0 GiB
Using FIFO scheduling algorithm.
Resources requested: 0/12 CPUs, 0/0 GPUs, 0.0/4.7 GiB heap, 0.0/2.0 GiB objects
Result logdir: /Users/zhan/ray_results/PPO
Number of trials: 1/1 (1 ERROR)
+--------------------------+----------+-------+
| Trial name               | status   | loc   |
|--------------------------+----------+-------|
| PPO_dummyEnv_971fc_00000 | ERROR    |       |
+--------------------------+----------+-------+
Number of errored trials: 1
+--------------------------+--------------+--------------------------------------------------------------------------------------+
| Trial name               |   # failures | error file                                                                           |
|--------------------------+--------------+--------------------------------------------------------------------------------------|
| PPO_dummyEnv_971fc_00000 |            1 | /Users/zhan/ray_results/PPO/PPO_dummyEnv_971fc_00000_0_2022-10-06_20-54-26/error.txt |
+--------------------------+--------------+--------------------------------------------------------------------------------------+


2022-10-06 20:54:42,617	ERROR ray_trial_executor.py:103 -- An exception occurred when trying to stop the Ray actor:Traceback (most recent call last):
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/tune/execution/ray_trial_executor.py", line 94, in _post_stop_cleanup
    ray.get(future, timeout=0)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/_private/worker.py", line 2277, in get
    raise value
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::PPO.__init__() (pid=51191, ip=127.0.0.1, repr=PPO)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 125, in __init__
    self.add_workers(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 269, in add_workers
    self.foreach_worker(lambda w: w.assert_healthy())
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 391, in foreach_worker
    remote_results = ray.get([w.apply.remote(func) for w in self.remote_workers()])
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=51255, ip=127.0.0.1, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7fcf388b17c0>)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 613, in __init__
    self._build_policy_map(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1784, in _build_policy_map
    self.policy_map.create_policy(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/policy_map.py", line 123, in create_policy
    self[policy_id] = create_policy_for_framework(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/utils/policy.py", line 80, in create_policy_for_framework
    return policy_class(observation_space, action_space, merged_config)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/algorithms/ppo/ppo_torch_policy.py", line 66, in __init__
    self._initialize_loss_from_dummy_batch()
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/policy.py", line 1050, in _initialize_loss_from_dummy_batch
    actions, state_outs, extra_outs = self.compute_actions_from_input_dict(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/torch_policy_v2.py", line 483, in compute_actions_from_input_dict
    return self._compute_action_helper(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/utils/threading.py", line 24, in wrapper
    return func(self, *a, **k)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/torch_policy_v2.py", line 1016, in _compute_action_helper
    dist_inputs, state_out = self.model(input_dict, state_batches, seq_lens)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/models/modelv2.py", line 259, in __call__
    res = self.forward(restored, state or [], seq_lens)
  File "tmp.py", line 53, in forward
    action_mask = input_dict["obs"]["action_mask"]
KeyError: 'action_mask'

During handling of the above exception, another exception occurred:

ray::PPO.__init__() (pid=51191, ip=127.0.0.1, repr=PPO)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/algorithms/algorithm.py", line 308, in __init__
    super().__init__(config=config, logger_creator=logger_creator, **kwargs)
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/tune/trainable/trainable.py", line 157, in __init__
    self.setup(copy.deepcopy(self.config))
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/algorithms/algorithm.py", line 443, in setup
    raise e.args[0].args[2]
KeyError: 'action_mask'

(PPO pid=51191) 2022-10-06 20:54:42,575	ERROR worker.py:756 -- Exception raised in creation task: The actor died because of an error raised in its creation task, ray::PPO.__init__() (pid=51191, ip=127.0.0.1, repr=PPO)
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 125, in __init__
(PPO pid=51191)     self.add_workers(
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 269, in add_workers
(PPO pid=51191)     self.foreach_worker(lambda w: w.assert_healthy())
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/worker_set.py", line 391, in foreach_worker
(PPO pid=51191)     remote_results = ray.get([w.apply.remote(func) for w in self.remote_workers()])
(PPO pid=51191) ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=51255, ip=127.0.0.1, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x7fcf388b17c0>)
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 613, in __init__
(PPO pid=51191)     self._build_policy_map(
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1784, in _build_policy_map
(PPO pid=51191)     self.policy_map.create_policy(
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/policy_map.py", line 123, in create_policy
(PPO pid=51191)     self[policy_id] = create_policy_for_framework(
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/utils/policy.py", line 80, in create_policy_for_framework
(PPO pid=51191)     return policy_class(observation_space, action_space, merged_config)
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/algorithms/ppo/ppo_torch_policy.py", line 66, in __init__
(PPO pid=51191)     self._initialize_loss_from_dummy_batch()
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/policy.py", line 1050, in _initialize_loss_from_dummy_batch
(PPO pid=51191)     actions, state_outs, extra_outs = self.compute_actions_from_input_dict(
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/torch_policy_v2.py", line 483, in compute_actions_from_input_dict
(PPO pid=51191)     return self._compute_action_helper(
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/utils/threading.py", line 24, in wrapper
(PPO pid=51191)     return func(self, *a, **k)
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/policy/torch_policy_v2.py", line 1016, in _compute_action_helper
(PPO pid=51191)     dist_inputs, state_out = self.model(input_dict, state_batches, seq_lens)
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/models/modelv2.py", line 259, in __call__
(PPO pid=51191)     res = self.forward(restored, state or [], seq_lens)
(PPO pid=51191)   File "tmp.py", line 53, in forward
(PPO pid=51191)     action_mask = input_dict["obs"]["action_mask"]
(PPO pid=51191) KeyError: 'action_mask'
(PPO pid=51191) 
(PPO pid=51191) During handling of the above exception, another exception occurred:
(PPO pid=51191) 
(PPO pid=51191) ray::PPO.__init__() (pid=51191, ip=127.0.0.1, repr=PPO)
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/algorithms/algorithm.py", line 308, in __init__
(PPO pid=51191)     super().__init__(config=config, logger_creator=logger_creator, **kwargs)
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/tune/trainable/trainable.py", line 157, in __init__
(PPO pid=51191)     self.setup(copy.deepcopy(self.config))
(PPO pid=51191)   File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/rllib/algorithms/algorithm.py", line 443, in setup
(PPO pid=51191)     raise e.args[0].args[2]
(PPO pid=51191) KeyError: 'action_mask'
Traceback (most recent call last):
  File "tmp.py", line 171, in <module>
    tune.run(
  File "/Users/zhan/Applications/anaconda3/envs/python38/lib/python3.8/site-packages/ray/tune/tune.py", line 752, in run
    raise TuneError("Trials did not complete", incomplete_trials)
ray.tune.error.TuneError: ('Trials did not complete', [PPO_dummyEnv_971fc_00000])

My experiment is running with:

python==3.8.13
gym==0.21.0
ray==2.0.0

Hi @zhh210,

There were a few issues with your script.

  • Your reset function in DummyEnv does not return a value.
  • PaddedEnv redefines the observation space but you did not update self.observation_space to reflect that.
  • Your model creates an FC layer and passes both the observation and the action mask as its input.
  • The sizes of your action_space and action_mask do not match: the action_space is sized from the underlying (un-padded) obs1, while the action_mask is based on the padded obs1.
  • Note: RLlib does not support the observation or action spaces changing during training.

Here are changes that correct these issues:

class DummyEnv(Env):
    def __init__(self,*arg,**kwargs):
        super(DummyEnv).__init__(*arg, **kwargs)
        self.obs1 = None
        self.reset()

        self.observation_space = spaces.Dict({
            'obs1': spaces.Box(low = -1, high = 100, dtype = np.float16, shape = (10,)), \
            'obs2': spaces.Box(low = -1, high = 100, dtype = np.float16, shape = (1,)), \
            'obs3': spaces.Box(low = -1, high = 100, dtype = np.float16, shape = (1,)), \
            })


    def reset(self):
        return self._get_obs()

    @property
    def action_space(self):
        # Action space size is random
        self.obs1 = self.obs1 if self.obs1 is not None else self._get_obs()['obs1']
        return spaces.Discrete(len(self.obs1))

    def _get_obs(self):
        import random
        self.obs1 = np.random.rand(random.randint(1,10))
        return {
            'obs1': self.obs1,
            'obs2': [np.random.rand()],
            'obs3': [np.random.rand()]
        }

    def step(self, action):
        # Generate random values with random len
        return self._get_obs(), 1, np.random.rand() < .1, {}

class PaddedEnv(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        obs = self.reset()
        self.observation_space = Dict({
            "observations": spaces.Box(low=-1, high=100, dtype=np.float16,
                                       shape=obs["observations"].shape),
            "action_mask": spaces.Box(low=0, high=1, dtype=np.float16,
                                      shape=obs["action_mask"].shape),
        })

    @property
    def action_space(self):
        # Action space size is random
        self.obs1 = self.obs1 if self.obs1 is not None else self._get_obs()['obs1']
        return spaces.Discrete(len(self.obs1))

    # Override `observation` to custom process the original observation
    # coming from the env.
    def observation(self, observation):
        # E.g. padding to fixed len
        pad_len = 15 - len(observation['obs1'])
        observation['obs1'] = np.pad(observation['obs1'],(0,pad_len))
        self.obs1 = observation["obs1"]
        observation = {
            'observations': np.concatenate([i for i in observation.values()]), \
            'action_mask': observation['obs1'] > 0
        }
        return observation

#env = PaddedEnv(DummyEnv())

tune.register_env('dummyEnv', lambda config: PaddedEnv(DummyEnv()))

class TorchActionMaskModel(TorchModelV2, nn.Module):
    """PyTorch version of above ActionMaskingModel."""
    def __init__(
        self,
        obs_space,
        action_space,
        num_outputs,
        model_config,
        name,
        **kwargs,
    ):
        print('original_space: ',obs_space)
        orig_space = getattr(obs_space, "original_space", obs_space)
        # assert (
        #     isinstance(orig_space, Dict)
        #     and "action_mask" in orig_space.spaces
        #     and "observations" in orig_space.spaces
        # )

        TorchModelV2.__init__(
            self, obs_space, action_space, num_outputs, model_config, name, **kwargs
        )
        nn.Module.__init__(self)
        # print(orig_space)
        self.internal_model = TorchFC(
            obs_space.original_space.spaces["observations"],
            # orig_space["observations"],
            action_space,
            num_outputs,
            model_config,
            name + "_internal",
            )

        # disable action masking --> will likely lead to invalid actions
        self.no_masking = False
        if "no_masking" in model_config["custom_model_config"]:
            self.no_masking = model_config["custom_model_config"]["no_masking"]

    def forward(self, input_dict, state, seq_lens):
        # Extract the available actions tensor from the observation.
        # print('input_dict', input_dict)
        # import pdb; pdb.set_trace()
        action_mask = input_dict["obs"]["action_mask"]

        # Compute the unmasked logits.
        logits, _ = self.internal_model({"obs": input_dict["obs"]["observations"]})

        # If action masking is disabled, directly return unmasked logits
        if self.no_masking:
            return logits, state

        # Convert action_mask into a [0.0 || -inf]-type mask.
        inf_mask = torch.clamp(torch.log(action_mask), min=FLOAT_MIN)
        masked_logits = logits + inf_mask

        # Return masked logits.
        return masked_logits, state

    def value_function(self):
        return self.internal_model.value_function()
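
As a quick sanity check (just a sketch, reusing the corrected classes above), you can confirm that the wrapped env's declared observation_space matches what it actually returns. RLlib builds its preprocessor and the model's input_dict from the declared space, so the two have to agree:

env = PaddedEnv(DummyEnv())
obs = env.reset()
print(env.observation_space)  # Dict with 'observations' (17,) and 'action_mask' (15,)
assert set(obs) == {"observations", "action_mask"}
assert obs["observations"].shape == env.observation_space.spaces["observations"].shape
assert obs["action_mask"].shape == env.observation_space.spaces["action_mask"].shape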

I hadn't realized that observation_space has to be redefined; I thought observation() alone would override it and the processed observation would just be used. It really worked with that change, thanks a lot!
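
For anyone who lands here later: the pattern boils down to the sketch below (the wrapper name, CartPole-v1, and the all-ones mask are only illustrative, not from my actual env). Declare the transformed space on the wrapper itself; RLlib then flattens the Dict observation based on that declaration, and a custom model can recover the structure through obs_space.original_space.

import gym
import numpy as np
from gym import spaces

class MaskedObsWrapper(gym.ObservationWrapper):  # illustrative name
    """Wraps a flat-Box env into the {'observations', 'action_mask'} layout."""

    def __init__(self, env):
        super().__init__(env)
        # Declare the *transformed* observation space. RLlib builds its
        # flattening preprocessor from this declaration and exposes the
        # structured version to the model via obs_space.original_space.
        self.observation_space = spaces.Dict({
            "observations": env.observation_space,
            "action_mask": spaces.Box(0.0, 1.0, shape=(env.action_space.n,), dtype=np.float32),
        })

    def observation(self, observation):
        # Return exactly the structure declared above (here with a dummy all-ones mask).
        mask = np.ones(self.observation_space.spaces["action_mask"].shape, dtype=np.float32)
        return {"observations": observation, "action_mask": mask}

env = MaskedObsWrapper(gym.make("CartPole-v1"))
print(env.observation_space)
print(env.reset())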