ValueError: The two structures don't have the same nested structure

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I’m trying to migrate an environment from SB3 to RLlib in order to enable multi-agent support. I’ve read this issue and think I am dealing with a similar problem, but I can’t for the life of me figure out what I need to change.

I’m assuming that something is wrong with how I am configuring my observation space. It’s meant to be a dictionary with the agents as keys and the observations (the array mm_state) as values.
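
Concretely, this is the layout I am aiming for (a minimal sketch with made-up values, not my real wrapper output):

import numpy as np
from gymnasium import spaces

# One Box(2,) observation per agent, keyed by agent name
observation_space = spaces.Dict({
    f"agent{i}": spaces.Box(-1, 1000, shape=(2,), dtype=np.float32)
    for i in range(11)
})

# A single multi-agent observation should then be a matching dict
obs = {
    f"agent{i}": np.array([0.0, 1.0], dtype=np.float32)
    for i in range(11)
}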

If anyone has an idea how to fix this, or how I could troubleshoot it further to find the issue, I would be very grateful.
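
In case it helps with troubleshooting, this is how I have been sanity-checking the returned observations against the space (just a debugging sketch; env is assumed to be an already-constructed instance of the SprEnv class below):

# env: an already-constructed SprEnv instance (see class below)
obs, _ = env.reset()
for agent_id, agent_obs in obs.items():
    space = env.observation_space[agent_id]
    print(agent_id, agent_obs.dtype, agent_obs.shape, space.contains(agent_obs))
print("full dict ok:", env.observation_space.contains(obs))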

ValueError: The two structures don't have the same nested structure.

First structure: type=ndarray str=[0. 1.]

Second structure: type=OrderedDict str=OrderedDict([('agent0', array([541.69055, 113.26023], dtype=float32)), ('agent1', array([ 69.30221, 792.6272 ], dtype=float32)), ('agent10', array([805.5128 , 642.98914], dtype=float32)), ('agent2', array([853.7121, 986.2091], dtype=float32)), ('agent3', array([485.2693 , 410.55228], dtype=float32)), ('agent4', array([244.2277 , 345.53674], dtype=float32)), ('agent5', array([974.2656, 700.9535], dtype=float32)), ('agent6', array([594.5883 , 883.23303], dtype=float32)), ('agent7', array([120.42753, 782.34705], dtype=float32)), ('agent8', array([956.6239, 536.2708], dtype=float32)), ('agent9', array([210.62961,  30.17688], dtype=float32))])

Each per-agent observation (mm_state) is built in my wrapper like this:

mm_state = np.concatenate(
    (
        flow_proc_percentage,
        ttll
    ),
    axis=None
).astype(np.float32)

return mm_state


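The full environment class looks like this: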
import gymnasium as gym
from spr_rl.agent.params import Params
from gymnasium import spaces
from gymnasium.utils import seeding
from spr_rl.envs.wrapper import SPRSimWrapper
import numpy as np
import random
import csv
from ray.rllib.env.multi_agent_env import MultiAgentEnv

class SprEnv(MultiAgentEnv):

    # Set this in SOME subclasses
    metadata = {'render.modes': ['human']}

    def __init__(self, config, render_mode=None):
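        # All settings come from the config dict passed in (RLlib's env_config);
        # missing keys fall back to the defaults below.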
        print(config)
        seed = config.get('seed', "4321")
        simulator_config = config.get('sim_config', "inputs/config/simulator/mmpp-12-8.yaml")
        network = config.get('network', "inputs/networks/interroute-in2-eg1-rand-cap0-2.graphml")
        if network is None:
            print("Warning: network is None.")

        services = config.get('services', "inputs/services/abc-start_delay0.yaml")
        training_duration = config.get('training_duration', "20000")
        test_mode = config.get('test_mode')
        sim_seed = config.get('sim_seed')
        best = config.get('best')
        self.params = Params(
            seed=seed,
            sim_config=simulator_config,
            network=network,
            services=services,
            training_duration=training_duration,
            test_mode=test_mode,
            sim_seed=sim_seed,
            best=best
        )

        print(self.params)
        print(self.params.node_amount)
        self.num_agents = 11
        print(self.num_agents)
        self.agents = {}
        for i in range(self.num_agents):
            agent_name = f'agent{i}'  # e.g., agent0, agent1, ...
            self.agents[i] = agent_name
        self._agent_ids = set(self.agents)
        self._spaces_in_preferred_format = True
        self.terminateds = set()
        self.truncateds = set()
        observation_spaces = {}
        # Loop through the number of agents to add their observation spaces
        action_spaces = {}
        for i in range(self.num_agents):
            agent_name = f'agent{i}'  # e.g., agent0, agent1, ...
            observation_spaces[agent_name] = spaces.Box(-1, 1000, shape=(2,), dtype=np.float32)
            action_spaces[agent_name] = spaces.Discrete(self.params.action_limit)
        self.observation_space = gym.spaces.Dict(observation_spaces)
        self.action_space = gym.spaces.Dict(action_spaces)
        print(self.action_space)
        self.wrapper = SPRSimWrapper(self.params)
        self.episode_number = -1  # -1 here so that first episode reset call makes it 0

    def step(self, action_dict):
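        # Only one flow needs a decision per step: the agent at the flow's current
        # node acts, and the other agents' entries in action_dict are ignored.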

        # Get flow information before action
        processing_index = self.last_flow.processing_index
        forward_to_eg = self.last_flow.forward_to_eg
        previous_node_id = self.last_flow.current_node_id
        flow_delay = self.last_flow.end2end_delay

        # Extract numerical part of previous_node_id (e.g., extract '0' from 'pop0')
        num_id = int(previous_node_id.replace('pop', ''))

        # Get corresponding agent name (e.g., 'agent0' for 'pop0')
        agent_name = f'agent{num_id}'

        # Get the action of the corresponding agent
        action = action_dict.get(agent_name)

        # Apply action
        nn_state, sim_state = self.wrapper.apply(action)

        # Initialize an empty dictionary to hold observations
        obs = {}

        # Loop through all nodes up to num_agents
        for i in range(self.num_agents):
            # Construct the node_id (assuming nodes are named 'pop0', 'pop1', ...)
            node_id = f"pop{i}"

            # Call process_node_state to get mm_state for the current node
            mm_state = self.wrapper.process_node_state(sim_state, node_id)

            # Store the mm_state in the obs dictionary, keyed by node_id
            agent_id = f"agent{i}"
            obs[agent_id] = mm_state

        print(obs)

        new_flow = sim_state.flow

        sfc_len = len(sim_state.sfcs[self.last_flow.sfc])

        # Set reward points
        SUCCESS = 10
        PROCESSED = 1 / sfc_len
        EG_MOVED = -(self.last_flow.end2end_delay - flow_delay) / self.params.net_diameter
        EG_KEPT = -1 / self.params.net_diameter
        DROPPED = -10
        MOVED = 0

        # This reward works by using the concept of aliasing and tracking the flow object in memory
        if self.last_flow.success:
            # If flow successful
            reward = SUCCESS
        else:
            if self.last_flow.dropped:
                # If the flow was dropped
                reward = DROPPED
            else:
                if forward_to_eg:
                    if self.last_flow.current_node_id == self.last_flow.egress_node_id:
                        # Flow arrived at egress, wont ask for more decisions
                        reward = SUCCESS
                    else:
                        if self.last_flow.current_node_id == previous_node_id:
                            # Flow stayed at the node
                            reward = EG_KEPT
                        else:
                            # Flow moved
                            reward = EG_MOVED
                else:
                    # Flow is still processing
                    # if flow processed more
                    if self.last_flow.processing_index > processing_index:
                        if (
                            self.last_flow.current_node_id == self.last_flow.egress_node_id
                        ) and (
                            self.last_flow.processing_index == sfc_len
                        ):
                            # Flow was processed at last sf at egress node,
                            # but success wont be triggered as it will automatically depart
                            reward = SUCCESS
                        else:
                            reward = PROCESSED
                    else:
                        reward = MOVED
        # Initialize an empty dictionary to hold rewards
        rewards = {}

        # Loop through all agents to initialize their rewards to 0
        for i in range(self.num_agents):
            agent_id = f"agent{i}"
            rewards[agent_id] = 0

        # Extract the numeric part from previous_node_id
        numeric_part = int(previous_node_id.replace("pop", ""))

        # Update the reward for the agent that matches the numeric part of previous_node_id
        matching_agent_id = f"agent{numeric_part}"
        if matching_agent_id in rewards:
            rewards[matching_agent_id] = reward  # Or set to another value of your choice

        done = False
        # Episode length is a set number of flows
        self.episode_reward += reward
        if not self.params.test_mode and self.wrapper.simulator.env.now >= 20000:
            done = True
            self.episode_reward_writer.writerow([self.episode_number, self.episode_reward])
        self.steps += 1

        # Set last flow to new flow. New actions will be generated for the new flow
        self.last_flow = new_flow
        # Create a dictionary to store any additional information
        truncated = False
        info = {'sim_time': self.wrapper.simulator.env.now}
        return obs, reward, done, truncated, info

    def reset(self, *, seed=None, options=None):
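        # Re-seed, re-initialize the simulator, and build the initial per-agent observation dict.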

        self.random_gen, _ = seeding.np_random(seed)
        if self.params.sim_seed is None:
            # Draw a fresh simulator seed from this env's RNG
            sim_seed = self.random_gen.integers(1, np.iinfo(np.int32).max)
        else:
            sim_seed = self.params.sim_seed
        nn_state, sim_state = self.wrapper.init(sim_seed)

        self.steps = 0
        self.episode_reward = 0
        self.episode_number += 1

        self.last_flow = sim_state.flow
        self.network = sim_state.network
        previous_node_id = self.last_flow.current_node_id


        obs = {}
        for i in range(self.num_agents):
            # Construct the node_id (assuming nodes are named 'pop0', 'pop1', ...)
            node_id = f"pop{i}"

            # Call process_node_state to get mm_state for the current node
            mm_state = self.wrapper.process_node_state(sim_state, node_id)
            agent_id = f"agent{i}"
            obs[agent_id] = mm_state

        self.terminateds = set()
        self.truncateds = set()

        return obs, {}

    @staticmethod
    def get_dist_to_eg(network, flow):
        """ Returns the distance to egress node in hops """
        dist_to_egress = network.graph['shortest_paths'][(flow.current_node_id,
                                                          flow.egress_node_id)][1]  # 1: delay; 2: hops
        return dist_to_egress

    def render(self, mode='human'):
        assert mode in ['human']

    def seed(self, seed=None):
        """Sets the seed for this env's random number generator(s).

        Note:
            Some environments use multiple pseudorandom number generators.
            We want to capture all such seeds used in order to ensure that
            there aren't accidental correlations between multiple generators.

        Returns:
            list<bigint>: Returns the list of seeds used in this env's random
              number generators. The first value in the list should be the
              "main" seed, or the value which a reproducer should pass to
              'seed'. Often, the main seed equals the provided 'seed', but
              this won't be true if seed=None, for example.
        """
        self.random_gen, seed = seeding.np_random(seed)
        return [seed]