ValueError: The two structures don't have the same nested structure

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I’m trying to migrate an environment from SB3 to RLlib in order to enable multi-agent functionality. I’ve read this issue and think I am dealing with a similar problem, but I can’t for the life of me figure out what I need to change.

I’m assuming that something is wrong with how I am configuring my observation space - it’s meant to be a dictionary with the agents as keys and the observations, in the form of the array mm_state, as values.

If anyone has an idea on how to fix it or on how I could better troubleshoot this to find the issue, I would be very grateful.

ValueError: The two structures don't have the same nested structure.

First structure: type=ndarray str=[0. 1.]

Second structure: type=OrderedDict str=OrderedDict([('agent0', array([541.69055, 113.26023], dtype=float32)), ('agent1', array([ 69.30221, 792.6272 ], dtype=float32)), ('agent10', array([805.5128 , 642.98914], dtype=float32)), ('agent2', array([853.7121, 986.2091], dtype=float32)), ('agent3', array([485.2693 , 410.55228], dtype=float32)), ('agent4', array([244.2277 , 345.53674], dtype=float32)), ('agent5', array([974.2656, 700.9535], dtype=float32)), ('agent6', array([594.5883 , 883.23303], dtype=float32)), ('agent7', array([120.42753, 782.34705], dtype=float32)), ('agent8', array([956.6239, 536.2708], dtype=float32)), ('agent9', array([210.62961,  30.17688], dtype=float32))])
mm_state = np.concatenate(

        return mm_state

import gymnasium as gym
from spr_rl.agent.params import Params
from gymnasium import spaces
from gymnasium.utils import seeding
from spr_rl.envs.wrapper import SPRSimWrapper
import numpy as np
import random
import csv
from ray.rllib.env.multi_agent_env import MultiAgentEnv

class SprEnv(MultiAgentEnv):

    # Set this in SOME subclasses
    metadata = {'render.modes': ['human']}

    def __init__(self, config, render_mode=None):
        """Read settings from ``config`` and build the per-agent spaces.

        The keys 'seed', 'sim_config', 'network', 'services' and
        'training_duration' are looked up with the string defaults shown
        below; 'test_mode', 'sim_seed' and 'best' default to ``None``.
        One Box observation space and one Discrete action space is created
        per agent, and both sets are wrapped in ``gym.spaces.Dict``.

        ``render_mode`` is accepted but not used in the visible body.
        """
        seed = config.get('seed', "4321")
        simulator_config = config.get('sim_config', "inputs/config/simulator/mmpp-12-8.yaml")
        network = config.get('network', "inputs/networks/interroute-in2-eg1-rand-cap0-2.graphml")
        # Only reachable when the config explicitly maps 'network' to None,
        # because a missing key falls back to the default path above.
        if network is None:
            print("Warning: network is None.")

        services = config.get('services', "inputs/services/abc-start_delay0.yaml")
        training_duration = config.get('training_duration', "20000")
        test_mode = config.get('test_mode')
        sim_seed = config.get('sim_seed')
        best = config.get('best')
        # NOTE(review): the Params(...) call below is cut off in this excerpt;
        # its arguments and closing parenthesis are missing.
        self.params = Params(

        self.num_agents = 11
        # Maps the integer index i to the agent name 'agent<i>'.
        self.agents = {}
        for i in range(self.num_agents):
            agent_name = f'agent{i}'  # e.g., agent0, agent1, ...
            self.agents[i] = agent_name
        # NOTE(review): set() over a dict collects its keys, so this holds the
        # integers 0..10, whereas the spaces below are keyed by 'agent0'...
        # Confirm whether the agent names were intended here.
        self._agent_ids = set(self.agents)
        self._spaces_in_preferred_format = True
        self.terminateds = set()
        self.truncateds = set()
        observation_spaces = {}
        # Loop through the number of agents to add their observation spaces
        action_spaces = {}
        for i in range(self.num_agents):
            agent_name = f'agent{i}'  # e.g., agent0, agent1, ...
            # Each agent observes a float32 vector of length 2 bounded by [-1, 1000].
            observation_spaces[agent_name] = spaces.Box(-1, 1000, shape=(2,), dtype=np.float32)
            action_spaces[agent_name] = spaces.Discrete(self.params.action_limit)
        self.observation_space = gym.spaces.Dict(observation_spaces)
        self.action_space = gym.spaces.Dict(action_spaces)
        self.wrapper = SPRSimWrapper(self.params)
        self.episode_number = -1  # -1 here so that first episode reset call makes it 0

    def step(self, action_dict):
        """Apply the acting agent's action and rebuild all observations.

        The acting agent is derived from the node that ``self.last_flow``
        currently sits on ('pop<N>' -> 'agent<N>'); that agent's entry in
        ``action_dict`` is handed to ``self.wrapper.apply``. Observations for
        every agent are then collected from the returned simulator state and
        a reward is derived from how the flow changed.

        Returns:
            The tuple ``(obs, reward, done, truncated, info)``. ``obs`` is a
            dict keyed by agent name; ``reward``, ``done`` and ``truncated``
            are single values.

        NOTE(review): several statements in this excerpt are incomplete (the
        reward chain, the episode-length test and the ``info`` dict); they
        are marked inline and need to be restored from the real file.
        """

        # Get flow information before action
        processing_index = self.last_flow.processing_index
        forward_to_eg = self.last_flow.forward_to_eg
        previous_node_id = self.last_flow.current_node_id
        flow_delay = self.last_flow.end2end_delay

        # Extract numerical part of previous_node_id (e.g., extract '0' from 'pop0')
        num_id = int(previous_node_id.replace('pop', ''))

        # Get corresponding agent name (e.g., 'agent0' for 'pop0')
        agent_name = f'agent{num_id}'

        # Get the action of the corresponding agent
        # (.get() yields None when action_dict has no entry for that agent)
        action = action_dict.get(agent_name)

        # Apply action
        # (nn_state is not used again in the visible body)
        nn_state, sim_state = self.wrapper.apply(action)

        # Initialize an empty dictionary to hold observations
        obs = {}

        # Loop through all nodes up to num_agents
        for i in range(self.num_agents):
            # Construct the node_id (assuming nodes are named 'pop0', 'pop1', ...)
            node_id = f"pop{i}"

            # Call process_node_state to get mm_state for the current node
            mm_state = self.wrapper.process_node_state(sim_state, node_id)

            # Store the mm_state in the obs dictionary, keyed by the agent id
            agent_id = f"agent{i}"
            obs[agent_id] = mm_state


        new_flow = sim_state.flow

        sfc_len = len(sim_state.sfcs[self.last_flow.sfc])

        # Set reward points
        SUCCESS = 10
        PROCESSED = 1 / sfc_len
        EG_MOVED = -(self.last_flow.end2end_delay - flow_delay) / self.params.net_diameter
        EG_KEPT = -1 / self.params.net_diameter
        DROPPED = -10
        MOVED = 0

        # This reward works by using the concept of aliasing and tracking the flow object in memory
        # NOTE(review): the else/elif lines of this chain appear to be missing
        # from the excerpt, and the parenthesised condition further down is
        # never closed. As written, later assignments overwrite earlier ones.
        if self.last_flow.success:
            # If flow successful
            reward = SUCCESS
            if self.last_flow.dropped:
                # If the flow was dropped
                reward = DROPPED
                if forward_to_eg:
                    if self.last_flow.current_node_id == self.last_flow.egress_node_id:
                        # Flow arrived at egress, wont ask for more decisions
                        reward = SUCCESS
                        if self.last_flow.current_node_id == previous_node_id:
                            # Flow stayed at the node
                            reward = EG_KEPT
                            # Flow moved
                            reward = EG_MOVED
                    # Flow is still processing
                    # if flow processed more
                    if self.last_flow.processing_index > processing_index:
                        if (
                            self.last_flow.current_node_id == self.last_flow.egress_node_id
                        ) and (
                            self.last_flow.processing_index == sfc_len
                            # Flow was processed at last sf at egress node,
                            # but success wont be triggered as it will automatically depart
                            reward = SUCCESS
                            reward = PROCESSED
                        reward = MOVED
        # Initialize an empty dictionary to hold rewards
        rewards = {}

        # Loop through all agents to initialize their rewards to 0
        for i in range(self.num_agents):
            agent_id = f"agent{i}"
            rewards[agent_id] = 0

        # Extract the numeric part from previous_node_id
        numeric_part = int(previous_node_id.replace("pop", ""))

        # Update the reward for the agent that matches the numeric part of previous_node_id
        matching_agent_id = f"agent{numeric_part}"
        if matching_agent_id in rewards:
            rewards[matching_agent_id] = reward  # Or set to another value of your choice

        done = False
        # Episode length is a set number of flows
        self.episode_reward += reward
        # NOTE(review): the left operand of '>=' is missing in this excerpt.
        if not self.params.test_mode and >= 20000:
            done = True
            # NOTE(review): episode_reward_writer is not assigned anywhere in
            # the visible code — confirm where it is created.
            self.episode_reward_writer.writerow([self.episode_number, self.episode_reward])
        self.steps += 1

        # Set last flow to new flow. New actions will be generated for the new flow
        self.last_flow = new_flow
        # Create a dictionary to store any additional information
        truncated = False
        # NOTE(review): the value for 'sim_time' is missing in this excerpt.
        info = {'sim_time':}
        # NOTE(review): the per-agent `rewards` dict built above is not
        # returned; the scalar `reward`, `done` and `truncated` are returned
        # next to the per-agent `obs` dict. RLlib's MultiAgentEnv presumably
        # expects dicts keyed by agent id here (with an '__all__' entry for
        # the done flags) — confirm against the Ray documentation.
        return obs, reward, done, truncated, info

    def reset(self, *, seed=None, options=None):
        """Start a new episode and return the initial per-agent observations.

        Re-initialises the simulator through ``self.wrapper.init``, zeroes the
        step and reward counters, advances ``self.episode_number`` and
        collects one observation per agent.

        Returns:
            ``(obs, {})`` where ``obs`` is a dict keyed by 'agent<i>'.

        NOTE(review): ``seed`` and ``options`` are not read anywhere in the
        visible body.
        """

        # Called without an argument, so the `seed` parameter does not reach it.
        self.random_gen, _ = seeding.np_random()
        # NOTE(review): an `else:` appears to be missing from this excerpt.
        # As written, both assignments sit in the `if` body and the second
        # overwrites the first, so sim_seed is unbound whenever
        # params.sim_seed is not None.
        if self.params.sim_seed is None:

            sim_seed = self.unwrapped.np_random.integers(1, np.iinfo(np.int32).max)
            sim_seed = self.params.sim_seed
        # (nn_state is not used again in the visible body)
        nn_state, sim_state = self.wrapper.init(sim_seed)

        self.steps = 0
        self.episode_reward = 0
        self.episode_number += 1

        # NOTE(review): the right-hand side of this chained assignment is
        # missing in this excerpt.
        self.last_flow = sim_state.flow =
        # Assigned but not read again before the method returns.
        previous_node_id = self.last_flow.current_node_id

        obs = {}
        for i in range(self.num_agents):
            # Construct the node_id (assuming nodes are named 'pop0', 'pop1', ...)
            node_id = f"pop{i}"

            # Call process_node_state to get mm_state for the current node
            mm_state = self.wrapper.process_node_state(sim_state, node_id)
            agent_id = f"agent{i}"
            obs[agent_id] = mm_state

        self.terminateds = set()
        self.truncateds = set()

        return obs, {}

    @staticmethod
    def get_dist_to_eg(network, flow):
        """Look up the stored distance from the flow's node to its egress.

        Declared as a static method because it takes no ``self``; without
        the decorator an instance call would bind the environment itself to
        ``network``.

        Args:
            network: Object whose ``graph['shortest_paths']`` mapping is
                keyed by ``(source_node_id, target_node_id)`` tuples.
            flow: Object exposing ``current_node_id`` and ``egress_node_id``.

        Returns:
            Entry 1 of the stored shortest-path record.
            NOTE(review): the inline legend labels index 1 as delay and
            index 2 as hops, while the previous docstring promised hops;
            confirm which one callers need.
        """
        path_key = (flow.current_node_id, flow.egress_node_id)
        # 1: delay; 2: hops
        return network.graph['shortest_paths'][path_key][1]

    def render(self, mode='cli'):
        """Validate the requested render mode; nothing is drawn.

        Args:
            mode: Either 'cli' (the default) or 'human'.

        Raises:
            AssertionError: If ``mode`` is neither accepted value.
        """
        # The default 'cli' used to be rejected by this very check, so a
        # bare render() call could never succeed. Accept it alongside 'human'.
        assert mode in ('cli', 'human')

    def seed(self, seed=None):
        """Sets the seed for this env's random number generator(s).

        Some environments use multiple pseudorandom number generators.
        We want to capture all such seeds used in order to ensure that
        there aren't accidental correlations between multiple generators.

        Args:
            seed: Seed handed to ``seeding.np_random``; ``None`` lets the
                generator choose one.

        Returns:
            list<bigint>: Returns the list of seeds used in this env's random
              number generators. The first value in the list should be the
              "main" seed, or the value which a reproducer should pass to
              'seed'. Often, the main seed equals the provided 'seed', but
              this won't be true if seed=None, for example.
        """
        # Forward the caller's seed; it was previously dropped, which made
        # every call behave as if seed=None had been passed.
        self.random_gen, seed = seeding.np_random(seed)
        return [seed]