How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
Hi everyone,
I'm trying to implement a multi-agent environment with PPO.
When I run my implementation with only two agents, everything works (at least for the number of training rounds I tested). However, as soon as I use more than two agents, I get an error saying that my structures don't have the same number of elements, and the worker that hits it crashes.
I have the feeling I'm doing something wrong, but so far I haven't been able to figure out what it is.
I first thought it was because I wasn't returning the right dictionaries at every step, but as far as I can tell I am (only the active agents). I'm not sure whether this is a problem with my Repeated space or whether I'm missing something basic.
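To rule out mismatched dictionaries, I tried a quick manual check outside of RLlib (just a sketch, using the methods of the environment pasted below; `env` is an instance of it):

```python
# Manual sanity check (sketch): step the env by hand and assert that the
# per-step observation/reward/done dicts always carry the same agent ids.
obs = env.reset()
done = {"__all__": False}
while not done["__all__"]:
    # Only agents that are not done yet keep acting (my agent ids are ints).
    still_acting = [i for i in obs.keys() if not done.get(i, False)]
    actions = env.action_space_sample(still_acting)
    obs, rew, done, info = env.step(actions)
    agent_keys = set(done.keys()) - {"__all__"}
    assert set(obs.keys()) == set(rew.keys()) == agent_keys, (
        set(obs.keys()), set(rew.keys()), agent_keys)
```

If this check itself rests on a wrong assumption about what RLlib expects from `step()`, please let me know.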
Below is the part of the code I'm running that produces the error with more than two agents.
I'd appreciate any comments on it.
```python
from abc import ABC, abstractmethod

from ray.rllib.env.multi_agent_env import MultiAgentEnv


class MultiAgentBaseEnv(MultiAgentEnv, ABC):
    def __init__(self, *args, **kwargs):
        self._initial_conditions()
        self._agent_ids = set(self.agents)

    @abstractmethod
    def _initial_conditions(self):
        pass

    @abstractmethod
    def _get_agent_observation(self, agent_id):
        pass

    @abstractmethod
    def _get_agent_reward(self, agent_id):
        pass

    @abstractmethod
    def _get_agent_done(self, agent_id):
        pass

    @abstractmethod
    def _step(self, action):
        pass

    @abstractmethod
    def _are_agents_done(self, dones, acting_agent_ids):
        pass

    def _render(self, *args, **kwargs):
        pass

    def reset(self):
        self._initial_conditions()
        self._agent_ids = set(self.agents)
        return self.agent_observations(self._agent_ids)

    def _get_property(self, property_name: str, agent_ids: list = None):
        agent_ids = self._agent_ids if agent_ids is None else agent_ids
        if property_name == "observations":
            property_fnc = self._get_agent_observation
        elif property_name == "rewards":
            property_fnc = self._get_agent_reward
        elif property_name == "dones":
            property_fnc = self._get_agent_done
        elif property_name == "action_sample":
            property_fnc = lambda x: self.action_space.sample()
        elif property_name == "observation_sample":
            property_fnc = lambda x: self.observation_space.sample()
        else:
            raise ValueError("Unsupported property")
        return {i: property_fnc(i) for i in agent_ids if i != "__all__"}

    def agent_observations(self, agent_ids):
        agent_ids = [agent_ids] if not hasattr(agent_ids, "__len__") else agent_ids
        return self._get_property("observations", agent_ids)

    def agent_rewards(self, agent_ids):
        agent_ids = [agent_ids] if not hasattr(agent_ids, "__len__") else agent_ids
        return self._get_property("rewards", agent_ids)

    def agent_dones(self, agent_ids):
        agent_ids = [agent_ids] if not hasattr(agent_ids, "__len__") else agent_ids
        return self._get_property("dones", agent_ids)

    def _acting_agents(self, action):
        return [key for key in action.keys() if key != "__all__"]

    def step(self, action):
        self._step(action)
        acting_agent_ids = self._acting_agents(action)
        dones = self.agent_dones(acting_agent_ids)
        dones["__all__"] = self._are_agents_done(dones, acting_agent_ids)
        observations = self.agent_observations(acting_agent_ids)
        rewards = self.agent_rewards(acting_agent_ids)
        return observations, rewards, dones, self.info

    def render(self, *args, **kwargs):
        self._render(*args, **kwargs)

    def action_space_sample(self, agent_ids: list = None):
        agent_ids = self._agent_ids if agent_ids is None else agent_ids
        agent_ids = [agent_ids] if not hasattr(agent_ids, "__len__") else agent_ids
        return self._get_property("action_sample", agent_ids)

    def action_space_contains(self, actions: dict) -> bool:
        return all(
            self.action_space.contains(action)
            for key, action in actions.items()
            if key != "__all__"
        )

    def observation_space_sample(self, agent_ids: list = None):
        agent_ids = self._agent_ids if agent_ids is None else agent_ids
        agent_ids = [agent_ids] if not hasattr(agent_ids, "__len__") else agent_ids
        return self._get_property("observation_sample", agent_ids)

    def observation_space_contains(self, obs: dict) -> bool:
        return all(self.observation_space.contains(ob) for ob in obs.values())
```
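One detail I noticed while reviewing the base class: the `hasattr(agent_ids, "__len__")` check only does what I want because my agent ids are plain ints. If I ever switched to string ids, a single id would be iterated character by character instead of being wrapped in a list (hypothetical illustration, not my current setup):

```python
# Hypothetical: with a string agent id, hasattr(..., "__len__") is True,
# so the id is NOT wrapped in a list and gets iterated char by char.
agent_id = "agent_1"
wrapped = [agent_id] if not hasattr(agent_id, "__len__") else agent_id
print(list(wrapped))  # ['a', 'g', 'e', 'n', 't', '_', '1'] instead of ['agent_1']
```

I don't think this is related to the crash, since my ids are ints, but I mention it in case it matters.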
And this is the environment class I use for training:
```python
import numpy as np
from gym import spaces
from ray.rllib.utils.spaces.repeated import Repeated


class SomeClassMulti(MultiAgentBaseEnv):
    def __init__(self, env_config: dict):
        # < dumping env config file > #
        # -------
        # -------
        # -------
        self.action_space = spaces.Dict(
            {
                "a": spaces.Box(low=-1, high=1, shape=(1,)),
                "b": spaces.Box(low=-1, high=1, shape=(1,)),
                "c": spaces.Discrete(self.n_mod),
            }
        )
        self.observation_space = spaces.Dict(
            {
                "action_history": Repeated(
                    spaces.Dict(
                        {
                            "a": spaces.Box(low=-1, high=1, shape=(1,)),
                            "b": spaces.Box(low=-1, high=1, shape=(1,)),
                            "c": spaces.Discrete(self.n_mod),
                        }
                    ),
                    max_len=self.N_max,
                ),
                "avg_neighbors_history": Repeated(
                    spaces.Dict(
                        {
                            "a": spaces.Box(low=-1, high=1, shape=(1,)),
                            "b": spaces.Box(low=-1, high=1, shape=(1,)),
                            "c": spaces.Box(low=0, high=self.n_mod, shape=(1,)),
                        }
                    ),
                    max_len=self.N_max,
                ),
            }
        )
        super().__init__()

    def _initial_conditions(self):
        self.agents = {
            i: {key: [] for key in self.observation_space.sample()}
            for i in range(self.n_agents)
        }
        self.things = {
            i: Something(self.ParamA, self.lt, params={"a": [], "b": [], "c": []})
            for i in range(self.n_agents)
        }
        self.info = {}
        self._curr_reward = None

    def _get_agent_observation(self, agent_id):
        return self.agents[agent_id]

    def _get_agent_reward(self, agent_id):
        return self._curr_reward

    def _get_agent_done(self, agent_id):
        return self.things[agent_id].n == self.N_max

    def _are_agents_done(self, dones, acting_agent_ids):
        duration_exceeded = (
            sum(self.things[agent_id].duration for agent_id in self.things.keys())
            >= self.MAX
        )
        return all(dones.values()) or duration_exceeded

    def _step(self, actions):
        scaling = self.n_agents - 1
        acting_agent_ids = self._acting_agents(actions)
        action_aggregator = {
            key: np.array([0.0], dtype=np.float32)
            for key in self.action_space.sample().keys()
        }
        for agent_id in acting_agent_ids:
            action = actions[agent_id]
            a = self._get_a(action["a"])
            if self.waveforms[agent_id].n_pulses == 0:
                a = 0
            b = self._get_b(action["b"])
            c = self._get_c(action["c"])
            self.things[agent_id].add(a, b, c)
            self.agents[agent_id]["action_history"].append(action)
            action_aggregator = {
                key: value + action[key] / scaling
                for key, value in action_aggregator.items()
            }
        for agent_id in acting_agent_ids:
            action = actions[agent_id]
            avg_neighbors_history = {
                key: value - action[key] / scaling
                for key, value in action_aggregator.items()
            }
            self.agents[agent_id]["avg_neighbors_history"].append(avg_neighbors_history)
        dones = self.agent_dones(acting_agent_ids)
        episode_done = self._are_agents_done(dones, [])
        if episode_done:
            target = self.target()
            self._curr_reward = target
        else:
            self._curr_reward = 0

    def _get_a(self, x):
        return float(self.a_min + (self.a_max - self.a_min) * (x + 1) / 2)

    def _get_b(self, x):
        return float(self.b_min + (self.b_max - self.b_min) * (x + 1) / 2)

    def _get_c(self, x):
        return x

    def target(self):
        output = sum(sum(thing.params["a"]) for thing in self.things.values())
        return output

    def _render(self, *args, **kwargs):
        raise NotImplementedError
```
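Because the error complains about structures with different numbers of elements, I also started logging the lengths of the Repeated fields per agent after each step (sketch; `check_lengths` is just a debugging helper I wrote for this, not part of the env):

```python
def check_lengths(env):
    # Debugging helper (sketch): report the length of every Repeated field per
    # agent and make sure none of them exceeds max_len.
    lengths = {
        agent_id: {key: len(values) for key, values in obs.items()}
        for agent_id, obs in env.agents.items()
    }
    for agent_id, per_key in lengths.items():
        for key, n in per_key.items():
            assert n <= env.N_max, (agent_id, key, n)
    return lengths
```

I wondered whether, with more than two agents, the Repeated lists end up with different lengths across agents once some agents finish earlier, and whether that is what the batching trips over, but I haven't been able to confirm it.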
And this is part of how I run the training:
```python
import time

from ray.rllib.agents.ppo import PPOTrainer, DEFAULT_CONFIG

config = {
    "train_batch_size": 256,        # 4000
    "rollout_fragment_length": 42,  # 200
    "lr": 0.001,                    # 5e-5
    "env": env_to_use,
    "env_config": env_config,
    "model": {
        "fcnet_hiddens": [32, 16],
        "fcnet_activation": "relu",
    },
    "num_workers": 8,
    "framework": "torch",
    "log_level": "ERROR",
    "disable_env_checking": False,
}

ppo_config = DEFAULT_CONFIG.copy()
ppo_config.update(config)

trainer = PPOTrainer(config=ppo_config, env=env_to_use)

num_epochs = 100
for run_index in range(num_epochs):
    print(f"Starting run {run_index}...")
    if run_index % 10 == 0:
        path = trainer.save()
    start = time.time()
    result = trainer.train()
    end = time.time()
    print(f"Training time: {end - start} [s]")
```