self.t == other.t_started training error

Hello,
I’ve spent days trying to figure out what I’m doing wrong, with no success. I need your help :sweat_smile:

I’m trying to train an agent to play the Quoridor game. When I call the train() method with the code below, I get the assertion error shown further down, raised by this assertion in RLlib (assert self.t == other.t_started).

I don’t know what I’m doing wrong; I’ve done my best to follow both the Ray/RLlib documentation and the PettingZoo documentation when writing my environment.

Has anyone seen this error before? What needs to change? I suspect something in the env, but I haven’t been able to figure out what.
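
In case it helps to reproduce this, here is a minimal standalone check of the env outside of RLlib: just the standard AEC loop from the PettingZoo docs plus their api_test helper (the num_cycles value below is arbitrary).

from pettingzoo.test import api_test
from CustomEnvironment.PettingZooQuoridorEnv import PettingZooQuoridorEnv

env = PettingZooQuoridorEnv(num_players=3)

# PettingZoo's built-in API conformance test.
api_test(env, num_cycles=100, verbose_progress=False)

# Standard AEC rollout with random actions.
env.reset(seed=42)
for agent in env.agent_iter():
    observation, reward, termination, truncation, info = env.last()
    action = None if termination or truncation else env.action_space(agent).sample()
    env.step(action)
env.close()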

The error

Traceback (most recent call last):
  File "F:\quo/src/Training/rllib_training.py", line 61, in <module>
    res = algo.train()
          ^^^^^^^^^^^^
  File "f:\quo.rllib_env\Lib\site-packages\ray\tune\trainable\trainable.py", line 331, in train
    raise skipped from exception_cause(skipped)
  File "f:\quo.rllib_env\Lib\site-packages\ray\tune\trainable\trainable.py", line 328, in train
    result = self.step()
             ^^^^^^^^^^^
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\algorithms\algorithm.py", line 1035, in step
    train_results, train_iter_ctx = self._run_one_training_iteration()
                                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\algorithms\algorithm.py", line 3352, in _run_one_training_iteration
    training_step_return_value = self.training_step()
                                 ^^^^^^^^^^^^^^^^^^^^
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\algorithms\dqn\dqn.py", line 644, in training_step
    return self._training_step_new_api_stack()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\algorithms\dqn\dqn.py", line 666, in _training_step_new_api_stack
    self.local_replay_buffer.add(episodes)
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\utils\replay_buffers\prioritized_episode_buffer.py", line 314, in add
    existing_eps.concat_episode(eps)
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\env\multi_agent_episode.py", line 862, in concat_episode
    sa_episode.concat_episode(other.agent_episodes[agent_id])
  File "f:\quo.rllib_env\Lib\site-packages\ray\rllib\env\single_agent_episode.py", line 614, in concat_episode
    assert self.t == other.t_started
           ^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError

My training script

from CustomEnvironment.PettingZooQuoridorEnv import PettingZooQuoridorEnv
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.algorithms.dqn.dqn import DQNConfig
from ray.rllib.env import PettingZooEnv
from ray.rllib.examples.rl_modules.classes.random_rlm import RandomRLModule
from ray.tune.registry import register_env
from ray.rllib.models import ModelCatalog
from QuoridorCNN_Rllib import QuoridorCNN
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from CustomEnvironment.EnvironmentConstants import EnvironmentConstants
import ray
from ray.rllib.policy.policy import PolicySpec

def env_creator(config):
    return PettingZooEnv(PettingZooQuoridorEnv(num_players=3))

def policy_mapping_fn(agent_id, *args, **kwargs):
    if agent_id == "player_0":
        return "learnable_policy"
    
    return "random"

if __name__ == "__main__":
    register_env("quoridor_multi", env_creator)
    ModelCatalog.register_custom_model("quoridor_cnn", QuoridorCNN)

    env = env_creator({})
    agent_ids = list(env.observation_space.keys())
    first_id = agent_ids[0]

    spec = RLModuleSpec(
        module_class=QuoridorCNN,
        observation_space=env.observation_space[first_id],
        action_space=env.action_space[first_id],
        model_config={QuoridorCNN._NUM_PLAYER: 3,
                      QuoridorCNN._MAX_WALLS: 6}
    )

    ray.init(local_mode=True, include_dashboard=False, log_to_driver=True)

    config = (
        DQNConfig()
        .framework("torch")
        .environment("quoridor_multi")
        .multi_agent(
            policies={"learnable_policy", "random"},
            policy_mapping_fn=policy_mapping_fn,
            policies_to_train=["learnable_policy"],
        )
        .debugging(log_level="DEBUG")
        .rl_module(
            rl_module_spec=MultiRLModuleSpec(
                rl_module_specs={
                    "learnable_policy": spec,
                    "random": RLModuleSpec(module_class=RandomRLModule),
                }
            )
        )
    )

    algo = config.build()
    try:
        res = algo.train()
        print(res)
    except Exception:
        import traceback
        traceback.print_exc()
        raise

My environment (not totally finished, e.g. no action masks yet, but I’m trying an end-to-end run before improving it; it should work as is)

from gymnasium.spaces import Discrete
from gymnasium.spaces import Dict
from gymnasium.utils import seeding
import numpy as np
from pettingzoo import AECEnv
import gymnasium as gym
import functools
from pettingzoo.utils.agent_selector import agent_selector
import heapq
from CustomEnvironment.EnvironmentConstants import EnvironmentConstants

class PettingZooQuoridorEnv(AECEnv):
    metadata = {
        "name": "petting_zoo_quoridor_env",
    }

    def __init__(self, num_players = 3):
        self.board_size = 9
        self.num_players = num_players
        self.max_walls = 10 if num_players == 2 else 6
        self.max_turns = 100
        self.agents = ["player_" + str(r) for r in range(self.num_players)]
        self.possible_agents = self.agents.copy()

        self.wall_indexes = {}

        index = 0
        for row in range(1, self.board_size):
            for col in range(0, self.board_size - 1):
                self.wall_indexes[index] = (row, col, True)
                index += 1

        for row in range(0, self.board_size - 1):
            for col in range(1, self.board_size):
                self.wall_indexes[index] = (row, col, False)
                index += 1
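
        # Indexes 0-63 are horizontal walls, 64-127 are vertical walls;
        # wall actions are offset by the 4 move actions (wall_idx = action - 4).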

        """
        Board channels:
        0,1,2 = Pawns, 3 = horizontal walls, 4=vertical walls
        Walls cover 2 cells. a wall coordinate represents the top left corner cell
        Example:
         * Horizontal wall at (r, c) covers (r, c) and (r, c + 1), and prevents 
         movement from (r, c) to (r - 1, c) or from (r, c + 1) to (r - 1, c + 1).
         * Vertical wall at (r, c) covers (r, c) and (r + 1, c), and prevents
         movement from (r, c) to (r, c - 1) or from (r + 1, c) to (r + 1, c - 1).
        """
        self.obs_shape = (5, self.board_size, self.board_size)


        self.reset()
                
    def reset(self, seed=None, options=None):
        # First ever reset with no seed: create RNG.
        # Subsequent resets with no seed: keep continuity.
        # Reset with seed: overwrite np_random to start a new deterministic sequence.
        if seed is not None:
            self.np_random, _ = seeding.np_random(seed)
        elif not hasattr(self, "np_random"):
            self.np_random, _ = seeding.np_random(None)    

        # Absolute game state
        self.board = np.zeros(self.obs_shape, dtype=np.float32)
        # From doc https://tianshou.org/en/stable/01_tutorials/00_dqn.html
        np.random.seed(seed)

        # Pawns coords
        self.players_pos = {
            # player 1: left
            EnvironmentConstants._PLAYER_0: [self.np_random.integers(self.board_size), 0],
            # player 2: right
            EnvironmentConstants._PLAYER_1: [self.np_random.integers(self.board_size), self.board_size - 1],  
        }          
            
        # Player 3: top
        self.players_pos[EnvironmentConstants._PLAYER_2] = [0, self.np_random.integers(self.board_size)]

        self.players_walls = {agent: self.max_walls for agent in self.agents}

        self.rewards = {agent: 0 for agent in self.agents}
        self._cumulative_rewards = {agent: 0 for agent in self.agents}
        self.terminations = {agent: False for agent in self.agents}
        self.truncations = {agent: False for agent in self.agents}
        self.infos = {agent: {} for agent in self.agents}

        self.horizontal_wall_starts = []
        self.vertical_wall_starts = []

        self._agent_selector = agent_selector(self.agents)
        self.agent_selection = self._agent_selector.next()
        
        self.num_moves = 0
        self.turns_elapsed = 0

    @functools.lru_cache(maxsize=None)
    def observation_space(self, agent):
        return gym.spaces.Dict({
            EnvironmentConstants._OBSERVATION_BOARD: gym.spaces.Box(low=0, high=1, shape=self.obs_shape, dtype=np.float32),
            EnvironmentConstants._OBSERVATION_WALLS: gym.spaces.Box(low=0, high=self.max_walls, shape=(len(self.agents),), dtype=np.float32),
            EnvironmentConstants._OBSERVATION_PLAYERS: gym.spaces.Box(low=0, high=1, shape=(self.num_players,), dtype=np.float32)
        })
        
    def observe(self, agent):
        """
        Observe should return the observation of the specified agent. This function
        should return a sane observation (though not necessarily the most up to date possible)
        at any time after reset() is called.
        """

        # todo avoid a copy each time
        board_copy = self.board.copy()

        row, col = self.players_pos[EnvironmentConstants._PLAYER_0]
        board_copy[EnvironmentConstants._PLAYER_0_POSITION_CHANNEL, row, col] = 1

        row, col = self.players_pos[EnvironmentConstants._PLAYER_1]
        board_copy[EnvironmentConstants._PLAYER_1_POSITION_CHANNEL, row, col] = 1

        row, col = self.players_pos[EnvironmentConstants._PLAYER_2]
        board_copy[EnvironmentConstants._PLAYER_2_POSITION_CHANNEL, row, col] = 1

        one_hot = np.zeros(self.num_players, dtype=np.float32)
        
        if agent in self.agents:
            one_hot[self.agents.index(agent)] = 1

        return {
            EnvironmentConstants._OBSERVATION_BOARD: board_copy,
            EnvironmentConstants._OBSERVATION_WALLS: np.array([self.players_walls[a] for a in self.possible_agents], dtype=np.float32),
            EnvironmentConstants._OBSERVATION_PLAYERS: one_hot
        }

    @functools.lru_cache(maxsize=None)
    def action_space(self, agent):
        # Action space:
        # 0-3: move (left, right, up, down)
        # 4+: place wall (index encoding position + orientation)
        # Walls have a size of 2
        # Horizontal walls can't be put on row 0 nor in the last column
        # Vertical walls can't be put on col 0 nor on the last row
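        # For board_size = 9 this is 4 + 64 + 64 = 132 discrete actions.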
        return gym.spaces.Discrete(4 + (self.board_size - 1) ** 2 + (self.board_size - 1) ** 2)

    def step(self, action):
        agent = self.agent_selection
        
        if (self.terminations[agent] or self.truncations[agent]):
            # handles stepping an agent which is already dead
            # accepts a None action for the one agent, and moves the agent_selection to
            # the next dead agent,  or if there are no more dead agents, to the next live agent
            self._was_dead_step(action)
            return
        
        # the agent which stepped last had its _cumulative_rewards accounted for
        # (because it was returned by last()), so the _cumulative_rewards for this
        # agent should start again at 0
        self._cumulative_rewards[agent] = 0

        # Give rewards        
        if self._agent_selector.is_last():
            # rewards for all agents are placed in the .rewards dictionary
            if self.players_pos[EnvironmentConstants._PLAYER_0][1] == self.board_size - 1:
                self.rewards[EnvironmentConstants._PLAYER_0] += EnvironmentConstants._WIN_GAME_REWARD
                self.terminations[EnvironmentConstants._PLAYER_0] = True

            if self.players_pos[EnvironmentConstants._PLAYER_1][1] == 0:
                self.rewards[EnvironmentConstants._PLAYER_1] += EnvironmentConstants._WIN_GAME_REWARD
                self.terminations[EnvironmentConstants._PLAYER_1] = True

            if (self.num_players == 3
                and self.players_pos[EnvironmentConstants._PLAYER_2][0] == self.board_size - 1):
                self.rewards[EnvironmentConstants._PLAYER_2] += EnvironmentConstants._WIN_GAME_REWARD
                self.terminations[EnvironmentConstants._PLAYER_2] = True

            self.num_moves += 1
            # The truncations dictionary must be updated for all players.
            self.truncations = {
                agent: self.num_moves >= EnvironmentConstants._MAX_TURNS for agent in self.agents
            }

        # Apply action (movement or wall placement)
        if action < 4:
            self._move_pawn(agent, action)
        # else:
        #     self._place_wall(agent, action - 4)
        
        self.rewards[agent] += EnvironmentConstants._DEFAULT_STEP_REWARD

        self._accumulate_rewards()

        if not all(self.terminations[a] or self.truncations[a] for a in self.agents):
            self.agent_selection = self._agent_selector.next()

    def render(self):
        pass

    def close(self):
        """
        Close should release any graphical displays, subprocesses, network connections
        or any other environment data which should not be kept around after the
        user is no longer using the environment.
        """
        pass

    def seed(self, seed):
        # https://tianshou.org/en/stable/01_tutorials/00_dqn.html
        self.np_random, _ = seeding.np_random(seed)

    def _move_pawn(self, agent, action):
        """
        Move pawn in direction if allowed.
        action: 0=LEFT, 1=RIGHT, 2=UP, 3=DOWN (relative to rotated view)
        Return: (reward, done)
        """

        r, c = self.players_pos[agent]
        nr, nc = r, c

        # Compute new position based on action
        if action == EnvironmentConstants._ACTION_MOVE_LEFT:
            nc = c - 1
        elif action == EnvironmentConstants._ACTION_MOVE_RIGHT:
            nc = c + 1
        elif action == EnvironmentConstants._ACTION_MOVE_UP:
            nr = r - 1
        elif action == EnvironmentConstants._ACTION_MOVE_DOWN:
            nr = r + 1
        else:
            raise Exception("Invalid action number")

        if not self._is_move_valid(r, c, nr, nc, action):
            self._invalid_action(agent)
            return

        # Move is valid: update position
        self.players_pos[agent] = [nr, nc]

    def _place_wall(self, agent, wall_idx):
        if self.players_walls[agent] == 0:
            self._invalid_action(agent)
            return

        row, col, is_horizontal = self.wall_indexes[wall_idx]

        # horizontal wall
        if (is_horizontal):
            if (row == 0
                or col == self.board_size - 1
                or self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col] == 1
                or (col + 1 < self.board_size and self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col + 1] == 1)
                or ((row - 1, col + 1) in self.vertical_wall_starts)):
                self._invalid_action(agent)
                return

            self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col] = 1

            if col + 1 < self.board_size:
                self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col + 1] = 1

            self.players_walls[agent] -= 1
            self.horizontal_wall_starts.append((row, col))

        # vertical wall
        else:
            if (col == 0
                or row == self.board_size - 1
                or self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row, col] == 1
                or (row + 1 < self.board_size and self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row + 1, col] == 1)
                or ((row + 1, col - 1) in self.horizontal_wall_starts)): # crossing walls
                self._invalid_action(agent)
                return

            self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row, col] = 1

            if row + 1 < self.board_size:
                self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row + 1, col] = 1

            self.players_walls[agent] -= 1
            self.vertical_wall_starts.append((row, col))

        for player in self.agents:
            if not self._is_path_available(player):
                self._invalid_action(agent)

                # Refund
                self.players_walls[agent] += 1

                if is_horizontal:
                    self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col] = 0
                    self.horizontal_wall_starts.remove((row, col))

                    if col + 1 < self.board_size:
                        self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, row, col + 1] = 0
                else:
                    self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row, col] = 0
                    self.vertical_wall_starts.remove((row, col))

                    if row + 1 < self.board_size:
                        self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, row + 1, col] = 0

                return

    def _invalid_action(self, agent):
        self.rewards[agent] += EnvironmentConstants._INVALID_ACTION_REWARD
        self.terminations[agent] = True

    # moving onto another pawn is OK
    def _is_move_valid(self, r, c, nr, nc, action):
        # Check board bounds
        if not (0 <= nr < self.board_size and 0 <= nc < self.board_size):
            return False

        # Check walls blocking movement:
        if action == EnvironmentConstants._ACTION_MOVE_LEFT:
            if self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, r, c] == 1:
                return False

        elif action == EnvironmentConstants._ACTION_MOVE_RIGHT:
            if c + 1 < self.board_size:
                if self.board[EnvironmentConstants._VERTICAL_WALLS_CHANNEL, r, c + 1] == 1:
                    return False
        
        elif action == EnvironmentConstants._ACTION_MOVE_UP:
            if self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, r, c] == 1:
                return False

        elif action == EnvironmentConstants._ACTION_MOVE_DOWN:
            if r + 1 < self.board_size:
                if self.board[EnvironmentConstants._HORIZONTAL_WALLS_CHANNEL, r + 1, c] == 1:
                    return False
        
        return True
    
    def _is_path_available(self, agent):
        """
        Check if there is a valid path for the given agent to their goal using A*.
        """
        start = tuple(self.players_pos[agent])  # Starting position of the agent
        goal_row, goal_col = None, None

        # Define the goal based on the agent
        if agent == EnvironmentConstants._PLAYER_0:  # Goal is the right edge
            goal_col = self.board_size - 1
        elif agent == EnvironmentConstants._PLAYER_1:  # Goal is the left edge
            goal_col = 0
        elif agent == EnvironmentConstants._PLAYER_2:  # Goal is the bottom edge
            goal_row = self.board_size - 1

        # Priority queue for A* (min-heap)
        priority_queue = []
        heapq.heappush(priority_queue, (0, start))  # (priority, position)

        # Cost to reach each position
        g_cost = {start: 0}

        # Visited nodes
        visited = set()

        # Directions: LEFT, RIGHT, UP, DOWN
        directions = {
            EnvironmentConstants._ACTION_MOVE_UP : (-1, 0),
            EnvironmentConstants._ACTION_MOVE_DOWN : (1, 0),
            EnvironmentConstants._ACTION_MOVE_LEFT : (0, -1),
            EnvironmentConstants._ACTION_MOVE_RIGHT : (0, 1)
        }

        while priority_queue:
            _, current = heapq.heappop(priority_queue)
            r, c = current

            # If the goal is reached
            if goal_col is not None and c == goal_col:
                return True
            if goal_row is not None and r == goal_row:
                return True

            if current in visited:
                continue
            visited.add(current)

            # Explore neighbors
            for action, (dr, dc) in directions.items():
                nr, nc = r + dr, c + dc
                
                if not self._is_move_valid(r, c, nr, nc, action):
                    continue

                # Calculate costs
                new_cost = g_cost[current] + 1  # Cost to move to the neighbor
                neighbor = (nr, nc)

                if neighbor not in g_cost or new_cost < g_cost[neighbor]:
                    g_cost[neighbor] = new_cost
                    # Heuristic: Manhattan distance to the goal
                    h_cost = 0

                    if goal_row is not None and goal_col is not None:
                        h_cost = abs(nr - goal_row) + abs(nc - goal_col)
                    elif goal_row is None:
                        h_cost = abs(nc - goal_col)
                    else:
                        h_cost = abs(nr - goal_row)

                    priority = new_cost + h_cost
                    heapq.heappush(priority_queue, (priority, neighbor))

        # If the queue is empty and no path is found
        return False

My constants file

class EnvironmentConstants:
    _INVALID_ACTION_REWARD = -20
    _WIN_GAME_REWARD = 10
    _DEFAULT_STEP_REWARD = -1

    _ACTION_MOVE_LEFT = 0
    _ACTION_MOVE_RIGHT = 1
    _ACTION_MOVE_UP = 2
    _ACTION_MOVE_DOWN = 3

    _PLAYER_0_POSITION_CHANNEL = 0
    _PLAYER_1_POSITION_CHANNEL = 1
    _PLAYER_2_POSITION_CHANNEL = 2
    _HORIZONTAL_WALLS_CHANNEL = 3
    _VERTICAL_WALLS_CHANNEL = 4

    _PLAYER_0 = "player_0"
    _PLAYER_1 = "player_1"
    _PLAYER_2 = "player_2"

    _OBSERVATION_BOARD = "board"
    _OBSERVATION_PLAYERS = "players"
    _OBSERVATION_WALLS = "walls_remaining"

    _MAX_TURNS = 100

And my neural network (if it helps)

import torch
import torch.nn as nn
from ray.rllib.core.rl_module.torch.torch_rl_module import TorchRLModule
from ray.rllib.core.columns import Columns
from CustomEnvironment.EnvironmentConstants import EnvironmentConstants

# See https://docs.ray.io/en/latest/rllib/rl-modules.html
class QuoridorCNN(TorchRLModule):

    _NUM_PLAYER = "num_player"
    _MAX_WALLS = "max_walls"

    def setup(self):
        super().setup()
        self.num_players = self.model_config[self._NUM_PLAYER]
        self.max_walls = self.model_config[self._MAX_WALLS]

        self.conv = nn.Sequential(
            nn.Conv2d(self.observation_space[EnvironmentConstants._OBSERVATION_BOARD].shape[0], 32, 3, padding=1), 
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU()
        )
        # Global pooling to reduce spatial dims
        self.pool = nn.Sequential(
            nn.MaxPool2d(2),
            nn.Flatten()
        )

        scalar_dim = self.num_players * 2  # one-hot player id + per-player walls remaining
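        # Conv stack keeps 9x9 (kernel 3, padding 1); MaxPool2d(2) reduces it to 4x4,
        # hence 128 * 4 * 4 flattened features.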
        self.fc = nn.Sequential(
            nn.Linear(128 * 4 * 4 + scalar_dim, 256),
            nn.ReLU(),
            nn.Linear(256, self.action_space.n)
        )

    def _forward(self, batch, **kwargs):
        obs = batch[Columns.OBS]
        # device = next(self.parameters()).device
        device = "cpu"

        board = obs[EnvironmentConstants._OBSERVATION_BOARD]
        board = torch.as_tensor(board, dtype=torch.float32, device=device)
        
        x = self.conv(board) # 128x9x9
        x = self.pool(x)

        players = obs[EnvironmentConstants._OBSERVATION_PLAYERS]
        players = torch.as_tensor(players, dtype=torch.float32, device=device)

        walls = obs[EnvironmentConstants._OBSERVATION_WALLS]
        walls = torch.as_tensor(walls, dtype=torch.float32, device=device)
        walls_norm = walls / float(self.max_walls)

        scalars = torch.cat([players, walls_norm], dim=1)

        h = torch.cat([x, scalars], dim=1)
        q = self.fc(h) 

        return {
            Columns.ACTION_DIST_INPUTS: q
        }