I am trying to implement a MAPPO multi-agent boid (flocking) environment using Gym and Ray RLlib. Whenever I run the training file, it fails with:
(PPO pid=24516) NotImplementedError: Unsupported args: Box(-5.0, 5.0, (2,), float32) None [repeated 4x across cluster]
(RolloutWorker pid=22872) Exception raised in creation task: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=22872, ip=127.0.0.1, actor_id=a88ff9d62147059f0922a61b01000000, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x000001B3C8AEE960>)
I have tried several different approaches. The action and observation spaces are meant to be defined from a single agent's perspective (each agent observes its own position and velocity).
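For quick reference, these are the per-agent spaces the environment defines (copied from the constructor in the environment code below):

import numpy as np
from gym import spaces

# Per-agent action: 2D acceleration command
action_space = spaces.Box(low=np.array([-5, -5], dtype=np.float32),
                          high=np.array([5, 5], dtype=np.float32),
                          dtype=np.float32)
# Per-agent observation: [x, y, vx, vy]
observation_space = spaces.Box(low=np.array([-np.inf, -np.inf, -2.5, -2.5], dtype=np.float32),
                               high=np.array([np.inf, np.inf, 2.5, 2.5], dtype=np.float32),
                               dtype=np.float32)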
Environment:
import json
import numpy as np
from gym import spaces
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from Settings import *
class Agent:
def __init__(self, position):
self.position = np.array(position, dtype=float)
self.acceleration = np.zeros(2)
self.max_acceleration = SimulationVariables["AccelerationUpperLimit"]
self.velocity = np.round(np.random.uniform(-SimulationVariables["VelocityUpperLimit"], SimulationVariables["VelocityUpperLimit"], size=2), 2)
self.max_velocity = SimulationVariables["VelocityUpperLimit"]
def update(self, action):
self.acceleration += action
self.acceleration = np.clip(self.acceleration, -self.max_acceleration, self.max_acceleration)
self.velocity += self.acceleration * SimulationVariables["dt"]
vel = np.linalg.norm(self.velocity)
if vel > self.max_velocity:
self.velocity = (self.velocity / vel) * self.max_velocity
self.position += self.velocity * SimulationVariables["dt"]
return self.position, self.velocity
class FlockingEnv(MultiAgentEnv):
def __init__(self):
super(FlockingEnv, self).__init__()
self.episode=0
self.counter=1
self.CTDE=False
self.current_timestep = 0
self.reward_log = []
        # Keyed by agent_id so step() and reset() can index agents consistently
        self.agents = {f"agent_{i}": Agent(position) for i, position in enumerate(self.read_agent_locations())}
        # Action and observation space defined from a single agent's perspective
self.action_space = spaces.Box(low=np.array([-5, -5], dtype=np.float32),
high=np.array([5, 5], dtype=np.float32),
dtype=np.float32)
self.observation_space = spaces.Box(low=np.array([-np.inf, -np.inf, -2.5, -2.5], dtype=np.float32),
high=np.array([np.inf, np.inf, 2.5, 2.5], dtype=np.float32),
dtype=np.float32)
self.agents_ids = [f"agent_{i}" for i in range(len(self.agents))]
def step(self, action_dict):
observations = {}
rewards = {}
dones = {"__all__": False}
infos = {}
# Apply noisy actions and update each agent's state
for agent_id, action in action_dict.items():
# Apply noise to actions and update agent's state
noisy_action = action + np.random.normal(loc=0, scale=0.5, size=action.shape)
            action = np.clip(noisy_action, self.action_space.low, self.action_space.high)
# Update agent position and velocity
position, velocity = self.agents[agent_id].update(action)
# Update observation for each agent
observations[agent_id] = np.concatenate([position, velocity])
# Calculate reward
reward, out_of_flock = self.calculate_reward(self.agents[agent_id])
rewards[agent_id] = reward
# If there's a collision or agent is out of flock, mark the episode as done
if self.check_collision(self.agents[agent_id]) or out_of_flock:
dones["__all__"] = True # End episode if a collision occurs
# Reset environment for all agents
self.reset()
# Update the time step count
self.current_timestep += 1
return observations, rewards, dones, infos
def reset(self):
# Reset all agents
self.agents = {f"agent_{i}": Agent(position) for i, position in enumerate(self.read_agent_locations())}
# Initialize observations for each agent
observations = {}
for agent_id, agent in self.agents.items():
agent.acceleration = np.zeros(2)
agent.velocity = np.round(np.random.uniform(-SimulationVariables["VelocityUpperLimit"], SimulationVariables["VelocityUpperLimit"], size=2), 2)
observations[agent_id] = np.concatenate([agent.position, agent.velocity])
return observations
def close(self):
print("Environment is closed. Cleanup complete.")
def simulate_agents(self, actions):
        observations = np.array([])  # flat 1D array of per-agent observations
        actions_reshaped = actions.reshape((SimulationVariables["SimAgents"], 2))
        for i, agent in enumerate(self.agents.values()):
position, velocity = agent.update(actions_reshaped[i])
observation_pair = np.concatenate([position, velocity])
observations = np.concatenate([observations, observation_pair]) # Concatenate each pair directly
return observations
def check_collision(self, agent):
        for other in self.agents.values():
if agent != other:
distance = np.linalg.norm(agent.position - other.position)
if distance < SimulationVariables["SafetyRadius"]:
return True
return False
def get_observation(self):
observations = np.zeros((len(self.agents), 4), dtype=np.float32)
        for i, agent in enumerate(self.agents.values()):
observations[i] = [
agent.position[0],
agent.position[1],
agent.velocity[0],
agent.velocity[1]
]
        return observations  # shape: (num_agents, 4)
def get_closest_neighbors(self, agent):
neighbor_positions=[]
neighbor_velocities=[]
        for other in self.agents.values():
            if agent != other:
                distance = np.linalg.norm(other.position - agent.position)
                if self.CTDE:
################################################################
# if distance < SimulationVariables["NeighborhoodRadius"]:
# neighbor_positions.append(other.position)
# neighbor_velocities.append(other.velocity)
################################################################
neighbor_positions.append(other.position)
neighbor_velocities.append(other.velocity)
else:
neighbor_positions.append(other.position)
neighbor_velocities.append(other.velocity)
return neighbor_positions, neighbor_velocities
# def calculate_reward(self):
# reward=0
# Collisions={}
# neighbor_positions = [[] for _ in range(len(self.agents))]
# neighbor_velocities = [[] for _ in range(len(self.agents))]
# out_of_flock=False
# for idx, _ in enumerate(self.agents):
# Collisions[idx] = []
# for _, agent in enumerate(self.agents):
# neighbor_positions, neighbor_velocities=self.get_closest_neighbors(agent)
# val, out_of_flock=self.reward(agent, neighbor_velocities, neighbor_positions)
# reward+=val
# return reward, out_of_flock
def calculate_reward(self, agent):
reward = 0
out_of_flock = False
# Get closest neighbors for a single agent
neighbor_positions, neighbor_velocities = self.get_closest_neighbors(agent)
# Calculate reward for the single agent
reward, out_of_flock = self.reward(agent, neighbor_velocities, neighbor_positions)
return reward, out_of_flock
def reward(self, agent, neighbor_velocities, neighbor_positions):
CohesionReward = 0
AlignmentReward = 0
total_reward = 0
outofflock = False
midpoint = (SimulationVariables["SafetyRadius"] + SimulationVariables["NeighborhoodRadius"]) / 2
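        # Shaping (per neighbor): cohesion is -10 inside SafetyRadius, ramps
        # linearly from 0 up to +10 between SafetyRadius and the midpoint, then
        # back down to 0 at NeighborhoodRadius; alignment is +10 when the agent's
        # heading matches the mean neighbor heading and -10 when opposed.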
if len(neighbor_positions) > 0:
for neighbor_position in neighbor_positions:
distance = np.linalg.norm(agent.position - neighbor_position)
if distance <= SimulationVariables["SafetyRadius"]:
CohesionReward += -10
elif SimulationVariables["SafetyRadius"] < distance <= midpoint:
CohesionReward += (10 / (midpoint - SimulationVariables["SafetyRadius"])) * (distance - SimulationVariables["SafetyRadius"])
elif midpoint < distance <= SimulationVariables["NeighborhoodRadius"]:
CohesionReward += 10 - (10 / (SimulationVariables["NeighborhoodRadius"] - midpoint)) * (distance - midpoint)
average_velocity = np.mean(neighbor_velocities, axis=0)
dot_product = np.dot(average_velocity, agent.velocity)
norm_product = np.linalg.norm(average_velocity) * np.linalg.norm(agent.velocity)
if norm_product == 0:
cos_angle = 1.0
else:
cos_angle = dot_product / norm_product
cos_angle = np.clip(cos_angle, -1.0, 1.0)
orientation_diff = np.arccos(cos_angle)
alignment = (orientation_diff / np.pi)
AlignmentReward = -20 * alignment + 10
else:
CohesionReward -= 10
outofflock = True
total_reward = CohesionReward + AlignmentReward
return total_reward, outofflock
def read_agent_locations(self):
        File = rf"{Results['InitPositions']}{self.counter}\config.json"
with open(File, "r") as f:
data = json.load(f)
return data
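For completeness, a minimal standalone rollout of the environment outside RLlib (a sketch; it assumes the Settings module and the InitPositions JSON files referenced above are in place):

import numpy as np
from env import FlockingEnv

env = FlockingEnv()
obs = env.reset()                                  # dict: agent_id -> [x, y, vx, vy]
actions = {agent_id: env.action_space.sample() for agent_id in obs}
obs, rewards, dones, infos = env.step(actions)
print({k: v.shape for k, v in obs.items()}, rewards, dones["__all__"])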
Training File:
import ray
from ray import tune
from ray.rllib.algorithms.ppo import PPOConfig
from env import FlockingEnv
from Settings import *
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from collections import deque
import numpy as np
# Config and kwargs not being used
class LossCallback(DefaultCallbacks):
def __init__(self, loss_threshold=2000):
super().__init__()
self.loss_threshold = loss_threshold
def on_train_result(self, *, trainer, result, **kwargs):
# Assuming "policy_loss" or similar is in the result dict
# Adjust this key according to your setup
if 'policy_loss' in result['info']:
policy_loss = result['info']['policy_loss']
# Stopping if the average policy loss is below threshold
if policy_loss < self.loss_threshold:
print(f"Stopping training as loss ({policy_loss}) is below {self.loss_threshold}")
result['done'] = True # This will signal to stop the training
# Custom callback to handle replay buffer
class ReplayBufferCallback(DefaultCallbacks):
def __init__(self, replay_buffer):
super().__init__()
self.replay_buffer = replay_buffer
def on_postprocess_trajectory(self, *, worker, episode, agent_id, policies, postprocessed_batch, **kwargs):
# Extract data from the postprocessed batch
observations = postprocessed_batch["obs"]
actions = postprocessed_batch["actions"]
rewards = postprocessed_batch["rewards"]
next_observations = postprocessed_batch["new_obs"]
dones = postprocessed_batch["dones"]
for obs, action, reward, next_obs, done in zip(observations, actions, rewards, next_observations, dones):
self.replay_buffer.add(obs, action, reward, next_obs, done)
class ReplayBuffer:
def __init__(self, capacity, observation_space, action_space):
self.capacity = capacity
self.observation_space = observation_space
self.action_space = action_space
self.buffer = deque(maxlen=capacity)
self.size = 0
def add(self, observation, action, reward, next_observation, done):
self.buffer.append((observation, action, reward, next_observation, done))
self.size = min(self.size + 1, self.capacity)
def sample(self, batch_size):
indices = np.random.randint(0, self.size, size=batch_size)
batch = [self.buffer[i] for i in indices]
observations, actions, rewards, next_observations, dones = zip(*batch)
return {
'observations': np.array(observations),
'actions': np.array(actions),
'rewards': np.array(rewards),
'next_observations': np.array(next_observations),
'dones': np.array(dones)
}
def __len__(self):
return self.size
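# For reference, a minimal usage sketch of this buffer with dummy transitions
# (illustration only, not part of the training run):
#   buf = ReplayBuffer(capacity=10, observation_space=None, action_space=None)
#   obs = np.zeros(4, dtype=np.float32)
#   buf.add(obs, np.zeros(2, dtype=np.float32), 0.0, obs, False)
#   batch = buf.sample(batch_size=1)  # dict of arrays: observations, actions, rewards, ...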
ray.init()
# Create an instance of the environment to access spaces
env = FlockingEnv()
replay_buffer = ReplayBuffer(
capacity=100000, # Adjust capacity as needed
observation_space=env.observation_space,
action_space=env.action_space
)
# Register the environment if not already registered
env_name = "flocking_env"
tune.register_env(env_name, lambda config: FlockingEnv())
policy_ids = [f"agent_{i}" for i in range(3)] # Adjust the number based on the number of agents
config = PPOConfig() \
.environment(env_name) \
.framework("torch") \
.multi_agent(
policies={
policy_id: (
None,
env.observation_space,
env.action_space,
{
"model": {
"vf_share_layers": True,
"fcnet_hiddens": [512, 512, 512, 512, 512, 512, 512, 512],
"fcnet_activation": "tanh",
},
}
)
for policy_id in policy_ids
},
        policy_mapping_fn=lambda agent_id, *args, **kwargs: agent_id,
) \
.rollouts(num_rollout_workers=2) \
.training(
train_batch_size=2048,
sgd_minibatch_size=64,
num_sgd_iter=10
)
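# Note: policy_mapping_fn returns the agent_id itself, so agent_0/1/2 each get
# their own (non-shared) policy built from the single-agent spaces defined in the env.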
# Example of running the training
tune.run(
"PPO",
config=config.to_dict(), # Convert the config to a dictionary
stop={"timesteps_total": SimulationVariables["LearningTimeSteps"]}, # Train for 2 million timesteps
# callbacks=[ReplayBufferCallback(replay_buffer)] # Add custom callback
)
# Shutdown Ray
ray.shutdown()
# tune.run(
# PPO,
# config=config, # No need to convert to dict
# stop={"episode_reward_mean": 100}, # Adjust this based on your environment
# )
# tune.run(
# PPO,
# config=config,
# stop={"training_iteration": 100}, # Optional stop condition if loss threshold isn't hit
# callbacks=[LossCallback(loss_threshold=2000)]
# )
Stack trace:
Trial status: 1 ERROR
Current time: 2024-09-13 23:19:29. Total running time: 17s
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Trial name                     status
PPO_flocking_env_ad2bc_00000   ERROR

Number of errored trials: 1

Trial name                     # failures   error file
PPO_flocking_env_ad2bc_00000   1            C:/Users/Cr7th/AppData/Local/Temp/ray/session_2024-09-13_23-19-07_213600_12872/artifacts/2024-09-13_23-19-12/PPO_2024-09-13_23-19-12/driver_artifacts/PPO_flocking_env_ad2bc_00000_0_2024-09-13_23-19-12/error.txt

Traceback (most recent call last):
File "D:\MARL Flocking\Train.py", line 117, in <module>
tune.run(
File "C:\Python312\Lib\site-packages\ray\tune\tune.py", line 1035, in run
raise TuneError("Trials did not complete", incomplete_trials)
ray.tune.error.TuneError: ('Trials did not complete', [PPO_flocking_env_ad2bc_00000])
(PPO pid=24516) Exception raised in creation task: The actor died because of an error raised in its creation task, ray::PPO.__init__() (pid=24516, ip=127.0.0.1, actor_id=6981d82e454ec16aef59f8be01000000, repr=PPO)
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\env\env_runner_group.py", line 259, in _setup
(PPO pid=24516) self.add_workers(
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\env\env_runner_group.py", line 792, in add_workers
(PPO pid=24516) raise result.get()
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\utils\actor_manager.py", line 500, in _fetch_result
(PPO pid=24516) result = ray.get(ready)
(PPO pid=24516) ^^^^^^^^^^^^^^
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\_private\auto_init_hook.py", line 21, in auto_init_wrapper
(PPO pid=24516) return fn(*args, **kwargs)
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\_private\client_mode_hook.py", line 103, in wrapper
(PPO pid=24516) return func(*args, **kwargs)
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\_private\worker.py", line 2661, in get
(PPO pid=24516) values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\_private\worker.py", line 873, in get_objects
(PPO pid=24516) raise value
(PPO pid=24516) ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=22880, ip=127.0.0.1, actor_id=c5e76cb89068f11352c7d86c01000000, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x000002E29D31E6C0>)
(PPO pid=24516) TorchPolicyV2.__init__(
(PPO pid=24516)
(PPO pid=24516) During handling of the above exception, another exception occurred:
(PPO pid=24516)
(PPO pid=24516) ray::PPO.__init__() (pid=24516, ip=127.0.0.1, actor_id=6981d82e454ec16aef59f8be01000000, repr=PPO)
(PPO pid=24516) super().__init__(
(PPO pid=24516) self.setup(copy.deepcopy(self.config))
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\algorithms\algorithm.py", line 659, in setup
(PPO pid=24516) self.env_runner_group = EnvRunnerGroup(
(PPO pid=24516) ^^^^^^^^^^^^^^^
(PPO pid=24516) raise e.args[0].args[2]
(PPO pid=24516) File "python\ray\_raylet.pyx", line 1858, in ray._raylet.execute_task [repeated 6x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see Configuring Logging — Ray 3.0.0.dev0 for more options.)
(PPO pid=24516) File "python\ray\_raylet.pyx", line 1799, in ray._raylet.execute_task.function_executor [repeated 4x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\_private\function_manager.py", line 691, in actor_method_executor [repeated 4x across cluster]
(PPO pid=24516) return method(__ray_actor, *args, **kwargs) [repeated 4x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 4x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\util\tracing\tracing_helper.py", line 467, in _resume_span [repeated 11x across cluster]
(PPO pid=24516) return method(self, *_args, **_kwargs) [repeated 11x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 11x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\env\env_runner_group.py", line 211, in __init__ [repeated 12x across cluster]
(PPO pid=24516) self._update_policy_map(policy_dict=self.policy_dict) [repeated 3x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 1744, in _update_policy_map [repeated 3x across cluster]
(PPO pid=24516) self._build_policy_map( [repeated 3x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 1855, in _build_policy_map [repeated 3x across cluster]
(PPO pid=24516) new_policy = create_policy_for_framework( [repeated 3x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\utils\policy.py", line 108, in create_policy_for_framework [repeated 3x across cluster]
(PPO pid=24516) return policy_class(observation_space, action_space, merged_config) [repeated 3x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(PPO pid=24516) model, dist_class = self._init_model_and_dist_class() [repeated 3x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\policy\torch_policy_v2.py", line 512, in _init_model_and_dist_class [repeated 3x across cluster]
(PPO pid=24516) dist_class, logit_dim = ModelCatalog.get_action_dist( [repeated 3x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\models\catalog.py", line 457, in get_action_dist [repeated 3x across cluster]
(PPO pid=24516) raise NotImplementedError( [repeated 3x across cluster]
(PPO pid=24516) NotImplementedError: Unsupported args: Box(-5.0, 5.0, (2,), float32) None [repeated 4x across cluster]
(RolloutWorker pid=22872) Exception raised in creation task: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=22872, ip=127.0.0.1, actor_id=a88ff9d62147059f0922a61b01000000, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x000001B3C8AEE960>)