How severe does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I'm trying to migrate an environment from SB3 to RLlib in order to enable multi-agent functionality. I've read this issue and think I am dealing with a similar problem, but I can't for the life of me figure out what I need to change.
I'm assuming that something is wrong with how I am configuring my observation space - it's meant to be a dictionary with the agent names as keys and the observations, in the form of the array mm_state, as values.
If anyone has an idea on how to fix it or on how I could better troubleshoot this to find the issue, I would be very grateful.
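For reference, here is a minimal sketch of the observation layout I am aiming for, checked against the space with gymnasium's Dict.contains() (the agent names match my setup, but the zero observations are just placeholders, not real simulator output):

import numpy as np
from gymnasium import spaces

obs_space = spaces.Dict({
    f"agent{i}": spaces.Box(-1, 1000, shape=(2,), dtype=np.float32)
    for i in range(11)
})
dummy_obs = {f"agent{i}": np.zeros(2, dtype=np.float32) for i in range(11)}
print(obs_space.contains(dummy_obs))  # True, so the dict-of-Box layout itself looks consistent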
ValueError: The two structures don't have the same nested structure.
First structure: type=ndarray str=[0. 1.]
Second structure: type=OrderedDict str=OrderedDict([('agent0', array([541.69055, 113.26023], dtype=float32)), ('agent1', array([ 69.30221, 792.6272 ], dtype=float32)), ('agent10', array([805.5128 , 642.98914], dtype=float32)), ('agent2', array([853.7121, 986.2091], dtype=float32)), ('agent3', array([485.2693 , 410.55228], dtype=float32)), ('agent4', array([244.2277 , 345.53674], dtype=float32)), ('agent5', array([974.2656, 700.9535], dtype=float32)), ('agent6', array([594.5883 , 883.23303], dtype=float32)), ('agent7', array([120.42753, 782.34705], dtype=float32)), ('agent8', array([956.6239, 536.2708], dtype=float32)), ('agent9', array([210.62961, 30.17688], dtype=float32))])
mm_state = np.concatenate(
    (
        flow_proc_percentage,
        ttll
    ),
    axis=None
).astype(np.float32)
return mm_state
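To illustrate what that snippet produces, here is a stripped-down version with dummy inputs (flow_proc_percentage and ttll are placeholders here; in the real code they come from the simulator state):

import numpy as np

flow_proc_percentage = np.array([0.0])  # placeholder value
ttll = np.array([1.0])                  # placeholder value
mm_state = np.concatenate((flow_proc_percentage, ttll), axis=None).astype(np.float32)
print(mm_state, mm_state.shape, mm_state.dtype)  # [0. 1.] (2,) float32

This matches the ndarray shown as the "First structure" in the error above.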
import gymnasium as gym
from spr_rl.agent.params import Params
from gymnasium import spaces
from gymnasium.utils import seeding
from spr_rl.envs.wrapper import SPRSimWrapper
import numpy as np
import random
import csv
from ray.rllib.env.multi_agent_env import MultiAgentEnv
class SprEnv(MultiAgentEnv):
    # Set this in SOME subclasses
    metadata = {'render.modes': ['human']}

    def __init__(self, config, render_mode=None):
        print(config)
        seed = config.get('seed', "4321")
        simulator_config = config.get('sim_config', "inputs/config/simulator/mmpp-12-8.yaml")
        network = config.get('network', "inputs/networks/interroute-in2-eg1-rand-cap0-2.graphml")
        if network is None:
            print("Warning: network is None.")
        services = config.get('services', "inputs/services/abc-start_delay0.yaml")
        training_duration = config.get('training_duration', "20000")
        test_mode = config.get('test_mode')
        sim_seed = config.get('sim_seed')
        best = config.get('best')
        self.params = Params(
            seed=seed,
            sim_config=simulator_config,
            network=network,
            services=services,
            training_duration=training_duration,
            test_mode=test_mode,
            sim_seed=sim_seed,
            best=best
        )
        print(self.params)
        print(self.params.node_amount)
        self.num_agents = 11
        print(self.num_agents)
        self.agents = {}
        for i in range(self.num_agents):
            agent_name = f'agent{i}'  # e.g., agent0, agent1, ...
            self.agents[i] = agent_name
        self._agent_ids = set(self.agents)
        self._spaces_in_preferred_format = True
        self.terminateds = set()
        self.truncateds = set()
        observation_spaces = {}
        action_spaces = {}
        # Loop through the number of agents to add their observation and action spaces
        for i in range(self.num_agents):
            agent_name = f'agent{i}'  # e.g., agent0, agent1, ...
            observation_spaces[agent_name] = spaces.Box(-1, 1000, shape=(2,), dtype=np.float32)
            action_spaces[agent_name] = spaces.Discrete(self.params.action_limit)
        self.observation_space = gym.spaces.Dict(observation_spaces)
        self.action_space = gym.spaces.Dict(action_spaces)
        print(self.action_space)
        self.wrapper = SPRSimWrapper(self.params)
        self.episode_number = -1  # -1 here so that first episode reset call makes it 0
    def step(self, action_dict):
        # Get flow information before action
        processing_index = self.last_flow.processing_index
        forward_to_eg = self.last_flow.forward_to_eg
        previous_node_id = self.last_flow.current_node_id
        flow_delay = self.last_flow.end2end_delay
        # Extract numerical part of previous_node_id (e.g., extract '0' from 'pop0')
        num_id = int(previous_node_id.replace('pop', ''))
        # Get corresponding agent name (e.g., 'agent0' for 'pop0')
        agent_name = f'agent{num_id}'
        # Get the action of the corresponding agent
        action = action_dict.get(agent_name)
        # Apply action
        nn_state, sim_state = self.wrapper.apply(action)
        # Initialize an empty dictionary to hold observations
        obs = {}
        # Loop through all nodes up to num_agents
        for i in range(self.num_agents):
            # Construct the node_id (assuming nodes are named 'pop0', 'pop1', ...)
            node_id = f"pop{i}"
            # Call process_node_state to get mm_state for the current node
            mm_state = self.wrapper.process_node_state(sim_state, node_id)
            # Store the mm_state in the obs dictionary, keyed by agent id
            agent_id = f"agent{i}"
            obs[agent_id] = mm_state
        print(obs)
        new_flow = sim_state.flow
        sfc_len = len(sim_state.sfcs[self.last_flow.sfc])
        # Set reward points
        SUCCESS = 10
        PROCESSED = 1 / sfc_len
        EG_MOVED = -(self.last_flow.end2end_delay - flow_delay) / self.params.net_diameter
        EG_KEPT = -1 / self.params.net_diameter
        DROPPED = -10
        MOVED = 0
        # This reward works by using the concept of aliasing and tracking the flow object in memory
        if self.last_flow.success:
            # If flow successful
            reward = SUCCESS
        else:
            if self.last_flow.dropped:
                # If the flow was dropped
                reward = DROPPED
            else:
                if forward_to_eg:
                    if self.last_flow.current_node_id == self.last_flow.egress_node_id:
                        # Flow arrived at egress, won't ask for more decisions
                        reward = SUCCESS
                    else:
                        if self.last_flow.current_node_id == previous_node_id:
                            # Flow stayed at the node
                            reward = EG_KEPT
                        else:
                            # Flow moved
                            reward = EG_MOVED
                else:
                    # Flow is still processing
                    # If flow processed more
                    if self.last_flow.processing_index > processing_index:
                        if (
                            self.last_flow.current_node_id == self.last_flow.egress_node_id
                        ) and (
                            self.last_flow.processing_index == sfc_len
                        ):
                            # Flow was processed at last SF at egress node,
                            # but success won't be triggered as it will automatically depart
                            reward = SUCCESS
                        else:
                            reward = PROCESSED
                    else:
                        reward = MOVED
        # Initialize an empty dictionary to hold rewards
        rewards = {}
        # Loop through all agents to initialize their rewards to 0
        for i in range(self.num_agents):
            agent_id = f"agent{i}"
            rewards[agent_id] = 0
        # Extract the numeric part from previous_node_id
        numeric_part = int(previous_node_id.replace("pop", ""))
        # Update the reward for the agent that matches the numeric part of previous_node_id
        matching_agent_id = f"agent{numeric_part}"
        if matching_agent_id in rewards:
            rewards[matching_agent_id] = reward  # Or set to another value of your choice
        done = False
        # Episode length is a set number of flows
        self.episode_reward += reward
        if not self.params.test_mode and self.wrapper.simulator.env.now >= 20000:
            done = True
            self.episode_reward_writer.writerow([self.episode_number, self.episode_reward])
        self.steps += 1
        # Set last flow to new flow. New actions will be generated for the new flow
        self.last_flow = new_flow
        # Create a dictionary to store any additional information
        truncated = False
        info = {'sim_time': self.wrapper.simulator.env.now}
        return obs, reward, done, truncated, info
    def reset(self, *, seed=None, options=None):
        self.random_gen, _ = seeding.np_random()
        if self.params.sim_seed is None:
            sim_seed = self.unwrapped.np_random.integers(1, np.iinfo(np.int32).max)
        else:
            sim_seed = self.params.sim_seed
        nn_state, sim_state = self.wrapper.init(sim_seed)
        self.steps = 0
        self.episode_reward = 0
        self.episode_number += 1
        self.last_flow = sim_state.flow
        self.network = sim_state.network
        previous_node_id = self.last_flow.current_node_id
        obs = {}
        for i in range(self.num_agents):
            # Construct the node_id (assuming nodes are named 'pop0', 'pop1', ...)
            node_id = f"pop{i}"
            # Call process_node_state to get mm_state for the current node
            mm_state = self.wrapper.process_node_state(sim_state, node_id)
            agent_id = f"agent{i}"
            obs[agent_id] = mm_state
        self.terminateds = set()
        self.truncateds = set()
        return obs, {}
    @staticmethod
    def get_dist_to_eg(network, flow):
        """Returns the distance to the egress node in hops."""
        dist_to_egress = network.graph['shortest_paths'][(flow.current_node_id,
                                                          flow.egress_node_id)][1]  # 1: delay; 2: hops
        return dist_to_egress
    def render(self, mode='cli'):
        assert mode in ['human']

    def seed(self, seed=None):
        """Sets the seed for this env's random number generator(s).

        Note:
            Some environments use multiple pseudorandom number generators.
            We want to capture all such seeds used in order to ensure that
            there aren't accidental correlations between multiple generators.

        Returns:
            list<bigint>: Returns the list of seeds used in this env's random
                number generators. The first value in the list should be the
                "main" seed, or the value which a reproducer should pass to
                'seed'. Often, the main seed equals the provided 'seed', but
                this won't be true if seed=None, for example.
        """
        self.random_gen, seed = seeding.np_random()
        return [seed]