- High: It blocks me from completing my task.
I’m trying to migrate an environment from SB3 to RLlib in order to enable multi-agent functionality. I’ve read this issue and think I am dealing with a similar problem, but I can’t for the life of me figure out what I need to change.
I’m assuming that something is wrong with how I am configuring my observation space - it’s meant to be a dictionary with the agents as keys and the observations, in the form of the array mm_state, as values.
If anyone has an idea on how to fix it or on how I could better troubleshoot this to find the issue, I would be very grateful.
ValueError: The two structures don't have the same nested structure.
First structure: type=ndarray str=[0. 1.]
Second structure: type=OrderedDict str=OrderedDict([('agent0', array([541.69055, 113.26023], dtype=float32)), ('agent1', array([ 69.30221, 792.6272 ], dtype=float32)), ('agent10', array([805.5128 , 642.98914], dtype=float32)), ('agent2', array([853.7121, 986.2091], dtype=float32)), ('agent3', array([485.2693 , 410.55228], dtype=float32)), ('agent4', array([244.2277 , 345.53674], dtype=float32)), ('agent5', array([974.2656, 700.9535], dtype=float32)), ('agent6', array([594.5883 , 883.23303], dtype=float32)), ('agent7', array([120.42753, 782.34705], dtype=float32)), ('agent8', array([956.6239, 536.2708], dtype=float32)), ('agent9', array([210.62961, 30.17688], dtype=float32))])
mm_state = np.concatenate(
return mm_state
import gymnasium as gym
from spr_rl.agent.params import Params
from gymnasium import spaces
from gymnasium.utils import seeding
from spr_rl.envs.wrapper import SPRSimWrapper
import numpy as np
import random
import csv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class SprEnv(MultiAgentEnv):
# Set this in SOME subclasses
metadata = {'render.modes': ['human']}
def __init__(self, config, render_mode=None):
seed = config.get('seed', "4321")
simulator_config = config.get('sim_config', "inputs/config/simulator/mmpp-12-8.yaml")
network = config.get('network', "inputs/networks/interroute-in2-eg1-rand-cap0-2.graphml")
if network is None:
print("Warning: network is None.")
services = config.get('services', "inputs/services/abc-start_delay0.yaml")
training_duration = config.get('training_duration', "20000")
test_mode = config.get('test_mode')
sim_seed = config.get('sim_seed')
best = config.get('best')
self.params = Params(
self.num_agents = 11
self.agents = {}
for i in range(self.num_agents):
agent_name = f'agent{i}' # e.g., agent0, agent1, ...
self.agents[i] = agent_name
self._agent_ids = set(self.agents)
self._spaces_in_preferred_format = True
self.terminateds = set()
self.truncateds = set()
observation_spaces = {}
# Loop through the number of agents to add their observation spaces
action_spaces = {}
for i in range(self.num_agents):
agent_name = f'agent{i}' # e.g., agent0, agent1, ...
observation_spaces[agent_name] = spaces.Box(-1, 1000, shape=(2,), dtype=np.float32)
action_spaces[agent_name] = spaces.Discrete(self.params.action_limit)
self.observation_space = gym.spaces.Dict(observation_spaces)
self.action_space = gym.spaces.Dict(action_spaces)
self.wrapper = SPRSimWrapper(self.params)
self.episode_number = -1 # -1 here so that first episode reset call makes it 0
def step(self, action_dict):
# Get flow information before action
processing_index = self.last_flow.processing_index
forward_to_eg = self.last_flow.forward_to_eg
previous_node_id = self.last_flow.current_node_id
flow_delay = self.last_flow.end2end_delay
# Extract numerical part of previous_node_id (e.g., extract '0' from 'pop0')
num_id = int(previous_node_id.replace('pop', ''))
# Get corresponding agent name (e.g., 'agent0' for 'pop0')
agent_name = f'agent{num_id}'
# Get the action of the corresponding agent
action = action_dict.get(agent_name)
# Apply action
nn_state, sim_state = self.wrapper.apply(action)
# Initialize an empty dictionary to hold observations
obs = {}
# Loop through all nodes up to num_agents
for i in range(self.num_agents):
# Construct the node_id (assuming nodes are named 'pop0', 'pop1', ...)
node_id = f"pop{i}"
# Call process_node_state to get mm_state for the current node
mm_state = self.wrapper.process_node_state(sim_state, node_id)
# Store the mm_state in the obs dictionary, keyed by node_id
agent_id = f"agent{i}"
obs[agent_id] = mm_state
new_flow = sim_state.flow
sfc_len = len(sim_state.sfcs[self.last_flow.sfc])
# Set reward points
PROCESSED = 1 / sfc_len
EG_MOVED = -(self.last_flow.end2end_delay - flow_delay) / self.params.net_diameter
EG_KEPT = -1 / self.params.net_diameter
# This reward works by using the concept of aliasing and tracking the flow object in memory
if self.last_flow.success:
# If flow successful
reward = SUCCESS
if self.last_flow.dropped:
# If the flow was dropped
reward = DROPPED
if forward_to_eg:
if self.last_flow.current_node_id == self.last_flow.egress_node_id:
# Flow arrived at egress, wont ask for more decisions
reward = SUCCESS
if self.last_flow.current_node_id == previous_node_id:
# Flow stayed at the node
reward = EG_KEPT
# Flow moved
reward = EG_MOVED
# Flow is still processing
# if flow processed more
if self.last_flow.processing_index > processing_index:
if (
self.last_flow.current_node_id == self.last_flow.egress_node_id
) and (
self.last_flow.processing_index == sfc_len
# Flow was processed at last sf at egress node,
# but success wont be triggered as it will automatically depart
reward = SUCCESS
reward = PROCESSED
reward = MOVED
# Initialize an empty dictionary to hold rewards
rewards = {}
# Loop through all agents to initialize their rewards to 0
for i in range(self.num_agents):
agent_id = f"agent{i}"
rewards[agent_id] = 0
# Extract the numeric part from previous_node_id
numeric_part = int(previous_node_id.replace("pop", ""))
# Update the reward for the agent that matches the numeric part of previous_node_id
matching_agent_id = f"agent{numeric_part}"
if matching_agent_id in rewards:
rewards[matching_agent_id] = reward # Or set to another value of your choice
done = False
# Episode length is a set number of flows
self.episode_reward += reward
if not self.params.test_mode and >= 20000:
done = True
self.episode_reward_writer.writerow([self.episode_number, self.episode_reward])
self.steps += 1
# Set last flow to new flow. New actions will be generated for the new flow
self.last_flow = new_flow
# Create a dictionary to store any additional information
truncated = False
info = {'sim_time':}
return obs, reward, done, truncated, info
def reset(self, *, seed=None, options=None):
self.random_gen, _ = seeding.np_random()
if self.params.sim_seed is None:
sim_seed = self.unwrapped.np_random.integers(1, np.iinfo(np.int32).max)
sim_seed = self.params.sim_seed
nn_state, sim_state = self.wrapper.init(sim_seed)
self.steps = 0
self.episode_reward = 0
self.episode_number += 1
self.last_flow = sim_state.flow =
previous_node_id = self.last_flow.current_node_id
obs = {}
for i in range(self.num_agents):
# Construct the node_id (assuming nodes are named 'pop0', 'pop1', ...)
node_id = f"pop{i}"
# Call process_node_state to get mm_state for the current node
mm_state = self.wrapper.process_node_state(sim_state, node_id)
agent_id = f"agent{i}"
obs[agent_id] = mm_state
self.terminateds = set()
self.truncateds = set()
return obs, {}
def get_dist_to_eg(network, flow):
    """Return the stored shortest-path metric from the flow's node to its egress.

    The value is read from ``network.graph['shortest_paths']`` using the
    ``(flow.current_node_id, flow.egress_node_id)`` pair as the key, and
    element 1 of that entry is returned.

    NOTE(review): the previous docstring described the result as a hop
    count, but the inline note on the lookup says index 1 is the delay and
    index 2 the hops -- confirm which metric callers expect.
    """
    node_pair = (flow.current_node_id, flow.egress_node_id)
    path_entry = network.graph['shortest_paths'][node_pair]
    return path_entry[1]  # 1: delay; 2: hops
def render(self, mode='cli'):
    """Check that the requested render mode is supported.

    Nothing is drawn; the method only asserts that ``mode`` is ``'human'``.
    The default value ``'cli'`` is not in the accepted list, so a call
    without an explicit ``mode='human'`` fails the assertion.
    """
    supported_modes = ['human']
    assert mode in supported_modes
def seed(self, seed=None):
"""Sets the seed for this env's random number generator(s).
Some environments use multiple pseudorandom number generators.
We want to capture all such seeds used in order to ensure that
there aren't accidental correlations between multiple generators.
list<bigint>: Returns the list of seeds used in this env's random
number generators. The first value in the list should be the
"main" seed, or the value which a reproducer should pass to
'seed'. Often, the main seed equals the provided 'seed', but
this won't be true if seed=None, for example.
self.random_gen, seed = seeding.np_random()
return [seed]