Observation space with multiple inputs

Hi all,

I am working on a Multi-Agent task and I would like to use images and also some other information like the location of the agent as the observation space.

It seems that in Gym we can do it as it is explained here:

However, for me, it is not clear how to do it in RLlib.

I can imagine that I will need to define a custom model. The net architecture should have two starting sub-nets, a CNN for images and an MLP for vector-based inputs. Then I need to fuse the two sub-nets after some layers in order to have only one head for action selection.
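To make the intended architecture concrete, here is a minimal plain-PyTorch sketch of the two-branch idea. It is independent of RLlib, and the class name TwoBranchNet, the layer sizes, and the input shapes are all assumptions chosen for illustration only.

```python
import torch
from torch import nn


class TwoBranchNet(nn.Module):
    """Hypothetical fusion net: CNN branch for images, MLP branch for vectors."""

    def __init__(self, vec_dim: int, num_actions: int):
        super().__init__()
        # CNN branch: expects images as [B, 3, H, W] (channels first).
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=5, stride=2),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=5, stride=2),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),  # -> [B, 32, 1, 1] regardless of image size
            nn.Flatten(),             # -> [B, 32]
        )
        # MLP branch for the vector input (e.g. the agent location).
        self.mlp = nn.Sequential(nn.Linear(vec_dim, 32), nn.ReLU())
        # Single head on top of the concatenated features.
        self.head = nn.Sequential(
            nn.Linear(32 + 32, 64), nn.ReLU(), nn.Linear(64, num_actions)
        )

    def forward(self, image, vector):
        # Fuse the two branches along the feature dimension.
        fused = torch.cat([self.cnn(image), self.mlp(vector)], dim=1)
        return self.head(fused)


net = TwoBranchNet(vec_dim=2, num_actions=4)
logits = net(torch.zeros(8, 3, 64, 64), torch.zeros(8, 2))
print(logits.shape)  # one row of action logits per batch element
```

The open question is then only how the two inputs reach such a network inside RLlib.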

But I still don’t know how to feed the observations into the net. Does anyone have code showing how I can implement this?

Thanks!

Hi @deepgravity,

You need to define your model (1) and your special environment (2)

(1) You can implement your own custom model that implements the ModelV2 API. Subclass either TFModelV2 or TorchModelV2. All of RLlib's predefined models do this, so you have some examples there.

In your model, you implement your own forward() method. This forward method gets called with an input_dict. Print this input_dict to see what you get as an observation and find out what parts of it you would like to use.

(2) Your environment can be defined with the gym API and needs to be registered or you simply pass the env class to tune under the config parameter env.

I think the code that you find in RLlib's examples directory and the links to the code I posted should suffice to get you going.
If you struggle, post your code and I can have a look at it if you like :slight_smile:

Cheers

Just to add to @arturn's answer, you can define complex obs/act spaces containing multiple simpler spaces. For example, if you use this space for your obs space:

spaces.Tuple((
    spaces.Box(0, 1, [64, 64, 3]),  # Vision observation
    spaces.Discrete(5)              # Some other discrete obs
))

This will give you a tuple containing the batched (visual_obs, other_obs) in your input_dict['obs'].

Hi @arturn and @Aceticia ,
Thank you very much for your helpful guidance.

So, I defined my observation_space as follows:

observation_space = gym.spaces.Tuple((observation_space_cnn , observation_space_fc))

Then, in the reset method of my custom env, I reset my observation:

observations = {initial_agent_id: 
                       (observations_cnn[initial_agent_id], 
                        observations_fc[initial_agent_id])}

Since I have a multi-agent env, my observation is a dictionary. In each env.step, I only use a single agent. So, I only passed the initial agent id here.

I update the observation in each step in the same way:

observations = {next_agent_id:
                     (observations_cnn[next_agent_id],
                      observations_fc[next_agent_id])}

And this is my simple custom model:

import torch
from torch import nn

from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.models.torch.visionnet import VisionNetwork as TorchVis

# %%
class CnnFcModel(TorchModelV2, nn.Module):
    """Example of a PyTorch custom model that just delegates to a fc-net."""

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        self.torch_sub_vis_model = TorchVis(obs_space[0], action_space, num_outputs,
                                        model_config, name)
        
        self.torch_sub_fc_model = TorchFC(obs_space[1], action_space, num_outputs,
                                       model_config, name)

    def forward(self, input_dict, state, seq_lens):
        input_dict["obs"] = input_dict["obs"].float()
        fc_out, _ = self.torch_sub_fc_model(input_dict, state, seq_lens)
        return fc_out, []

    def value_function(self):
        return torch.reshape(self.torch_sub_model.value_function(), [-1])

Since obs_space is a gym.spaces.Tuple, I index obs_space[0] and obs_space[1] for the CNN and FC parts, respectively.
But, I get the following error:

RayActorError: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=52695, ip=192.168.28.146)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/evaluation/rollout_worker.py", line 573, in __init__
    self._build_policy_map(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1372, in _build_policy_map
    self.policy_map.create_policy(name, orig_cls, obs_space, act_space,
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/policy_map.py", line 136, in create_policy
    self[policy_id] = class_(observation_space, action_space,
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/policy_template.py", line 236, in __init__
    self.model, dist_class = make_model_and_action_dist(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/dqn/dqn_torch_policy.py", line 163, in build_q_model_and_distribution
    model = ModelCatalog.get_model_v2(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/models/catalog.py", line 541, in get_model_v2
    raise e
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/models/catalog.py", line 526, in get_model_v2
    instance = model_cls(obs_space, action_space, num_outputs,
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/dqn/dqn_torch_model.py", line 61, in __init__
    super(DQNTorchModel, self).__init__(obs_space, action_space,
  File "/home/data/dbt/housing_design/agents_floorplan/my_model.py", line 27, in __init__
    self.torch_sub_vis_model = TorchVis(obs_space[0], action_space, num_outputs,
TypeError: 'Box' object is not subscriptable

This says that the gym Box is not subscriptable.

I ran the code in debug mode, but when the Python interpreter reaches the relevant line, it says that obs_space is not defined. So I cannot see what the issue is.

Did I define the custom model correctly (apart from the forward method, which I have not worked on yet because Python never reaches it)?

Do I need to change something in the config when I have this combined observation space?

P.S: Running a simple random agent in the new env with the combined observation space works.

The bug is not obvious, but a few things look suspicious:

  1. Can you check where you define the policy? Check where the definition of your model’s obs_space is coming from. If you had a previous definition in, say, PolicySpec, it will override the space given by the environment.
  2. The fact that your interpreter says obs_space undefined is weird too, maybe this could be a hint as to why it’s not a tuple?

Thanks @Aceticia for your reply.

This is how I defined my policy:

policies = {"policy_{}".format(i+1): (None, env.observation_space, env.action_space, {})
                        for i in range(num_policies)}

And it seems to be correct. When the interpreter reaches it, my policy is:


It recognizes that I have both image and discrete observations.

Do I need to change something in the trainer config?

Hi all,

I put together a simple runnable example, based on this, to test an observation space with multiple components.

import argparse
import gym
from gym.spaces import Discrete, Box
import numpy as np
import os
import random

import ray
from ray import tune
from ray.rllib.agents import ppo
from ray.rllib.env.env_context import EnvContext
from ray.rllib.models import ModelCatalog
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.models.torch.visionnet import VisionNetwork as TorchVis
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.tune.logger import pretty_print

torch, nn = try_import_torch()

class SimpleCorridor(gym.Env):
    def __init__(self, config: EnvContext):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = Discrete(2)
        
        self.observation_space = Box(0.0, 
                                        self.end_pos, 
                                        shape=(1, ), 
                                        dtype=np.float32)
        
        self.observation_space_cnn = gym.spaces.Box(low=0, 
                                                     high=255,
                                                     shape=(42, 42, 3),
                                                     dtype=np.uint8)
        
        self.observation_space = gym.spaces.Tuple((self.observation_space, 
                                                    self.observation_space_cnn))
        
        self.seed(config.worker_index * config.num_workers)

    def reset(self):
        self.cur_pos = 0
        self.obs_arr_conv = np.zeros((42, 42, 3))
        observations = ([self.cur_pos], self.obs_arr_conv)
        return observations
        # return [self.cur_pos]

    def step(self, action):
        assert action in [0, 1], action
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        # return [self.cur_pos], random.random() * 2 if done else -0.1, done, {}
        return ([self.cur_pos], self.obs_arr_conv), random.random() * 2 if done else -0.1, done, {}

    def seed(self, seed=None):
        random.seed(seed)


class TorchCustomModel(TorchModelV2, nn.Module):
    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        # self.torch_sub_model = TorchFC(obs_space, action_space, num_outputs,
        #                                model_config, name)

        self.torch_sub_vis_model = TorchVis(obs_space[0], action_space, num_outputs,
                                        model_config, name)
        
        self.torch_sub_fc_model = TorchFC(obs_space[1], action_space, num_outputs,
                                        model_config, name)
        
        
    def forward(self, input_dict, state, seq_lens):
        input_dict["obs"] = input_dict["obs"].float()
        fc_out, _ = self.torch_sub_model(input_dict, state, seq_lens)
        return fc_out, []

    def value_function(self):
        return torch.reshape(self.torch_sub_model.value_function(), [-1])


if __name__ == "__main__":
    ray.init(local_mode=True)

    ModelCatalog.register_custom_model("my_model", TorchCustomModel)

    config = {
        "env": SimpleCorridor,  # or "corridor" if registered above
        "env_config": {
            "corridor_length": 5,
        },
        "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        "model": {
            "custom_model": "my_model",
            "vf_share_layers": True,
        },
        "num_workers": 1,  # parallelism
        "framework": 'torch',
    }

    stop = {
        "training_iteration": 50,
        "timesteps_total": 100000,
        "episode_reward_mean": 0.1,
    }

    ppo_config = ppo.DEFAULT_CONFIG.copy()
    ppo_config.update(config)
    ppo_config["lr"] = 1e-3
    trainer = ppo.PPOTrainer(config=ppo_config, env=SimpleCorridor)
    for _ in range(stop['training_iteration']):
        result = trainer.train()
        print(pretty_print(result))
        if result["timesteps_total"] >= stop["timesteps_total"] or \
                result["episode_reward_mean"] >= stop["episode_reward_mean"]:
            break
    else:
        results = tune.run("PPO", config=config, stop=stop)

    ray.shutdown()

The obs array for the conv branch is always zero; it is only there to test how an observation space with multiple components can be implemented.

For some reason, the obs_space argument passed to TorchCustomModel is always of type Box, although the environment defines its observation space as a Tuple.

The Python interpreter never reaches the forward method of TorchCustomModel, so I have not worked on this method yet.

Would you please have a look and see how it can be fixed?

Many thanks,

@deepgravity,

RLlib flattens spaces and stores them in its various buffers as flat tensors. It then reconstructs the unflattened data structure before passing it to the models.

For complex spaces, the space should have an original_space attribute. So you could check obs_space.original_space
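As an illustration of that flatten-then-restore idea, here is a small NumPy sketch using the shapes from the SimpleCorridor example. It is not RLlib's actual implementation; the helper names flatten_obs and restore_obs are hypothetical and only show why the model sees a flat Box while the structured Tuple is still recoverable.

```python
import numpy as np

# Shapes taken from the SimpleCorridor example: a 1-D position and a 42x42x3 image.
POS_SHAPE = (1,)
IMG_SHAPE = (42, 42, 3)
POS_SIZE = int(np.prod(POS_SHAPE))
IMG_SIZE = int(np.prod(IMG_SHAPE))


def flatten_obs(pos, img):
    """Concatenate both components into one flat float32 vector."""
    return np.concatenate([
        np.asarray(pos, dtype=np.float32).ravel(),
        np.asarray(img, dtype=np.float32).ravel(),
    ])


def restore_obs(flat_batch):
    """Split a [B, POS_SIZE + IMG_SIZE] batch back into (pos, img) components."""
    pos = flat_batch[:, :POS_SIZE].reshape((-1,) + POS_SHAPE)
    img = flat_batch[:, POS_SIZE:].reshape((-1,) + IMG_SHAPE)
    return pos, img


flat = flatten_obs([3.0], np.zeros(IMG_SHAPE))
batch = np.stack([flat, flat])           # a batch of two observations
pos, img = restore_obs(batch)
print(flat.shape, pos.shape, img.shape)  # (5293,) (2, 1) (2, 42, 42, 3)
```

The flat size is 1 + 42 * 42 * 3 = 5293, which is the kind of single Box shape a model would see in place of the Tuple.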

@mannyv Many thanks for your quick reply.

Yes, it works :slight_smile:

So, I changed my custom model as follows:

class TorchCustomModel(TorchModelV2, nn.Module):
    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        self.torch_sub_fc_model = TorchFC(obs_space.original_space[0], action_space, num_outputs,
                                        model_config, name)
        
        self.torch_sub_vis_model = TorchVis(obs_space.original_space[1], action_space, num_outputs,
                                        model_config, name)
        
        self.head = nn.Linear(action_space.n*2, num_outputs)
        
        
    def forward(self, input_dict, state, seq_lens):
        input_dict["obs"][0] = input_dict["obs"][0].float()
        input_dict["obs"][1] = input_dict["obs"][1].float()
        fc_out, _ = self.torch_sub_fc_model(input_dict, state, seq_lens)
        cnn_out, _ = self.torch_sub_vis_model(input_dict, state, seq_lens)
        
        x = torch.cat((fc_out, cnn_out), 1)
        out = self.head(x.view(x.size(0), -1))
        return out, []

    def value_function(self):
        return torch.reshape(self.torch_sub_model.value_function(), [-1])

But I got the following error:

File "python\ray\_raylet.pyx", line 568, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 572, in ray._raylet.execute_task
  File "python\ray\_raylet.pyx", line 522, in ray._raylet.execute_task.function_executor
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\_private\function_manager.py", line 576, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\util\tracing\tracing_helper.py", line 448, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 588, in __init__
    seed=seed)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\util\tracing\tracing_helper.py", line 448, in _resume_span
    return method(self, *_args, **_kwargs)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 1377, in _build_policy_map
    conf, merged_conf)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\policy\policy_map.py", line 144, in create_policy
    merged_config)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\policy\policy_template.py", line 281, in __init__
    stats_fn=None if self.config["in_evaluation"] else stats_fn,
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\policy\policy.py", line 748, in _initialize_loss_from_dummy_batch
    self._dummy_batch, explore=False)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\policy\torch_policy.py", line 301, in compute_actions_from_input_dict
    seq_lens, explore, timestep)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\utils\threading.py", line 21, in wrapper
    return func(self, *a, **k)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\policy\torch_policy.py", line 365, in _compute_action_helper
    seq_lens)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\models\modelv2.py", line 243, in __call__
    res = self.forward(restored, state or [], seq_lens)
  File "D:\ETHZ\rlb_pipeline\agent\custom_env.py", line 86, in forward
    fc_out, _ = self.torch_sub_fc_model(input_dict, state, seq_lens)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\models\modelv2.py", line 206, in __call__
    restored = input_dict.copy(shallow=True)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\policy\sample_batch.py", line 272, in copy
    copy_ = SampleBatch(data)
  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\policy\sample_batch.py", line 133, in __init__
    self[k] = np.array(v)
ValueError: only one element tensors can be converted to Python scalars
:actor_name:RolloutWorker
Traceback (most recent call last):

  File "D:\ETHZ\rlb_pipeline\agent\custom_env.py", line 128, in <module>
    trainer = ppo.PPOTrainer(config=ppo_config, env=SimpleCorridor)

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\agents\trainer_template.py", line 137, in __init__
    Trainer.__init__(self, config, env, logger_creator)

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\agents\trainer.py", line 621, in __init__
    super().__init__(config, logger_creator)

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\tune\trainable.py", line 106, in __init__
    self.setup(copy.deepcopy(self.config))

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\agents\trainer_template.py", line 147, in setup
    super().setup(config)

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\agents\trainer.py", line 774, in setup
    self._init(self.config, self.env_creator)

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\agents\trainer_template.py", line 176, in _init
    num_workers=self.config["num_workers"])

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\agents\trainer.py", line 862, in _make_workers
    logdir=self.logdir)

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\evaluation\worker_set.py", line 118, in __init__
    spaces=spaces,

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\evaluation\worker_set.py", line 446, in _make_worker
    spaces=spaces,

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 543, in __init__
    policy_config=policy_config)

  File "C:\Users\Reza\Anaconda3\envs\rlb\lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 1462, in _determine_spaces_for_multi_agent_dict
    "`observation_space` not provided in PolicySpec for "

ValueError: `observation_space` not provided in PolicySpec for default_policy and env does not have an observation space OR no spaces received from other workers' env(s) OR no `observation_space` specified in config!


b'[2021-10-15 02:10:06,086 E 20968 19772] core_worker.cc:1564: Pushed Error with JobID: 01000000 of type: task with message: \x1b[36mray::RolloutWorker.__init__()\x1b[39m (pid=20968, ip=192.168.28.146, repr=<ray.rllib.evaluation.rollout_worker.modify_class.<locals>.Class object at 0x000002636DB68FD0>)\n  File "python\\ray\\_raylet.pyx", line 568, in ray._raylet.execute_task\n  File "python\\ray\\_raylet.pyx", line 572, in ray._raylet.execute_task\n  File "python\\ray\\_raylet.pyx", line 522, in ray._raylet.execute_task.function_executor\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\_private\\function_manager.py", line 576, in actor_method_executor\n    return method(__ray_actor, *args, **kwargs)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\util\\tracing\\tracing_helper.py", line 448, in _resume_span\n    return method(self, *_args, **_kwargs)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\evaluation\\rollout_worker.py", line 588, in __init__\n    seed=seed)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\util\\tracing\\tracing_helper.py", line 448, in _resume_span\n    return method(self, *_args, **_kwargs)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\evaluation\\rollout_worker.py", line 1377, in _build_policy_map\n    conf, merged_conf)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\policy\\policy_map.py", line 144, in create_policy\n    merged_config)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\policy\\policy_template.py", line 281, in __init__\n    stats_fn=None if self.config["in_evaluation"] else stats_fn,\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\policy\\policy.py", line 748, in _initialize_loss_from_dummy_batch\n    self._dummy_batch, explore=False)\n  File 
"C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\policy\\torch_policy.py", line 301, in compute_actions_from_input_dict\n    seq_lens, explore, timestep)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\utils\\threading.py", line 21, in wrapper\n    return func(self, *a, **k)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\policy\\torch_policy.py", line 365, in _compute_action_helper\n    seq_lens)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\models\\modelv2.py", line 243, in __call__\n    res = self.forward(restored, state or [], seq_lens)\n  File "D:\\ETHZ\\rlb_pipeline\\agent\\custom_env.py", line 86, in forward\n    fc_out, _ = self.torch_sub_fc_model(input_dict, state, seq_lens)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\models\\modelv2.py", line 206, in __call__\n    restored = input_dict.copy(shallow=True)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\policy\\sample_batch.py", line 272, in copy\n    copy_ = SampleBatch(data)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\policy\\sample_batch.py", line 133, in __init__\n    self[k] = np.array(v)\nValueError: only one element tensors can be converted to Python scalars at time: 1.63426e+09\r\n[2021-10-15 02:10:06,094 E 20968 19772] core_worker.cc:1564: Pushed Error with JobID: 01000000 of type: task with message: \x1b[36mray::RolloutWorker.foreach_env()::Exiting\x1b[39m (pid=20968, ip=192.168.28.146, repr=<ray.rllib.evaluation.rollout_worker.modify_class.<locals>.Class object at 0x000002636DB68FD0>)\n  File "python\\ray\\_raylet.pyx", line 568, in ray._raylet.execute_task\n  File "python\\ray\\_raylet.pyx", line 572, in ray._raylet.execute_task\n  File "python\\ray\\_raylet.pyx", line 522, in ray._raylet.execute_task.function_executor\n  File 
"C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\_private\\function_manager.py", line 576, in actor_method_executor\n    return method(__ray_actor, *args, **kwargs)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\util\\tracing\\tracing_helper.py", line 448, in _resume_span\n    return method(self, *_args, **_kwargs)\n  File "C:\\Users\\Reza\\Anaconda3\\envs\\rlb\\lib\\site-packages\\ray\\rllib\\evaluation\\rollout_worker.py", line 1010, in foreach_env\n    if self.async_env is None:\nAttributeError: \'RolloutWorker\' object has no attribute \'async_env\' at time: 1.63426e+09\r\n'

Did I define my forward method correctly?

Thanks!

@deepgravity,

I think I would try something like this:

 def forward(self, input_dict, state, seq_lens):
        input_dict1 = input_dict.copy()
        input_dict2 = input_dict.copy()
        input_dict1["obs"] = input_dict["obs"][0]  
        input_dict2["obs"] = input_dict["obs"][1] 
        fc_out, _ = self.torch_sub_fc_model(input_dict1, state, seq_lens)
        cnn_out, _ = self.torch_sub_vis_model(input_dict2, state, seq_lens)
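The point of the snippet above is that each sub-model receives its own dict whose "obs" entry is a single component rather than the whole tuple. A plain-Python sketch of that pattern follows, with ordinary dicts and lists standing in for RLlib's batch object and tensors; the helper split_input_dict is hypothetical.

```python
def split_input_dict(input_dict, index):
    """Return a shallow copy whose "obs" is one component of the tuple obs."""
    branch_dict = dict(input_dict)  # shallow copy; all other keys are shared
    branch_dict["obs"] = input_dict["obs"][index]
    return branch_dict


# Stand-ins for batched tensors: a vector component and an image component.
input_dict = {"obs": (["pos_batch"], ["img_batch"]), "is_training": False}

fc_inputs = split_input_dict(input_dict, 0)
cnn_inputs = split_input_dict(input_dict, 1)

print(fc_inputs["obs"])        # ['pos_batch']
print(cnn_inputs["obs"])       # ['img_batch']
print(len(input_dict["obs"]))  # 2, the original dict still holds both components
```

Because the original dict is never modified, both branches can be fed from the same input without one overwriting what the other needs.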

@mannyv
Thanks for your reply and sample code.

I think it works now. This is the new code for those who need a complex observation space:

import argparse
import gym
from gym.spaces import Discrete, Box
import numpy as np
import os
import random

import ray
from ray import tune
from ray.rllib.agents import ppo
from ray.rllib.env.env_context import EnvContext
from ray.rllib.models import ModelCatalog
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.models.torch.visionnet import VisionNetwork as TorchVis
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.tune.logger import pretty_print

torch, nn = try_import_torch()
import torch.nn.functional as F

complex_obs_space = True

class SimpleCorridor(gym.Env):
    def __init__(self, config: EnvContext):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = Discrete(2)
        
        self.observation_space = Box(0.0, 
                                     self.end_pos, 
                                     shape=(1, ), 
                                     dtype=np.float32)
        
        self.observation_space_cnn = gym.spaces.Box(low=0, 
                                                    high=255,
                                                    shape=(42, 42, 3),
                                                    dtype=np.uint8)
        
        if complex_obs_space:
            self.observation_space = gym.spaces.Tuple((self.observation_space, 
                                                self.observation_space_cnn))
        
        self.seed(config.worker_index * config.num_workers)

    def reset(self):
        self.cur_pos = 0
        self.obs_arr_conv = np.zeros((42, 42, 3))
        observations = ([self.cur_pos], self.obs_arr_conv)
        if complex_obs_space:
            return observations
        else:
            return [self.cur_pos]

    def step(self, action):
        assert action in [0, 1], action
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        if complex_obs_space:
            return (([self.cur_pos], self.obs_arr_conv)), random.random() * 2 if done else -0.1, done, {}
        else:
            return [self.cur_pos], random.random() * 2 if done else -0.1, done, {}
        

    def seed(self, seed=None):
        random.seed(seed)


class TorchCustomModel(TorchModelV2, nn.Module):
    def __init__(self, obs_space, action_space, num_outputs, 
                     model_config, name):
        
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        if complex_obs_space:
            self.torch_sub_fc_model = TorchFC(obs_space.original_space[0], action_space, num_outputs,
                                        model_config, name)
        
            self.torch_sub_vis_model = TorchVis(obs_space.original_space[1], action_space, num_outputs,
                                            model_config, name)
            
            self.value_f = nn.Linear(2*action_space.n, action_space.n)
            self.head = nn.Linear(action_space.n, num_outputs)
        else:
            self.torch_sub_model = TorchFC(obs_space, action_space, num_outputs,
                                        model_config, name)
        
        
    def forward(self, input_dict, state, seq_lens):
        if complex_obs_space:
            input_dict1 = input_dict.copy()
            input_dict2 = input_dict.copy()
            input_dict1["obs"] = input_dict["obs"][0]  
            input_dict2["obs"] = input_dict["obs"][1] 
            
            fc_out, _ = self.torch_sub_fc_model(input_dict1, state, seq_lens)
            cnn_out, _ = self.torch_sub_vis_model(input_dict2, state, seq_lens)
            
            x = torch.cat((fc_out, cnn_out), 1)
            x = F.relu(self.value_f(x))
            out = self.head(x.view(x.size(0), -1))
            
            return out, []
        else:
            input_dict["obs"] = input_dict["obs"].float()
            fc_out, _ = self.torch_sub_model(input_dict, state, seq_lens)
            return fc_out, []


    def value_function(self):
        if complex_obs_space:
            vf_fc = torch.reshape(self.torch_sub_fc_model.value_function(), [-1])
            vf_cnn = torch.reshape(self.torch_sub_vis_model.value_function(), [-1])
            
            return vf_fc
        else:
            return torch.reshape(self.torch_sub_model.value_function(), [-1])


if __name__ == "__main__":
    ray.init(ignore_reinit_error=True,
            log_to_driver=False,
            local_mode=True,
            object_store_memory=10**8)

    ModelCatalog.register_custom_model("my_model", TorchCustomModel)

    config = {
        "env": SimpleCorridor,  # or "corridor" if registered above
        "env_config": {
            "corridor_length": 5,
        },
        "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        "model": {
            "custom_model": "my_model",
            "vf_share_layers": True,
        },
        "num_workers": 1,  # parallelism
        "framework": 'torch',
    }

    stop = {
        "training_iteration": 50,
        "timesteps_total": 100000,
        "episode_reward_mean": 0.1,
    }

    ppo_config = ppo.DEFAULT_CONFIG.copy()
    ppo_config.update(config)
    ppo_config["lr"] = 1e-3
    trainer = ppo.PPOTrainer(config=ppo_config, env=SimpleCorridor)
    for _ in range(stop['training_iteration']):
        result = trainer.train()
        print(pretty_print(result))
        if result["timesteps_total"] >= stop["timesteps_total"] or \
                result["episode_reward_mean"] >= stop["episode_reward_mean"]:
            break
    else:
        results = tune.run("PPO", config=config, stop=stop)

    ray.shutdown()

Still, I do not know how to deal with value_function. As you can see in the code, I compute both vf_fc and vf_cnn but return only vf_fc.
Do I need to concatenate or average vf_fc with vf_cnn?

Also, I get the following warnings multiple times when copying input_dict:

/home/rdbt/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/sample_batch.py:114: FutureWarning: The input object of type 'Tensor' is an array-like implementing one of the corresponding protocols (`__array__`, `__array_interface__` or `__array_struct__`); but not a sequence (or 0-D). In the future, this object will be coerced as if it was first converted using `np.array(obj)`. To retain the old behaviour, you have to either modify the type 'Tensor', or assign to an empty array created with `np.empty(correct_shape, dtype=object)`.
  self[k] = np.array(v)
/home/rdbt/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/sample_batch.py:114: VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.
  self[k] = np.array(v)

My ray version is: ‘2.0.0.dev0’

Thanks!

@deepgravity,

What I would probably do in this situation is make a layer like this:
self.value_f = nn.Linear(2,1)

Then

    def value_function(self):
        if complex_obs_space:
            vf_fc = self.torch_sub_fc_model.value_function()
            vf_cnn = self.torch_sub_vis_model.value_function()
            vf_combined = torch.cat([vf_fc, vf_cnn], -1)
            return torch.reshape(self.value_f(vf_combined),-1)
        else:
            return torch.reshape(self.torch_sub_model.value_function(), [-1])
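One thing worth checking when wiring this up is the shape going into the mixing layer. The sketch below assumes each sub-model's value_function() returns a 1-D tensor of length B (an assumption, suggested by the reshape to [-1] in the earlier code). Under that assumption, concatenating along the last dimension yields a vector of length 2 * B, while stacking yields the [B, 2] layout that nn.Linear(2, 1) expects. The tensors here are dummies.

```python
import torch
from torch import nn

batch_size = 4
# Assumed shapes: each sub-model's value estimate is a 1-D tensor of length B.
vf_fc = torch.zeros(batch_size)
vf_cnn = torch.ones(batch_size)

value_f = nn.Linear(2, 1)  # mixing layer over the two value estimates

concatenated = torch.cat([vf_fc, vf_cnn], dim=-1)  # shape [2 * B]
stacked = torch.stack([vf_fc, vf_cnn], dim=-1)     # shape [B, 2]

combined = value_f(stacked).reshape(-1)            # shape [B]
print(concatenated.shape, stacked.shape, combined.shape)
```

If the sub-models instead return [B, 1] tensors, torch.cat along the last dimension would already give [B, 2], so printing the shapes first is the safest way to decide.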

@mannyv Many thanks!

I added these lines to my code, but it gives me a weird error.
Here is the code:

import argparse
import gym
from gym.spaces import Discrete, Box
import numpy as np
import os
import random

import ray
from ray import tune
from ray.rllib.agents import ppo
from ray.rllib.env.env_context import EnvContext
from ray.rllib.models import ModelCatalog
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.torch.fcnet import FullyConnectedNetwork as TorchFC
from ray.rllib.models.torch.visionnet import VisionNetwork as TorchVis
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.tune.logger import pretty_print

torch, nn = try_import_torch()
import torch.nn.functional as F

complex_obs_space = True

class SimpleCorridor(gym.Env):
    def __init__(self, config: EnvContext):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = Discrete(2)
        
        self.observation_space = Box(0.0, 
                                     self.end_pos, 
                                     shape=(1, ), 
                                     dtype=np.float32)
        
        self.observation_space_cnn = gym.spaces.Box(low=0, 
                                                    high=255,
                                                    shape=(42, 42, 3),
                                                    dtype=np.uint8)
        
        if complex_obs_space:
            self.observation_space = gym.spaces.Tuple((self.observation_space, 
                                                self.observation_space_cnn))
        
        self.seed(config.worker_index * config.num_workers)

    def reset(self):
        self.cur_pos = 0
        self.obs_arr_conv = np.zeros((42, 42, 3))
        if complex_obs_space:
            return ([self.cur_pos], self.obs_arr_conv)
        else:
            return [self.cur_pos]

    def step(self, action):
        assert action in [0, 1], action
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        if complex_obs_space:
            return ([self.cur_pos], self.obs_arr_conv), random.random() * 2 if done else -0.1, done, {}
        else:
            return [self.cur_pos], random.random() * 2 if done else -0.1, done, {}
        

    def seed(self, seed=None):
        random.seed(seed)


class TorchCustomModel(TorchModelV2, nn.Module):
    def __init__(self, obs_space, action_space, num_outputs, 
                     model_config, name):
        
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs,
                              model_config, name)
        nn.Module.__init__(self)

        if complex_obs_space:
            self.torch_sub_fc_model = TorchFC(obs_space.original_space[0], action_space, num_outputs,
                                        model_config, name)
        
            self.torch_sub_vis_model = TorchVis(obs_space.original_space[1], action_space, num_outputs,
                                            model_config, name)
            
            self.value_f = nn.Linear(2*action_space.n, action_space.n)
            self.head = nn.Linear(action_space.n, num_outputs)
        else:
            self.torch_sub_model = TorchFC(obs_space, action_space, num_outputs,
                                        model_config, name)
        
        
    def forward(self, input_dict, state, seq_lens):
        if complex_obs_space:
            input_dict1 = input_dict.copy()
            input_dict2 = input_dict.copy()
            input_dict1["obs"] = input_dict["obs"][0]  
            input_dict2["obs"] = input_dict["obs"][1] 
            
            fc_out, _ = self.torch_sub_fc_model(input_dict1, state, seq_lens)
            cnn_out, _ = self.torch_sub_vis_model(input_dict2, state, seq_lens)
            
            x = torch.cat((fc_out, cnn_out), 1)
            x = F.relu(self.value_f(x))
            out = self.head(x.view(x.size(0), -1))
            
            return out, []
        else:
            input_dict["obs"] = input_dict["obs"].float()
            fc_out, _ = self.torch_sub_model(input_dict, state, seq_lens)
            return fc_out, []


    def value_function(self):
        if complex_obs_space:
            vf_fc = torch.reshape(self.torch_sub_fc_model.value_function(), [-1])
            vf_cnn = torch.reshape(self.torch_sub_vis_model.value_function(), [-1])
            vf_combined = torch.cat([vf_fc, vf_cnn], -1)
            return torch.reshape(self.value_f(vf_combined),-1)
        else:
            return torch.reshape(self.torch_sub_model.value_function(), [-1])


if __name__ == "__main__":
    ray.init(ignore_reinit_error=True,
            log_to_driver=False,
            local_mode=True,
            object_store_memory=10**8)

    ModelCatalog.register_custom_model("my_model", TorchCustomModel)

    config = {
        "env": SimpleCorridor,  # or "corridor" if registered above
        "env_config": {
            "corridor_length": 5,
        },
        "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        "model": {
            "custom_model": "my_model",
            "vf_share_layers": True,
        },
        "num_workers": 1,  # parallelism
        "framework": 'torch',
    }

    stop = {
        "training_iteration": 10,
        "timesteps_total": 12,
        "episode_reward_mean": 0.1,
        "stop_reward": 0.1,
    }

    ppo_config = ppo.DEFAULT_CONFIG.copy()
    ppo_config.update(config)
    ppo_config["lr"] = 1e-3
    
    tune_flag = True
    
    if tune_flag:
        agent = ppo.PPOTrainer
        results = tune.run(agent, config=ppo_config, stop=stop)
    else:
        agent = ppo.PPOTrainer(config=ppo_config, env=SimpleCorridor)
        for _ in range(stop['training_iteration']):
            result = agent.train()
            print(pretty_print(result))
            if result["timesteps_total"] >= stop['timesteps_total'] or \
                    result["episode_reward_mean"] >= stop['stop_reward']:
                break
    ray.shutdown()

And here is the error:

2021-10-16 12:01:08,304	INFO services.py:1250 -- View the Ray dashboard at http://127.0.0.1:8265
2021-10-16 12:01:09,405	WARNING worker.py:500 -- `ray.get_gpu_ids()` will always return the empty list when called from the driver. This is because Ray does not manage GPU allocations to the driver process.
2021-10-16 12:01:09,437	INFO ppo.py:158 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True if this doesn't work for you.
2021-10-16 12:01:09,438	INFO trainer.py:742 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
:task_name:bundle_reservation_check_func
:actor_name:PPO
:actor_name:RolloutWorker
/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/sample_batch.py:114: FutureWarning: The input object of type 'Tensor' is an array-like implementing one of the corresponding protocols (`__array__`, `__array_interface__` or `__array_struct__`); but not a sequence (or 0-D). In the future, this object will be coerced as if it was first converted using `np.array(obj)`. To retain the old behaviour, you have to either modify the type 'Tensor', or assign to an empty array created with `np.empty(correct_shape, dtype=object)`.
  self[k] = np.array(v)
/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/sample_batch.py:114: VisibleDeprecationWarning: Creating an ndarray from ragged nested sequences (which is a list-or-tuple of lists-or-tuples-or ndarrays with different lengths or shapes) is deprecated. If you meant to do this, you must specify 'dtype=object' when creating the ndarray.
  self[k] = np.array(v)
2021-10-16 12:01:12,779	ERROR actor.py:739 -- Exception raised in creation task: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=29603, ip=192.168.28.146)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/evaluation/rollout_worker.py", line 573, in __init__
    self._build_policy_map(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1372, in _build_policy_map
    self.policy_map.create_policy(name, orig_cls, obs_space, act_space,
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/policy_map.py", line 136, in create_policy
    self[policy_id] = class_(observation_space, action_space,
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/policy_template.py", line 279, in __init__
    self._initialize_loss_from_dummy_batch(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/policy.py", line 750, in _initialize_loss_from_dummy_batch
    self.compute_actions_from_input_dict(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/torch_policy.py", line 299, in compute_actions_from_input_dict
    return self._compute_action_helper(input_dict, state_batches,
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/utils/threading.py", line 21, in wrapper
    return func(self, *a, **k)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/torch_policy.py", line 385, in _compute_action_helper
    extra_fetches = self.extra_action_out(input_dict, state_batches,
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/policy/policy_template.py", line 349, in extra_action_out
    stats_dict = extra_action_out_fn(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/ppo/ppo_torch_policy.py", line 169, in vf_preds_fetches
    SampleBatch.VF_PREDS: model.value_function(),
  File "/home/data/dbt/rlb_pipeline/agent/custom_env.py", line 121, in value_function
    return torch.reshape(self.value_f(vf_combined),-1)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/torch/nn/modules/module.py", line 1051, in _call_impl
    return forward_call(*input, **kwargs)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/torch/nn/modules/linear.py", line 96, in forward
    return F.linear(input, self.weight, self.bias)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/torch/nn/functional.py", line 1847, in linear
    return torch._C._nn.linear(input, weight, bias)
RuntimeError: mat1 and mat2 shapes cannot be multiplied (1x64 and 4x2)
2021-10-16 12:01:12,784	ERROR actor.py:739 -- Exception raised in creation task: The actor died because of an error raised in its creation task, ray::PPO.__init__() (pid=29603, ip=192.168.28.146)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/trainer_template.py", line 137, in __init__
    Trainer.__init__(self, config, env, logger_creator)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 603, in __init__
    super().__init__(config, logger_creator)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/tune/trainable.py", line 105, in __init__
    self.setup(copy.deepcopy(self.config))
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/trainer_template.py", line 147, in setup
    super().setup(config)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 748, in setup
    self._init(self.config, self.env_creator)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/trainer_template.py", line 171, in _init
    self.workers = self._make_workers(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 830, in _make_workers
    return WorkerSet(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/evaluation/worker_set.py", line 103, in __init__
    self._local_worker = self._make_worker(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/evaluation/worker_set.py", line 399, in _make_worker
    worker = cls(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/evaluation/rollout_worker.py", line 528, in __init__
    policy_dict = _determine_spaces_for_multi_agent_dict(
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/evaluation/rollout_worker.py", line 1457, in _determine_spaces_for_multi_agent_dict
    raise ValueError(
ValueError: `observation_space` not provided in PolicySpec for default_policy and env does not have an observation space OR no spaces received from other workers' env(s) OR no `observation_space` specified in config!
<IPython.core.display.HTML object>
2021-10-16 12:01:13,329	ERROR trial_runner.py:844 -- Trial PPO_SimpleCorridor_fc65a_00000: Error processing event.
Traceback (most recent call last):
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/tune/trial_runner.py", line 810, in _process_trial
    results = self.trial_executor.fetch_result(trial)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/tune/ray_trial_executor.py", line 767, in fetch_result
    result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 89, in wrapper
    return func(*args, **kwargs)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/worker.py", line 1621, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(AttributeError): ray::PPO.train_buffered()::Exiting (pid=29603, ip=192.168.28.146, repr=PPO)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/tune/trainable.py", line 180, in train_buffered
    result = self.train()
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 662, in train
    raise e
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 648, in train
    result = Trainable.train(self)
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/tune/trainable.py", line 239, in train
    result = self.step()
  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/rllib/agents/trainer_template.py", line 206, in step
    step_results = next(self.train_exec_impl)
AttributeError: 'PPO' object has no attribute 'train_exec_impl'
Result for PPO_SimpleCorridor_fc65a_00000:
  {}
  
<IPython.core.display.HTML object>
Traceback (most recent call last):

  File "/home/data/dbt/rlb_pipeline/agent/custom_env.py", line 163, in <module>
    results = tune.run(agent, config=ppo_config, stop=stop)

  File "/home/data/anaconda3/envs/rllib/lib/python3.9/site-packages/ray/tune/tune.py", line 608, in run
    raise TuneError("Trials did not complete", incomplete_trials)

TuneError: ('Trials did not complete', [PPO_SimpleCorridor_fc65a_00000])

For some reason it does not recognize the observation_space!

Can it be related to my Ray version? Would you please run the code and see if you get the same error?

Thanks!
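
Reading the traceback above, the first exception is the RuntimeError raised in value_function() at self.value_f(vf_combined). The `4x2` is the transposed weight of nn.Linear(2*action_space.n, action_space.n) with action_space.n = 2, and `1x64` is what torch.cat yields from two 1-D value vectors (64 = 2 x 32, consistent with a dummy batch of 32 samples). The ValueError about observation_space is logged only after the RolloutWorker has already died, so it is probably a follow-on error rather than the root cause. Note also that the code uses the same value_f layer in forward() to fuse the policy outputs, so the policy path and the value path need separate layers.

Here is one way to lay that out, in plain PyTorch rather than as an RLlib model. It is a sketch only, and not necessarily what the linked colab does: the class name TwoBranchNet, the layer sizes, and the 1/255 image scaling are all assumptions made for illustration.

```python
import torch
import torch.nn as nn


class TwoBranchNet(nn.Module):
    """Hypothetical fused network: an MLP for the vector input, a small CNN
    for the image input, then one policy head and one value head."""

    def __init__(self, vec_dim=1, num_actions=2, hidden=64):
        super().__init__()
        self.mlp = nn.Sequential(nn.Linear(vec_dim, hidden), nn.ReLU())
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=4, stride=2), nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=4, stride=2), nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),   # [B, 32, 1, 1] for any image size
            nn.Flatten(),              # [B, 32]
        )
        # Fusion layer shared by both heads; the heads themselves are separate.
        self.fuse = nn.Sequential(nn.Linear(hidden + 32, hidden), nn.ReLU())
        self.policy_head = nn.Linear(hidden, num_actions)
        self.value_head = nn.Linear(hidden, 1)
        self._features = None

    def forward(self, vec_obs, img_obs):
        # The env emits images as [B, H, W, C]; Conv2d expects [B, C, H, W].
        img = img_obs.float().permute(0, 3, 1, 2) / 255.0
        fused = torch.cat([self.mlp(vec_obs.float()), self.cnn(img)], dim=1)
        self._features = self.fuse(fused)
        return self.policy_head(self._features)

    def value_function(self):
        # One value per sample: [B, 1] -> [B]. Requires forward() first.
        return self.value_head(self._features).reshape(-1)


net = TwoBranchNet()
logits = net(torch.zeros(32, 1), torch.zeros(32, 42, 42, 3))
print(logits.shape)                 # torch.Size([32, 2])
print(net.value_function().shape)   # torch.Size([32])
```

To use this inside TorchCustomModel, the same idea applies: keep value_f (or a value head like the one above) only for value_function(), and give forward() its own fusion layer sized to the concatenated sub-model outputs.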

@deepgravity,

Try this colab:


@mannyv
Yes, now it works :slight_smile:
Many thanks and enjoy the coffee :slight_smile: