Hello,
I’ve spent days trying to figure out what I’m doing wrong, with no success, and I need your help.
I’m trying to train an agent to play the game Quoridor. When I call train() with the code below, I get the assertion error shown underneath, raised by this line in RLlib: assert self.t == other.t_started.
I don’t know what I’m doing wrong, given that I’ve tried my best to follow the documentation for both Ray and PettingZoo.
Has anyone seen this error before? What needs to change? I suspect something in my environment, but I’m unable to figure it out.
The error
Traceback (most recent call last):
  File "F:\quo/src/Training/rllib_training.py", line 61, in <module>
    res = algo.train()
          ^^^^^^^^^^^^
  File "f:\quo.rllib_env\Lib\site-packages\ray\tune\trainable\trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "f:\quo.rllib_env\Lib\site-packages\ray\tune\trainable\trainable.py", line 328, in train
    result = self.step()
             ^^^^^^^^^^^
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\algorithms\algorithm.py", line 1035, in step
    train_results, train_iter_ctx = self._run_one_training_iteration()
                                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\algorithms\algorithm.py", line 3352, in _run_one_training_iteration
    training_step_return_value = self.training_step()
                                 ^^^^^^^^^^^^^^^^^^^^
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\algorithms\dqn\dqn.py", line 644, in training_step
    return self._training_step_new_api_stack()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\algorithms\dqn\dqn.py", line 666, in _training_step_new_api_stack
    self.local_replay_buffer.add(episodes)
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\utils\replay_buffers\prioritized_episode_buffer.py", line 314, in add
    existing_eps.concat_episode(eps)
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\env\multi_agent_episode.py", line 862, in concat_episode
    sa_episode.concat_episode(other.agent_episodes[agent_id])
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\env\single_agent_episode.py", line 614, in concat_episode
    assert self.t == other.t_started
           ^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError
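If I read the RLlib source correctly, concat_episode() stitches two chunks of the same ongoing episode back together inside the replay buffer, and the assertion requires the stored chunk to end exactly at the timestep where the incoming chunk starts. My rough mental model of the check (simplified, not RLlib's actual code):

class EpisodeChunk:
    def __init__(self, t_started, steps):
        self.t_started = t_started       # global timestep where this chunk begins
        self.t = t_started + len(steps)  # global timestep after the chunk's last step
        self.steps = list(steps)

    def concat(self, other):
        # The stored chunk must end exactly where the incoming chunk starts;
        # otherwise the episode timeline has a gap or an overlap.
        assert self.t == other.t_started
        self.steps += other.steps
        self.t = other.t

So my guess is that the episode chunks coming out of my env don't line up, but I can't see why.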
My training script
from CustomEnvironment.PettingZooQuoridorEnv import PettingZooQuoridorEnv
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.algorithms.dqn.dqn import DQNConfig
from ray.rllib.env import PettingZooEnv
from ray.rllib.examples.rl_modules.classes.random_rlm import RandomRLModule
from ray.tune.registry import register_env
from ray.rllib.models import ModelCatalog
from QuoridorCNN_Rllib import QuoridorCNN
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from CustomEnvironment.EnvironmentConstants import EnvironmentConstants
import ray
from ray.rllib.policy.policy import PolicySpec
def env_creator(config):
return PettingZooEnv(PettingZooQuoridorEnv(num_players=3))
def policy_mapping_fn(agent_id, *args, **kwargs):
if agent_id == "player_0":
return "learnable_policy"
return "random"
if __name__ == "__main__":
register_env("quoridor_multi", env_creator)
ModelCatalog.register_custom_model("quoridor_cnn", QuoridorCNN)
env = env_creator({})
agent_ids = list(env.observation_space.keys())
first_id = agent_ids[0]
spec = RLModuleSpec(
module_class=QuoridorCNN,
observation_space=env.observation_space[first_id],
action_space=env.action_space[first_id],
model_config={QuoridorCNN._NUM_PLAYER: 3,
QuoridorCNN._MAX_WALLS: 6}
)
ray.init(local_mode=True, include_dashboard=False, log_to_driver=True)
config = (DQNConfig()
.framework("torch")
.environment("quoridor_multi")
.multi_agent(
policies={"learnable_policy", "random"},
policy_mapping_fn=policy_mapping_fn,
policies_to_train=["learnable_policy"])
.debugging(log_level="DEBUG")
.rl_module(
rl_module_spec=MultiRLModuleSpec(rl_module_specs={
"learnable_policy": spec,
"random": RLModuleSpec(module_class=RandomRLModule)
}
)
)
)
algo = config.build()
try:
res = algo.train()
print(res)
except Exception:
import traceback
traceback.print_exc()
raise
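For what it's worth, this is the kind of quick smoke test I can run on the wrapped env before training, just to check that the per-agent dicts look reasonable (random actions only; I haven't drawn any conclusions from it yet):

# Quick smoke test of the wrapped env (random actions; separate from the training run).
test_env = env_creator({})
obs, infos = test_env.reset()
for _ in range(50):
    # The PettingZooEnv wrapper expects an action only for the agent(s) that just observed.
    actions = {aid: test_env.action_space[aid].sample() for aid in obs}
    obs, rewards, terms, truncs, infos = test_env.step(actions)
    if terms.get("__all__") or truncs.get("__all__"):
        obs, infos = test_env.reset()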
My environment (not totally finished, e.g. no action masks yet; I’m trying an end-to-end run before adding those, see the mask sketch after the environment code. It should work as is.)
from gymnasium.spaces import Discrete
from gymnasium.spaces import Dict
from gymnasium.utils import seeding
import numpy as np
from pettingzoo import AECEnv
import gymnasium as gym
import functools
from pettingzoo.utils.agent_selector import agent_selector
import heapq
from CustomEnvironment.EnvironmentConstants import EnvironmentConstants
class PettingZooQuoridorEnv(AECEnv):
metadata = {
"name": "petting_zoo_quoridor_env",
}
def __init__(self, num_players = 3):
self.board_size = 9
self.num_players = num_players
self.max_walls = 10 if num_players == 2 else 6
self.max_turns = 100
self.agents = ["player_" + str(r) for r in range(self.num_players)]
self.possible_agents = self.agents.copy()
self.wall_indexes = {}
index = 0
for row in range(1, self.board_size):
for col in range(0, self.board_size - 1):
self.wall_indexes[index] = (row, col, True)
index += 1
for row in range(0, self.board_size - 1):
for col in range(1, self.board_size):
self.wall_indexes[index] = (row, col, False)
index += 1
"""
Board channels:
0,1,2 = Pawns, 3 = horizontal walls, 4=vertical walls
Walls cover 2 cells. A wall coordinate represents the top-left corner cell.
Example:
* Horizontal wall at (r, c) covers (r, c) and (r, c + 1), and prevents
movement from (r, c) to (r - 1, c) or from (r, c + 1) to (r - 1, c + 1).
* Vertical wall at (r, c) covers (r, c) and (r + 1, c), and prevents
movement from (r, c) to (r, c - 1) or from (r + 1, c) to (r + 1, c - 1).
"""
self.obs_shape = (5, self.board_size, self.board_size)
self.reset()
def reset(self, seed=None, options=None):
# First ever reset with no seed: create RNG.
# Subsequent resets with no seed: keep continuity.
# Reset with seed: overwrite np_random to start a new deterministic sequence.
if seed is not None:
self.np_random, _ = seeding.np_random(seed)
elif not hasattr(self, "np_random"):
self.np_random, _ = seeding.np_random(None)
# Absolute game state
self.board = np.zeros(self.obs_shape, dtype=np.float32)
# From doc https://tianshou.org/en/stable/01_tutorials/00_dqn.html
np.random.seed(seed)
# Pawns coords
self.players_pos = {
# player_0 starts on the left edge
EnvironmentConstants._PLAYER_0: [self.np_random.integers(self.board_size), 0],
# player_1 starts on the right edge
EnvironmentConstants._PLAYER_1: [self.np_random.integers(self.board_size), self.board_size - 1],
}
# player_2 starts on the top edge
self.players_pos[EnvironmentConstants._PLAYER_2] = [0, self.np_random.integers(self.board_size)]
self.players_walls = {agent: self.max_walls for agent in self.agents}
self.rewards = {agent: 0 for agent in self.agents}
self._cumulative_rewards = {agent: 0 for agent in self.agents}
self.terminations = {agent: False for agent in self.agents}
self.truncations = {agent: False for agent in self.agents}
self.infos = {agent: {} for agent in self.agents}
self.horizontal_wall_starts = []
self.vertical_wall_starts = []
self._agent_selector = agent_selector(self.agents)
self.agent_selection = self._agent_selector.next()
self.num_moves = 0
self.turns_elapsed = 0
@functools.lru_cache(maxsize=None)
def observation_space(self, agent):
return gym.spaces.Dict({
EnvironmentConstants._OBSERVATION_BOARD: gym.spaces.Box(low=0, high=1, shape=self.obs_shape, dtype=np.float32),
EnvironmentConstants._OBSERVATION_WALLS: gym.spaces.Box(low=0, high=self.max_walls, shape=(len(self.agents),), dtype=np.float32),
EnvironmentConstants._OBSERVATION_PLAYERS: gym.spaces.Box(low=0, high=1, shape=(self.num_players,), dtype=np.float32)
})
def observe(self, agent):
"""
Observe should return the observation of the specified agent. This function
should return a sane observation (though not necessarily the most up to date possible)
at any time after reset() is called.
"""
# todo avoid a copy each time
board_copy = self.board.copy()
row, col = self.players_pos[EnvironmentConstants._PLAYER_0]
board_copy[EnvironmentConstants._PLAYER_0_POSITION_CHANNEL, row, col] = 1
row, col = self.players_pos[EnvironmentConstants._PLAYER_1]
board_copy[EnvironmentConstants._PLAYER_1_POSITION_CHANNEL, row, col] = 1
row, col = self.players_pos[EnvironmentConstants._PLAYER_2]
board_copy[EnvironmentConstants._PLAYER_2_POSITION_CHANNEL, row, col] = 1
one_hot = np.zeros(self.num_players, dtype=np.float32)
if agent in self.agents:
one_hot[self.agents.index(agent)] = 1
return {
EnvironmentConstants._OBSERVATION_BOARD: board_copy,
EnvironmentConstants._OBSERVATION_WALLS: np.array([self.players_walls[a] for a in self.possible_agents], dtype=np.float32),
EnvironmentConstants._OBSERVATION_PLAYERS: one_hot
}
@functools.lru_cache(maxsize=None)
def action_space(self, agent):
# Action space:
# 0-3: move (left, right, up, down)
# 4+: place wall (index encoding position + orientation)
# Walls have a size of 2
# Horizontal walls can't be put on row 0 nor in the last column
# Vertical walls can't be put on col 0 nor on the last row
return gym.spaces.Discrete(4 + (self.board_size - 1) ** 2 + (self.board_size - 1) ** 2)
def step(self, action):
agent = self.agent_selection
if (self.terminations[agent] or self.truncations[agent]):
# handles stepping an agent which is already dead
# accepts a None action for the one agent, and moves the agent_selection to
# the next dead agent, or if there are no more dead agents, to the next live agent
self._was_dead_step(action)
return
# the agent which stepped last had its _cumulative_rewards accounted for
# (because it was returned by last()), so the _cumulative_rewards for this
# agent should start again at 0
self._cumulative_rewards[agent] = 0
# Give rewards
if self._agent_selector.is_last():
# rewards for all agents are placed in the .rewards dictionary
if self.players_pos[EnvironmentConstants._PLAYER_0][1] == self.board_size - 1:
self.rewards[EnvironmentConstants._PLAYER_0] += EnvironmentConstants._WIN_GAME_REWARD
self.terminations[EnvironmentConstants._PLAYER_0] = True
if self.players_pos[EnvironmentConstants._PLAYER_1][1] == 0:
self.rewards[EnvironmentConstants._PLAYER_1] += EnvironmentConstants._WIN_GAME_REWARD
self.terminations[EnvironmentConstants._PLAYER_1] = True
if (self.num_players == 3
and self.players_pos[EnvironmentConstants._PLAYER_2][0] == self.board_size - 1):
self.rewards[EnvironmentConstants._PLAYER_2] += EnvironmentConstants._WIN_GAME_REWARD
self.terminations[EnvironmentConstants._PLAYER_2] = True
self.num_moves += 1
# The truncations dictionary must be updated for all players.
self.truncations = {
agent: self.num_moves >= EnvironmentConstants._MAX_TURNS for agent in self.agents
}
# Apply action (movement or wall placement)
if action < 4:
self._move_pawn(agent, action)
# else:
# self._place_wall(agent, action - 4)
self.rewards[agent] += EnvironmentConstants._DEFAULT_STEP_REWARD
self._accumulate_rewards()
if not all(self.terminations[a] or self.truncations[a] for a in self.agents):
self.agent_selection = self._agent_selector.next()
def render(self):
pass
def close(self):
"""
Close should release any graphical displays, subprocesses, network connections
or any other environment data which should not be kept around after the
user is no longer using the environment.
"""
pass
def seed(self, seed):
# https://tianshou.org/en/stable/01_tutorials/00_dqn.html
self.np_random, _ = seeding.np_random(seed)
def _move_pawn(self, agent, action):
"""
Move pawn in direction if allowed.
action: 0=LEFT, 1=RIGHT, 2=UP, 3=DOWN (relative to rotated view)
Invalid moves are penalized and terminate the agent (see _invalid_action).
"""
r, c = self.players_pos[agent]
nr, nc = r, c
# Compute new position based on action
if action == EnvironmentConstants._ACTION_MOVE_LEFT:
nc = c - 1
elif action == EnvironmentConstants._ACTION_MOVE_RIGHT:
nc = c + 1
elif action == EnvironmentConstants._ACTION_MOVE_UP:
nr = r - 1
elif action == EnvironmentConstants._ACTION_MOVE_DOWN:
nr = r + 1
else:
raise Exception("Invalid action number")
if not self._is_move_valid(r, c, nr, nc, action):
self._invalid_action(agent)
return
# Move is valid: update position
self.players_pos[agent] = [nr, nc]
def _place_wall(self, agent, wall_idx):
if self.players_walls[agent] == 0:
self._invalid_action(agent)
return
row, col, is_horizontal = self.wall_indexes[wall_idx]
# horizontal wall
if (is_horizontal):
if (row == 0
or col == self.board_size - 1
or self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col] == 1
or (col + 1 < self.board_size and self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col + 1] == 1)
or ((row - 1, col + 1) in self.vertical_wall_starts)):
self._invalid_action(agent)
return
self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col] = 1
if col + 1 < self.board_size:
self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col + 1] = 1
self.players_walls[agent] -= 1
self.horizontal_wall_starts.append((row, col))
# vertical wall
else:
if (col == 0
or row == self.board_size - 1
or self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row, col] == 1
or (row + 1 < self.board_size and self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row + 1, col] == 1)
or ((row + 1, col - 1) in self.horizontal_wall_starts)): # crossing walls
self._invalid_action(agent)
return
self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row, col] = 1
if row + 1 < self.board_size:
self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row + 1, col] = 1
self.players_walls[agent] -= 1
self.vertical_wall_starts.append((row, col))
for player in self.agents:
if not self._is_path_available(player):
self._invalid_action(agent)
# Refund
self.players_walls[agent] += 1
if is_horizontal:
self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col] = 0
self.horizontal_wall_starts.remove((row, col))
if col + 1 < self.board_size:
self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col + 1] = 0
else:
self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row, col] = 0
self.vertical_wall_starts.remove((row, col))
if row + 1 < self.board_size:
self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row + 1, col] = 0
return
def _invalid_action(self, agent):
self.rewards[agent] += EnvironmentConstants._INVALID_ACTION_REWARD
self.terminations[agent] = True
# moving onto another pawn is OK
def _is_move_valid(self, r, c, nr, nc, action):
# Check board bounds
if not (0 <= nr < self.board_size and 0 <= nc < self.board_size):
return False
# Check walls blocking movement:
if action == EnvironmentConstants._ACTION_MOVE_LEFT:
if self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, r, c] == 1:
return False
elif action == EnvironmentConstants._ACTION_MOVE_RIGHT:
if c + 1 < self.board_size:
if self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, r, c + 1] == 1:
return False
elif action == EnvironmentConstants._ACTION_MOVE_UP:
if self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, r, c] == 1:
return False
elif action == EnvironmentConstants._ACTION_MOVE_DOWN:
if r + 1 < self.board_size:
if self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, r + 1, c] == 1:
return False
return True
def _is_path_available(self, agent):
"""
Check if there is a valid path for the given agent to their goal using A*.
"""
start = tuple(self.players_pos[agent]) # Starting position of the agent
goal_row, goal_col = None, None
# Define the goal based on the agent
if agent == EnvironmentConstants._PLAYER_0: # Goal is the right edge
goal_col = self.board_size - 1
elif agent == EnvironmentConstants._PLAYER_1: # Goal is the left edge
goal_col = 0
elif agent == EnvironmentConstants._PLAYER_2: # Goal is the bottom edge
goal_row = self.board_size - 1
# Priority queue for A* (min-heap)
priority_queue = []
heapq.heappush(priority_queue, (0, start)) # (priority, position)
# Cost to reach each position
g_cost = {start: 0}
# Visited nodes
visited = set()
# Directions: LEFT, RIGHT, UP, DOWN
directions = {
EnvironmentConstants._ACTION_MOVE_UP : (-1, 0),
EnvironmentConstants._ACTION_MOVE_DOWN : (1, 0),
EnvironmentConstants._ACTION_MOVE_LEFT : (0, -1),
EnvironmentConstants._ACTION_MOVE_RIGHT : (0, 1)
}
while priority_queue:
_, current = heapq.heappop(priority_queue)
r, c = current
# If the goal is reached
if goal_col is not None and c == goal_col:
return True
if goal_row is not None and r == goal_row:
return True
if current in visited:
continue
visited.add(current)
# Explore neighbors
for action, (dr, dc) in directions.items():
nr, nc = r + dr, c + dc
if not self._is_move_valid(r, c, nr, nc, action):
continue
# Calculate costs
new_cost = g_cost[current] + 1 # Cost to move to the neighbor
neighbor = (nr, nc)
if neighbor not in g_cost or new_cost < g_cost[neighbor]:
g_cost[neighbor] = new_cost
# Heuristic: Manhattan distance to the goal
h_cost = 0
if goal_row is not None and goal_col is not None:
h_cost = abs(nr - goal_row) + abs(nc - goal_col)
elif goal_row is None:
h_cost = abs(nc - goal_col)
else:
h_cost = abs(nr - goal_row)
priority = new_cost + h_cost
heapq.heappush(priority_queue, (priority, neighbor))
# If the queue is empty and no path is found
return False
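For reference, the action mask I plan to add later would be built roughly like this (a hypothetical standalone helper, not part of the environment above; it only masks the four pawn moves and leaves all wall actions enabled for now):

import numpy as np
from CustomEnvironment.EnvironmentConstants import EnvironmentConstants

def pawn_move_mask(env, agent):
    # Hypothetical helper: 1 = action currently legal, 0 = illegal.
    # Wall placements (actions >= 4) are left enabled; they would get their own pass later.
    mask = np.ones(env.action_space(agent).n, dtype=np.int8)
    r, c = env.players_pos[agent]
    deltas = {
        EnvironmentConstants._ACTION_MOVE_LEFT: (0, -1),
        EnvironmentConstants._ACTION_MOVE_RIGHT: (0, 1),
        EnvironmentConstants._ACTION_MOVE_UP: (-1, 0),
        EnvironmentConstants._ACTION_MOVE_DOWN: (1, 0),
    }
    for action, (dr, dc) in deltas.items():
        if not env._is_move_valid(r, c, r + dr, c + dc, action):
            mask[action] = 0
    return mask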
My constants file
class EnvironmentConstants:
_INVALID_ACTION_REWARD = -20
_WIN_GAME_REWARD = 10
_DEFAULT_STEP_REWARD = -1
_ACTION_MOVE_LEFT = 0
_ACTION_MOVE_RIGHT = 1
_ACTION_MOVE_UP = 2
_ACTION_MOVE_DOWN = 3
_PLAYER_0_POSITION_CHANNEL = 0
_PLAYER_1_POSITION_CHANNEL = 1
_PLAYER_2_POSITION_CHANNEL = 2
_HORIZONTAL_WALLS_CHANNEL = 3
_VERTICAL_WALLS_CHANNEL = 4
_PLAYER_0 = "player_0"
_PLAYER_1 = "player_1"
_PLAYER_2 = "player_2"
_OBSERVATION_BOARD = "board"
_OBSERVATION_PLAYERS = "players"
_OBSERVATION_WALLS = "walls_remaining"
_MAX_TURNS = 100
And my neural network (if it helps)
import torch
import torch.nn as nn
from ray.rllib.core.rl_module.torch.torch_rl_module import TorchRLModule
from ray.rllib.core.columns import Columns
from CustomEnvironment.EnvironmentConstants import EnvironmentConstants
# See https://docs.ray.io/en/latest/rllib/rl-modules.html
class QuoridorCNN(TorchRLModule):
_NUM_PLAYER = "num_player"
_MAX_WALLS = "max_walls"
def setup(self):
super().setup()
self.num_players = self.model_config[self._NUM_PLAYER]
self.max_walls = self.model_config[self._MAX_WALLS]
self.conv = nn.Sequential(
nn.Conv2d(self.observation_space[EnvironmentConstants._OBSERVATION_BOARD].shape[0], 32, 3, padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, 3, padding=1),
nn.ReLU(),
nn.Conv2d(64, 128, 3, padding=1),
nn.ReLU()
)
# Global pooling to reduce spatial dims
self.pool = nn.Sequential(
nn.MaxPool2d(2),
nn.Flatten()
)
scalar_dim = self.num_players * 2  # one-hot player id + walls remaining, one value per player
self.fc = nn.Sequential(
nn.Linear(128 * 4 * 4 + scalar_dim, 256),
nn.ReLU(),
nn.Linear(256, self.action_space.n)
)
def _forward(self, batch, **kwargs):
obs = batch[Columns.OBS]
# device = next(self.parameters()).device
device = "cpu"
board = obs[EnvironmentConstants._OBSERVATION_BOARD]
board = torch.as_tensor(board, dtype=torch.float32, device=device)
x = self.conv(board) # 128x9x9
x = self.pool(x)
players = obs[EnvironmentConstants._OBSERVATION_PLAYERS]
players = torch.as_tensor(players, dtype=torch.float32, device=device)
walls = obs[EnvironmentConstants._OBSERVATION_WALLS]
walls = torch.as_tensor(walls, dtype=torch.float32, device=device)
walls_norm = walls / float(self.max_walls)
scalars = torch.cat([players, walls_norm], dim=1)
h = torch.cat([x, scalars], dim=1)
q = self.fc(h)
return {
Columns.ACTION_DIST_INPUTS: q
}
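In case it helps with debugging, I also plan to run PettingZoo's built-in API test against the raw env (I haven't confirmed yet that it passes):

from pettingzoo.test import api_test
from CustomEnvironment.PettingZooQuoridorEnv import PettingZooQuoridorEnv

api_test(PettingZooQuoridorEnv(num_players=3), num_cycles=1000, verbose_progress=False)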