NotImplementedError: Unsupported args

I am trying to implement a MAPPO multi-agent boid environment using Gym and Ray RLlib.

Whenever I run the training file, it fails with the following error:
(PPO pid=24516) NotImplementedError: Unsupported args: Box(-5.0, 5.0, (2,), float32) None [repeated 4x across cluster]
(RolloutWorker pid=22872) Exception raised in creation task: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=22872, ip=127.0.0.1, actor_id=a88ff9d62147059f0922a61b01000000, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x000001B3C8AEE960>)

I have tried multiple different approaches. The action and observation spaces are meant to be defined from a single agent's perspective.
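For reference, constructing the environment on its own works, and the per-agent spaces look as intended (a minimal check, assuming Settings and the JSON position files are in place):

from env import FlockingEnv

env = FlockingEnv()
print(env.action_space)            # Box(-5.0, 5.0, (2,), float32)
print(env.observation_space)       # a (4,) Box with bounds [-inf, -inf, -2.5, -2.5] / [inf, inf, 2.5, 2.5]
print(env.action_space.sample())   # a random 2D acceleration, e.g. [ 1.2 -4.6]

The failure only appears once RLlib starts building the policies inside the rollout workers.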

Environment:

import json
import numpy as np
from gym import spaces
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from Settings import *

class Agent:
    def __init__(self, position):
        self.position = np.array(position, dtype=float)
        self.acceleration = np.zeros(2)
        self.max_acceleration = SimulationVariables["AccelerationUpperLimit"]
        self.velocity = np.round(np.random.uniform(-SimulationVariables["VelocityUpperLimit"], SimulationVariables["VelocityUpperLimit"], size=2), 2)
        self.max_velocity = SimulationVariables["VelocityUpperLimit"]

    def update(self, action):
        self.acceleration += action
        self.acceleration = np.clip(self.acceleration, -self.max_acceleration, self.max_acceleration)
        self.velocity += self.acceleration * SimulationVariables["dt"]
        vel = np.linalg.norm(self.velocity)
        if vel > self.max_velocity:
            self.velocity = (self.velocity / vel) * self.max_velocity
        self.position += self.velocity * SimulationVariables["dt"]
        return self.position, self.velocity

class FlockingEnv(MultiAgentEnv):
    def __init__(self):
        super(FlockingEnv, self).__init__()
        self.episode=0
        self.counter=1
        self.CTDE=False
        self.current_timestep = 0
        self.reward_log = []

        # Keep agents in a dict keyed by agent id so that step() and reset() index them consistently
        self.agents = {f"agent_{i}": Agent(position) for i, position in enumerate(self.read_agent_locations())}

        # Define the single-agent (per-agent) action and observation spaces
        self.action_space = spaces.Box(low=np.array([-5, -5], dtype=np.float32),
                                       high=np.array([5, 5], dtype=np.float32),
                                       dtype=np.float32)

        self.observation_space = spaces.Box(low=np.array([-np.inf, -np.inf, -2.5, -2.5], dtype=np.float32),
                                            high=np.array([np.inf, np.inf, 2.5, 2.5], dtype=np.float32),
                                            dtype=np.float32)
        
        self.agents_ids = [f"agent_{i}" for i in range(len(self.agents))]


    def step(self, action_dict):
        observations = {}
        rewards = {}
        dones = {"__all__": False}
        infos = {}

        # Apply noisy actions and update each agent's state
        for agent_id, action in action_dict.items():
            # Apply noise to actions and update agent's state
            noisy_action = action + np.random.normal(loc=0, scale=0.5, size=action.shape)
            action = np.clip(noisy_action, self.action_space.low, self.action_space.high)

            # Update agent position and velocity
            position, velocity = self.agents[agent_id].update(action)

            # Update observation for each agent
            observations[agent_id] = np.concatenate([position, velocity])

            # Calculate reward
            reward, out_of_flock = self.calculate_reward(self.agents[agent_id])
            rewards[agent_id] = reward

            # End the episode if there is a collision or an agent leaves the flock
            if self.check_collision(self.agents[agent_id]) or out_of_flock:
                dones["__all__"] = True
                # Reset environment for all agents
                self.reset()

        # Update the time step count
        self.current_timestep += 1

        return observations, rewards, dones, infos

    def reset(self):
        # Reset all agents
        self.agents = {f"agent_{i}": Agent(position) for i, position in enumerate(self.read_agent_locations())}
        
        # Initialize observations for each agent
        observations = {}
        for agent_id, agent in self.agents.items():
            agent.acceleration = np.zeros(2)
            agent.velocity = np.round(np.random.uniform(-SimulationVariables["VelocityUpperLimit"], SimulationVariables["VelocityUpperLimit"], size=2), 2)
            observations[agent_id] = np.concatenate([agent.position, agent.velocity])

        return observations

    def close(self):
        print("Environment is closed. Cleanup complete.")
        
    def simulate_agents(self, actions):
        observations = []  # Will be flattened into a 1D array of per-agent [position, velocity] pairs

        actions_reshaped = actions.reshape((SimulationVariables["SimAgents"], 2))

        for i, agent in enumerate(self.agents.values()):
            position, velocity = agent.update(actions_reshaped[i])
            observation_pair = np.concatenate([position, velocity])
            observations = np.concatenate([observations, observation_pair])

        return observations
    
    def check_collision(self, agent):

        for other in self.agents.values():
            if agent is not other:
                distance = np.linalg.norm(agent.position - other.position)
                if distance < SimulationVariables["SafetyRadius"]:
                    return True

        return False

    def get_observation(self):
        observations = np.zeros((len(self.agents), 4), dtype=np.float32)

        for i, agent in enumerate(self.agents.values()):
            observations[i] = [
                agent.position[0],
                agent.position[1],
                agent.velocity[0],
                agent.velocity[1]
            ]

        # One row of [x, y, vx, vy] per agent
        return observations
   
    def get_closest_neighbors(self, agent):

        neighbor_positions = []
        neighbor_velocities = []

        for other in self.agents.values():
            if agent is not other:
                distance = np.linalg.norm(other.position - agent.position)

                if self.CTDE:
                    ################################################################
                    # if distance < SimulationVariables["NeighborhoodRadius"]:
                    #    neighbor_positions.append(other.position)
                    #    neighbor_velocities.append(other.velocity)
                    ################################################################
                    neighbor_positions.append(other.position)
                    neighbor_velocities.append(other.velocity)

                else:
                    neighbor_positions.append(other.position)
                    neighbor_velocities.append(other.velocity)

        return neighbor_positions, neighbor_velocities
   
    # def calculate_reward(self):
    #     reward=0
    #     Collisions={}

    #     neighbor_positions = [[] for _ in range(len(self.agents))] 
    #     neighbor_velocities = [[] for _ in range(len(self.agents))]
    #     out_of_flock=False

    #     for idx, _ in enumerate(self.agents):
    #         Collisions[idx] = []

    #     for _, agent in enumerate(self.agents): 
    #         neighbor_positions, neighbor_velocities=self.get_closest_neighbors(agent)
    #         val, out_of_flock=self.reward(agent, neighbor_velocities, neighbor_positions)
    #         reward+=val

    #     return reward, out_of_flock

    def calculate_reward(self, agent):
        reward = 0
        out_of_flock = False

        # Get closest neighbors for a single agent
        neighbor_positions, neighbor_velocities = self.get_closest_neighbors(agent)

        # Calculate reward for the single agent
        reward, out_of_flock = self.reward(agent, neighbor_velocities, neighbor_positions)

        return reward, out_of_flock

    def reward(self, agent, neighbor_velocities, neighbor_positions):
        CohesionReward = 0
        AlignmentReward = 0
        total_reward = 0
        outofflock = False
        midpoint = (SimulationVariables["SafetyRadius"] + SimulationVariables["NeighborhoodRadius"]) / 2

        if len(neighbor_positions) > 0:
            for neighbor_position in neighbor_positions:
                distance = np.linalg.norm(agent.position - neighbor_position)

                if distance <= SimulationVariables["SafetyRadius"]:
                    CohesionReward += -10
                elif SimulationVariables["SafetyRadius"] < distance <= midpoint:
                    CohesionReward += (10 / (midpoint - SimulationVariables["SafetyRadius"])) * (distance - SimulationVariables["SafetyRadius"])
                elif midpoint < distance <= SimulationVariables["NeighborhoodRadius"]:
                    CohesionReward += 10 - (10 / (SimulationVariables["NeighborhoodRadius"] - midpoint)) * (distance - midpoint)

            # Alignment depends only on the average neighbor velocity, so compute it once
            average_velocity = np.mean(neighbor_velocities, axis=0)
            dot_product = np.dot(average_velocity, agent.velocity)
            norm_product = np.linalg.norm(average_velocity) * np.linalg.norm(agent.velocity)

            if norm_product == 0:
                cos_angle = 1.0
            else:
                cos_angle = dot_product / norm_product

            cos_angle = np.clip(cos_angle, -1.0, 1.0)
            orientation_diff = np.arccos(cos_angle)

            alignment = orientation_diff / np.pi
            AlignmentReward = -20 * alignment + 10

        else:
            CohesionReward -= 10
            outofflock = True

        total_reward = CohesionReward + AlignmentReward

        return total_reward, outofflock

    def read_agent_locations(self):

        File = rf"{Results['InitPositions']}{self.counter}\config.json"
        with open(File, "r") as f:
            data = json.load(f)

        return data

Training File:

import ray
from ray import tune
from ray.rllib.algorithms.ppo import PPOConfig
from env import FlockingEnv
from Settings import *
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from collections import deque
import numpy as np
# Note: LossCallback, ReplayBufferCallback and ReplayBuffer below are defined but not currently wired into the PPO config

class LossCallback(DefaultCallbacks):
    def __init__(self, loss_threshold=2000):
        super().__init__()
        self.loss_threshold = loss_threshold

    def on_train_result(self, *, trainer, result, **kwargs):
        # Assuming "policy_loss" or similar is in the result dict
        # Adjust this key according to your setup
        if 'policy_loss' in result['info']:
            policy_loss = result['info']['policy_loss']
            # Stopping if the average policy loss is below threshold
            if policy_loss < self.loss_threshold:
                print(f"Stopping training as loss ({policy_loss}) is below {self.loss_threshold}")
                result['done'] = True  # This will signal to stop the training

# Custom callback to handle replay buffer
class ReplayBufferCallback(DefaultCallbacks):
    def __init__(self, replay_buffer):
        super().__init__()
        self.replay_buffer = replay_buffer

    def on_postprocess_trajectory(self, *, worker, episode, agent_id, policies, postprocessed_batch, **kwargs):
        # Extract data from the postprocessed batch
        observations = postprocessed_batch["obs"]
        actions = postprocessed_batch["actions"]
        rewards = postprocessed_batch["rewards"]
        next_observations = postprocessed_batch["new_obs"]
        dones = postprocessed_batch["dones"]

        for obs, action, reward, next_obs, done in zip(observations, actions, rewards, next_observations, dones):
            self.replay_buffer.add(obs, action, reward, next_obs, done)

class ReplayBuffer:
    def __init__(self, capacity, observation_space, action_space):
        self.capacity = capacity
        self.observation_space = observation_space
        self.action_space = action_space
        self.buffer = deque(maxlen=capacity)
        self.size = 0

    def add(self, observation, action, reward, next_observation, done):
        self.buffer.append((observation, action, reward, next_observation, done))
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size):
        indices = np.random.randint(0, self.size, size=batch_size)
        batch = [self.buffer[i] for i in indices]
        observations, actions, rewards, next_observations, dones = zip(*batch)

        return {
            'observations': np.array(observations),
            'actions': np.array(actions),
            'rewards': np.array(rewards),
            'next_observations': np.array(next_observations),
            'dones': np.array(dones)
        }

    def __len__(self):
        return self.size

ray.init()

# Create an instance of the environment to access spaces
env = FlockingEnv()

replay_buffer = ReplayBuffer(
    capacity=100000,  # Adjust capacity as needed
    observation_space=env.observation_space,
    action_space=env.action_space
)

# Register the environment if not already registered
env_name = "flocking_env"
tune.register_env(env_name, lambda config: FlockingEnv())

policy_ids = [f"agent_{i}" for i in range(3)]  # Adjust the number based on the number of agents

config = PPOConfig() \
    .environment(env_name) \
    .framework("torch") \
    .multi_agent(
        policies={
            policy_id: (
                None,
                env.observation_space,
                env.action_space,
                {
                    "model": {
                        "vf_share_layers": True,
                        "fcnet_hiddens": [512, 512, 512, 512, 512, 512, 512, 512],
                        "fcnet_activation": "tanh",
                    },
                }
            )
            for policy_id in policy_ids
        },
        policy_mapping_fn=lambda agent_id, *args, **kwargs: agent_id,
    ) \
    .rollouts(num_rollout_workers=2) \
    .training(
        train_batch_size=2048,
        sgd_minibatch_size=64,
        num_sgd_iter=10
    )

# Example of running the training
tune.run(
    "PPO",
    config=config.to_dict(),  # Convert the config to a dictionary
    stop={"timesteps_total": SimulationVariables["LearningTimeSteps"]},  # Train for 2 million timesteps
    # callbacks=[ReplayBufferCallback(replay_buffer)]  # Add custom callback
)

# Shutdown Ray
ray.shutdown()

# tune.run(
#     PPO,
#     config=config,  # No need to convert to dict
#     stop={"episode_reward_mean": 100},  # Adjust this based on your environment
# )

# tune.run(
#     PPO,
#     config=config,
#     stop={"training_iteration": 100},  # Optional stop condition if loss threshold isn't hit
#     callbacks=[LossCallback(loss_threshold=2000)]
# )

Stack trace:

Trial status: 1 ERROR
Current time: 2024-09-13 23:19:29. Total running time: 17s
Logical resource usage: 0/12 CPUs, 0/1 GPUs (0.0/1.0 accelerator_type:G)
Trial name                       status
PPO_flocking_env_ad2bc_00000     ERROR

Number of errored trials: 1
Trial name                       # failures   error file
PPO_flocking_env_ad2bc_00000     1            C:/Users/Cr7th/AppData/Local/Temp/ray/session_2024-09-13_23-19-07_213600_12872/artifacts/2024-09-13_23-19-12/PPO_2024-09-13_23-19-12/driver_artifacts/PPO_flocking_env_ad2bc_00000_0_2024-09-13_23-19-12/error.txt

Traceback (most recent call last):
  File "D:\MARL Flocking\Train.py", line 117, in <module>
    tune.run(
  File "C:\Python312\Lib\site-packages\ray\tune\tune.py", line 1035, in run
    raise TuneError("Trials did not complete", incomplete_trials)
ray.tune.error.TuneError: ('Trials did not complete', [PPO_flocking_env_ad2bc_00000])
(PPO pid=24516) Exception raised in creation task: The actor died because of an error raised in its creation task, ray::PPO.__init__() (pid=24516, ip=127.0.0.1, actor_id=6981d82e454ec16aef59f8be01000000, repr=PPO)
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\env\env_runner_group.py", line 259, in _setup
(PPO pid=24516) self.add_workers(
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\env\env_runner_group.py", line 792, in add_workers
(PPO pid=24516) raise result.get()
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\utils\actor_manager.py", line 500, in _fetch_result
(PPO pid=24516) result = ray.get(ready)
(PPO pid=24516) ^^^^^^^^^^^^^^
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\_private\auto_init_hook.py", line 21, in auto_init_wrapper
(PPO pid=24516) return fn(*args, **kwargs)
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\_private\client_mode_hook.py", line 103, in wrapper
(PPO pid=24516) return func(*args, **kwargs)
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\_private\worker.py", line 2661, in get
(PPO pid=24516) values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\_private\worker.py", line 873, in get_objects
(PPO pid=24516) raise value
(PPO pid=24516) ray.exceptions.ActorDiedError: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=22880, ip=127.0.0.1, actor_id=c5e76cb89068f11352c7d86c01000000, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x000002E29D31E6C0>)
(PPO pid=24516) TorchPolicyV2.__init__(
(PPO pid=24516)
(PPO pid=24516) During handling of the above exception, another exception occurred:
(PPO pid=24516)
(PPO pid=24516) ray::PPO.__init__() (pid=24516, ip=127.0.0.1, actor_id=6981d82e454ec16aef59f8be01000000, repr=PPO)
(PPO pid=24516) super().__init__(
(PPO pid=24516) self.setup(copy.deepcopy(self.config))
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\algorithms\algorithm.py", line 659, in setup
(PPO pid=24516) self.env_runner_group = EnvRunnerGroup(
(PPO pid=24516) ^^^^^^^^^^^^^^^
(PPO pid=24516) raise e.args[0].args[2]
(PPO pid=24516) File "python\ray\_raylet.pyx", line 1858, in ray._raylet.execute_task [repeated 6x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see Configuring Logging — Ray 3.0.0.dev0 for more options.)
(PPO pid=24516) File "python\ray\_raylet.pyx", line 1799, in ray._raylet.execute_task.function_executor [repeated 4x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\_private\function_manager.py", line 691, in actor_method_executor [repeated 4x across cluster]
(PPO pid=24516) return method(__ray_actor, *args, **kwargs) [repeated 4x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 4x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\util\tracing\tracing_helper.py", line 467, in _resume_span [repeated 11x across cluster]
(PPO pid=24516) return method(self, *_args, **_kwargs) [repeated 11x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 11x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\env\env_runner_group.py", line 211, in __init__ [repeated 12x across cluster]
(PPO pid=24516) self._update_policy_map(policy_dict=self.policy_dict) [repeated 3x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 1744, in _update_policy_map [repeated 3x across cluster]
(PPO pid=24516) self._build_policy_map( [repeated 3x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\evaluation\rollout_worker.py", line 1855, in _build_policy_map [repeated 3x across cluster]
(PPO pid=24516) new_policy = create_policy_for_framework( [repeated 3x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\utils\policy.py", line 108, in create_policy_for_framework [repeated 3x across cluster]
(PPO pid=24516) return policy_class(observation_space, action_space, merged_config) [repeated 3x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(PPO pid=24516) model, dist_class = self._init_model_and_dist_class() [repeated 3x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\policy\torch_policy_v2.py", line 512, in _init_model_and_dist_class [repeated 3x across cluster]
(PPO pid=24516) dist_class, logit_dim = ModelCatalog.get_action_dist( [repeated 3x across cluster]
(PPO pid=24516) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 3x across cluster]
(PPO pid=24516) File "C:\Python312\Lib\site-packages\ray\rllib\models\catalog.py", line 457, in get_action_dist [repeated 3x across cluster]
(PPO pid=24516) raise NotImplementedError( [repeated 3x across cluster]
(PPO pid=24516) NotImplementedError: Unsupported args: Box(-5.0, 5.0, (2,), float32) None [repeated 4x across cluster]
(RolloutWorker pid=22872) Exception raised in creation task: The actor died because of an error raised in its creation task, ray::RolloutWorker.__init__() (pid=22872, ip=127.0.0.1, actor_id=a88ff9d62147059f0922a61b01000000, repr=<ray.rllib.evaluation.rollout_worker.RolloutWorker object at 0x000001B3C8AEE960>)