Multi-agent environment crashes when more than two agents are initialized

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hi everyone,

I'm trying to implement a multi-agent environment with PPO.

For some reason, when I run my implementation with only two agents I don't hit any problem (at least for the number of training rounds I tested). However, as soon as I use more than two agents, I get an error saying that my structures don't have the same number of elements, and the particular worker crashes.

I have the feeling that I'm doing something wrong, but so far I haven't been able to figure out what it is.

I thought it was because I wasn't returning the right dictionaries at every step, but I believe I do return them (only for the active agents). I'm not sure whether the problem comes from my Repeated space or whether I'm missing something basic.
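
Just to make explicit what I am aiming for: as far as I understand, within a single `step()` the observation, reward, and done dictionaries should all be keyed by the same set of currently active agent IDs, with the extra `"__all__"` key only in the dones. Roughly like this (toy values, two active agents):

```python
# Sketch of the per-step return I think RLlib expects (toy values):
# obs, rewards and dones share exactly the same agent keys,
# and only dones carries the extra "__all__" key.
observations = {0: [0.1, 0.2], 1: [0.3, 0.4]}
rewards = {0: 0.0, 1: 0.0}
dones = {0: False, 1: False, "__all__": False}
infos = {}
```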

Below is part of the code I’m running that gives me the error with more than two agents.

I'd appreciate any comments on it.

```python
from abc import ABC, abstractmethod

from ray.rllib.env.multi_agent_env import MultiAgentEnv


class MultiAgentBaseEnv(MultiAgentEnv, ABC):

    def __init__(self, *args, **kwargs):
        self._initial_conditions()
        self._agent_ids = set(self.agents)

    @abstractmethod
    def _initial_conditions(self):
        pass

    @abstractmethod
    def _get_agent_observation(self, agent_id):
        pass

    @abstractmethod
    def _get_agent_reward(self, agent_id):
        pass

    @abstractmethod
    def _get_agent_done(self, agent_id):
        pass

    @abstractmethod
    def _step(self, action):
        pass

    @abstractmethod
    def _are_agents_done(self, dones, acting_agent_ids):
        pass

    def _render(self, *args, **kwargs):
        pass

    def reset(self):
        self._initial_conditions()
        self._agent_ids = set(self.agents)
        return self.agent_observations(self._agent_ids)

    def _get_property(self, property_name: str, agent_ids: list = None):
        agent_ids = self._agent_ids if agent_ids is None else agent_ids
        if property_name == "observations":
            property_fnc = self._get_agent_observation
        elif property_name == "rewards":
            property_fnc = self._get_agent_reward
        elif property_name == "dones":
            property_fnc = self._get_agent_done
        elif property_name == "action_sample":
            property_fnc = lambda x: self.action_space.sample()
        elif property_name == "observation_sample":
            property_fnc = lambda x: self.observation_space.sample()
        else:
            raise ValueError("Unsupported property")
        return {i: property_fnc(i) for i in agent_ids if i != "__all__"}

    def agent_observations(self, agent_ids):
        agent_ids = [agent_ids] if not hasattr(agent_ids, "__len__") else agent_ids
        return self._get_property("observations", agent_ids)

    def agent_rewards(self, agent_ids):
        agent_ids = [agent_ids] if not hasattr(agent_ids, "__len__") else agent_ids
        return self._get_property("rewards", agent_ids)

    def agent_dones(self, agent_ids):
        agent_ids = [agent_ids] if not hasattr(agent_ids, "__len__") else agent_ids
        return self._get_property("dones", agent_ids)

    def _acting_agents(self, action):
        return [key for key in action.keys() if key != "__all__"]

    def step(self, action):
        self._step(action)
        acting_agent_ids = self._acting_agents(action)
        dones = self.agent_dones(acting_agent_ids)
        dones["__all__"] = self._are_agents_done(dones, acting_agent_ids)
        observations = self.agent_observations(acting_agent_ids)
        rewards = self.agent_rewards(acting_agent_ids)

        return observations, rewards, dones, self.info

    def render(self, *args, **kwargs):
        self._render(*args, **kwargs)

    def action_space_sample(self, agent_ids: list = None):
        agent_ids = self._agent_ids if agent_ids is None else agent_ids
        agent_ids = [agent_ids] if not hasattr(agent_ids, "__len__") else agent_ids
        return self._get_property("action_sample", agent_ids)

    def action_space_contains(self, actions: dict) -> bool:
        return all(
            self.action_space.contains(action)
            for key, action in actions.items()
            if key != "__all__"
        )

    def observation_space_sample(self, agent_ids: list = None):
        agent_ids = self._agent_ids if agent_ids is None else agent_ids
        agent_ids = [agent_ids] if not hasattr(agent_ids, "__len__") else agent_ids
        return self._get_property("observation_sample", agent_ids)

    def observation_space_contains(self, obs: dict) -> bool:
        return all(self.observation_space.contains(ob) for ob in obs.values())
```
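
For reference, this is how I expect the base class to behave in isolation. A throwaway toy subclass (all names here are made up for the example, not my real env) returns dictionaries keyed by agent ID from both `reset()` and `step()`:

```python
# Throwaway toy subclass, only to illustrate the dict-of-dicts contract of
# MultiAgentBaseEnv (ToyEnv and its spaces are made up for this example).
from gym import spaces


class ToyEnv(MultiAgentBaseEnv):
    def _initial_conditions(self):
        self.agents = {0: None, 1: None, 2: None}
        self.observation_space = spaces.Box(low=-1, high=1, shape=(2,))
        self.action_space = spaces.Box(low=-1, high=1, shape=(1,))
        self.info = {}

    def _get_agent_observation(self, agent_id):
        return self.observation_space.sample()

    def _get_agent_reward(self, agent_id):
        return 0.0

    def _get_agent_done(self, agent_id):
        return False

    def _step(self, action):
        pass

    def _are_agents_done(self, dones, acting_agent_ids):
        return all(dones.values())


env = ToyEnv()
obs = env.reset()  # {0: array, 1: array, 2: array}
obs, rew, done, info = env.step(env.action_space_sample())
# rew == {0: 0.0, 1: 0.0, 2: 0.0}
# done == {0: False, 1: False, 2: False, "__all__": False}
```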

And this is the class that I use for training:
```python
import numpy as np
from gym import spaces
from ray.rllib.utils.spaces.repeated import Repeated


class SomeClassMulti(MultiAgentBaseEnv):

    def __init__(self, env_config: dict):

        # <dumping env config file> #
        # -------
        # -------
        # -------

        self.action_space = spaces.Dict(
            {
                "a": spaces.Box(low=-1, high=1, shape=(1,)),
                "b": spaces.Box(low=-1, high=1, shape=(1,)),
                "c": spaces.Discrete(self.n_mod),
            }
        )

        self.observation_space = spaces.Dict(
            {
                "action_history": Repeated(
                    spaces.Dict(
                        {
                            "a": spaces.Box(low=-1, high=1, shape=(1,)),
                            "b": spaces.Box(low=-1, high=1, shape=(1,)),
                            "c": spaces.Discrete(self.n_mod),
                        }
                    ),
                    max_len=self.N_max,
                ),
                "avg_neighbors_history": Repeated(
                    spaces.Dict(
                        {
                            "a": spaces.Box(low=-1, high=1, shape=(1,)),
                            "b": spaces.Box(low=-1, high=1, shape=(1,)),
                            "c": spaces.Box(low=0, high=self.n_mod, shape=(1,)),
                        }
                    ),
                    max_len=self.N_max,
                ),
            }
        )
        super().__init__()

    def _initial_conditions(self):
        self.agents = {
            i: {key: [] for key in self.observation_space.sample()}
            for i in range(self.n_agents)
        }
        self.things = {
            i: Something(self.ParamA, self.lt, params={"a": [], "b": [], "c": []})
            for i in range(self.n_agents)
        }
        self.info = {}
        self._curr_reward = None

    def _get_agent_observation(self, agent_id):
        return self.agents[agent_id]

    def _get_agent_reward(self, agent_id):
        return self._curr_reward

    def _get_agent_done(self, agent_id):
        return self.things[agent_id].n == self.N_max

    def _are_agents_done(self, dones, acting_agent_ids):
        duration_exceeded = (
            sum(self.things[agent_id].duration for agent_id in self.things.keys())
            >= self.MAX
        )
        return all(dones.values()) or duration_exceeded

    def _step(self, actions):
        scaling = self.n_agents - 1
        acting_agent_ids = self._acting_agents(actions)
        action_aggregator = {
            key: np.array([0.0], dtype=np.float32)
            for key in self.action_space.sample().keys()
        }

        for agent_id in acting_agent_ids:
            action = actions[agent_id]
            a = self._get_a(action["a"])
            if self.waveforms[agent_id].n_pulses == 0:
                a = 0
            b = self._get_b(action["b"])
            c = self._get_c(action["c"])
            self.things[agent_id].add(a, b, c)
            self.agents[agent_id]["action_history"].append(action)

            action_aggregator = {
                key: value + action[key] / scaling
                for key, value in action_aggregator.items()
            }

        for agent_id in acting_agent_ids:
            action = actions[agent_id]
            avg_neighbors_history = {
                key: value - action[key] / scaling
                for key, value in action_aggregator.items()
            }
            self.agents[agent_id]["avg_neighbors_history"].append(avg_neighbors_history)

        dones = self.agent_dones(acting_agent_ids)
        episode_done = self._are_agents_done(dones, [])
        if episode_done:
            target = self.target()
            self._curr_reward = target
        else:
            self._curr_reward = 0

    def _get_a(self, x):
        return float(self.a_min + (self.a_max - self.a_min) * (x + 1) / 2)

    def _get_b(self, x):
        return float(self.b_min + (self.b_max - self.b_min) * (x + 1) / 2)

    def _get_c(self, x):
        return x

    def target(self, targetMax):
        output = sum([sum(thing.params["a"]) for thing in self.things.values()])

    def _render(self, *args, **kwargs):
        raise ValueError("Not implemented")
```
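
One thing I have been doing to narrow this down is to drive the env standalone (outside RLlib) with more than two agents and assert that the per-step dictionaries stay key-consistent. This is only a sketch; `env_config` below stands in for my real config dict:

```python
# Standalone smoke test (sketch): step the env with >2 agents and check that
# obs / rewards / dones always carry exactly the same agent keys.
# `env_config` is a stand-in for my real config dict.
env = SomeClassMulti(env_config)

obs = env.reset()
dones = {"__all__": False}
while not dones["__all__"]:
    # Sample actions only for the agents that are currently active.
    actions = env.action_space_sample(list(obs.keys()))
    obs, rewards, dones, info = env.step(actions)

    active = set(actions.keys())
    assert set(obs.keys()) == active
    assert set(rewards.keys()) == active
    assert set(dones.keys()) - {"__all__"} == active
    assert env.observation_space_contains(obs)
```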

And this is part of how I run the environment:

```python
import time

from ray.rllib.agents.ppo import DEFAULT_CONFIG, PPOTrainer

config = {
    "train_batch_size": 256,        # 4000
    "rollout_fragment_length": 42,  # 200
    "lr": 0.001,                    # 5e-5
    "env": env_to_use,
    "env_config": env_config,
    "model": {
        "fcnet_hiddens": [32, 16],
        "fcnet_activation": "relu",
    },
    "num_workers": 8,
    "framework": "torch",
    "log_level": "ERROR",
    "disable_env_checking": False,
}

ppo_config = DEFAULT_CONFIG.copy()
ppo_config.update(config)
trainer = PPOTrainer(config=ppo_config, env=env_to_use)

num_epochs = 100

for run_index in range(num_epochs):
    print(f"Starting run {run_index}…")
    if run_index % 10 == 0:
        path = trainer.save()
    start = time.time()
    result = trainer.train()
    end = time.time()
    print(f"Training time: {end - start} [s]")
```
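
One thing I am not sure about, and maybe worth mentioning: I am not setting a `multiagent` block in the config, so as far as I understand all agents fall back to the single default policy. If that matters, I believe a shared-policy setup would look roughly like this (sketch, names illustrative, and the exact signatures depend on the Ray version):

```python
# Sketch of a shared-policy "multiagent" entry that would go inside `config`
# above (illustrative; the exact PolicySpec / policy_mapping_fn signatures
# depend on the Ray version being used).
from ray.rllib.policy.policy import PolicySpec

multiagent_block = {
    "multiagent": {
        "policies": {
            # Spaces left unset so RLlib infers them from the env.
            "shared_policy": PolicySpec(),
        },
        "policy_mapping_fn": lambda agent_id, *args, **kwargs: "shared_policy",
    },
}
```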

I think the code is incomplete; I can't find `self.n_agents`.
Could you please paste a complete, well-formatted script so that someone else can simply copy, paste, and run it?