Multi Agent Prioritized Replay Buffer giving me trouble in DQN

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I’m trying to run a DQN + LSTM algorithm after working through some example scripts (especially this one: Replay Buffers — Ray 2.42.1), but it’s giving me an error.
My code:

# now let's try to make a DRQN or R2D2 and see if I can get similar performance on env2.1
# register env on ray tune
# let's see if I can recreate the trained DQN performance score on env2.1 (no partial observation)
import gymnasium as gym
from env2_1gym import Env2_1gym
import numpy as np
import random
from ray.rllib.connectors.env_to_module import FlattenObservations
from ray.rllib.algorithms.dqn.dqn import DQNConfig
from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig
from ray.rllib.utils.replay_buffers.replay_buffer import StorageUnit
from scheduler import lrscheduling, epsilonscheduling
import torch
from ray.tune.registry import register_env

# register environment
def env_creator(config):
    return Env2_1gym(config)  # Return a gymnasium.Env instance.
register_env("Env2_1", env_creator)

lrschedule = lrscheduling(init=0.01, rate=0.99995, type='exponential')
epsilonschedule = epsilonscheduling(init=0.1, rate=0.01, type='constant')

config = (
    DQNConfig()
    .environment("Env2_1",
                env_config={"initstate": [-1, -1, -1, -1, -1, -1], "parameterization_set": 2, "discretization_set": 0})
    .env_runners(num_env_runners=1)
    .framework("torch")
    .training(dueling=False,
              lr=lrschedule,  # [(0, 0.01), (1000, 0.0001)],
              epsilon=epsilonschedule,  # [(0, 0.1), (1000, 0.01)],
              gamma=0.99,
              replay_buffer_config={
                  "type": "MultiAgentPrioritizedReplayBuffer",
                  "capacity": 1000,
                  "alpha": 0.5,
                  "storage_unit": StorageUnit.SEQUENCES,
                  "replay_sequence_length": 20,
                  "replay_zero_init_states": True,
              },
              train_batch_size=100,
              num_steps_sampled_before_learning_starts=100,
              training_intensity=7,
              target_network_update_freq=15,
              td_error_loss_fn='mse',
              # model=dict(use_lstm=True, lstm_cell_size=64, max_seq_len=20)
              )
    .rl_module(
        model_config=DefaultModelConfig(
            fcnet_hiddens=[30],
            fcnet_activation="relu",
            use_lstm=True,
            max_seq_len=20,
            lstm_cell_size=64,
        )
    )
    .resources(
        num_gpus=0
    )
    .evaluation(
        evaluation_interval=14,
        evaluation_duration=1000,
        evaluation_num_env_runners=4,
    )
)
algo = config.build()
foo = algo.train()

The error text:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
Cell In[4], line 69
     24 config = (
     25     DQNConfig()
     26     .environment("Env2_1",
   (...)
     66     #.debugging(seed=12)
     67 )
     68 algo = config.build()
---> 69 foo = algo.train()

File c:\Users\Hyun\anaconda3\Lib\site-packages\ray\tune\trainable\trainable.py:331, in Trainable.train(self)
    329 except Exception as e:
    330     skipped = skip_exceptions(e)
--> 331     raise skipped from exception_cause(skipped)
    333 assert isinstance(result, dict), "step() needs to return a dict."
    335 # We do not modify internal state nor update this result if duplicate.

File c:\Users\Hyun\anaconda3\Lib\site-packages\ray\tune\trainable\trainable.py:328, in Trainable.train(self)
    326 start = time.time()
    327 try:
--> 328     result = self.step()
    329 except Exception as e:
    330     skipped = skip_exceptions(e)

File c:\Users\Hyun\anaconda3\Lib\site-packages\ray\rllib\algorithms\algorithm.py:1022, in Algorithm.step(self)
   1017 # - No evaluation necessary, just run the next training iteration.
   1018 # - We have to evaluate in this training iteration, but no parallelism ->
   1019 #   evaluate after the training iteration is entirely done.
   1020 else:
   1021     if self.config.enable_env_runner_and_connector_v2:
-> 1022         train_results, train_iter_ctx = self._run_one_training_iteration()
   1023     else:
   1024         (
   1025             train_results,
   1026             train_iter_ctx,
   1027         ) = self._run_one_training_iteration_old_api_stack()

File c:\Users\Hyun\anaconda3\Lib\site-packages\ray\rllib\algorithms\algorithm.py:3382, in Algorithm._run_one_training_iteration(self)
   3380 # Try to train one step.
   3381 with self.metrics.log_time((TIMERS, TRAINING_STEP_TIMER)):
-> 3382     training_step_return_value = self.training_step()
   3383     has_run_once = True
   3385 # On the new API stack, results should NOT be returned anymore as
   3386 # a dict, but purely logged through the `MetricsLogger` API. This
   3387 # way, we make sure to never miss a single stats/counter/timer
   3388 # when calling `self.training_step()` more than once within the same
   3389 # iteration.

File c:\Users\Hyun\anaconda3\Lib\site-packages\ray\rllib\algorithms\dqn\dqn.py:631, in DQN.training_step(self)
    628     return self._training_step_old_api_stack()
    630 # New API stack (RLModule, Learner, EnvRunner, ConnectorV2).
--> 631 return self._training_step_new_api_stack()

File c:\Users\Hyun\anaconda3\Lib\site-packages\ray\rllib\algorithms\dqn\dqn.py:655, in DQN._training_step_new_api_stack(self)
    653     # Add the sampled experiences to the replay buffer.
    654     with self.metrics.log_time((TIMERS, REPLAY_BUFFER_ADD_DATA_TIMER)):
--> 655         self.local_replay_buffer.add(episodes)
    657 if self.config.count_steps_by == "agent_steps":
    658     current_ts = sum(
    659         self.metrics.peek(
    660             (ENV_RUNNER_RESULTS, NUM_AGENT_STEPS_SAMPLED_LIFETIME), default={}
    661         ).values()
    662     )

File c:\Users\Hyun\anaconda3\Lib\site-packages\ray\rllib\utils\replay_buffers\multi_agent_replay_buffer.py:224, in MultiAgentReplayBuffer.add(self, batch, **kwargs)
    222 batch = batch.copy()
    223 # Handle everything as if multi-agent.
--> 224 batch = batch.as_multi_agent()
    226 with self.add_batch_timer:
    227     pids_and_batches = self._maybe_split_into_policy_batches(batch)

AttributeError: 'list' object has no attribute 'as_multi_agent'

The ‘batch’ argument reaches the add() function as a plain Python list (of episodes) rather than a SampleBatch object, so the as_multi_agent() call fails with the AttributeError above.
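
As far as I can tell, the same AttributeError can be reproduced directly at the buffer level, outside the training loop. Here is a standalone sketch (the dummy SampleBatch and its contents are just placeholders for illustration):

from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.utils.replay_buffers import MultiAgentPrioritizedReplayBuffer

buffer = MultiAgentPrioritizedReplayBuffer(capacity=1000)

# A bare SampleBatch is what the old API stack would hand to the buffer.
dummy = SampleBatch({"obs": [[0.0]], "actions": [0], "rewards": [1.0]})

# The new-API-stack training step instead passes a plain list of episodes
# (self.local_replay_buffer.add(episodes) in the traceback), which the
# old-stack buffer cannot convert via as_multi_agent().
try:
    buffer.add([dummy])
except AttributeError as e:
    print(e)  # 'list' object has no attribute 'as_multi_agent'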

My environment is a single-agent environment, but the documentation suggests using the multi-agent replay buffer regardless, and the example in the link above also uses the multi-agent buffer for a CartPole environment.
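
From the traceback it looks like I am running the new API stack (_training_step_new_api_stack), while MultiAgentPrioritizedReplayBuffer seems, as far as I can tell, to be an old-API-stack buffer that works on SampleBatch objects, so I suspect I am mixing the two. Below is a sketch of what I think an episode-based buffer config would look like, trimmed down to CartPole so that it is self-contained; the buffer type name ("PrioritizedEpisodeReplayBuffer") and its "alpha"/"beta" keys are my guess from the docs, so please correct me if that is not the right replacement:

from ray.rllib.algorithms.dqn.dqn import DQNConfig
from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig

sketch_config = (
    DQNConfig()
    .environment("CartPole-v1")
    .framework("torch")
    .training(
        dueling=False,
        gamma=0.99,
        # Episode-based prioritized buffer (my guess at the new-API-stack
        # counterpart of MultiAgentPrioritizedReplayBuffer).
        replay_buffer_config={
            "type": "PrioritizedEpisodeReplayBuffer",
            "capacity": 1000,
            "alpha": 0.5,
            "beta": 0.4,
        },
        train_batch_size=100,
        num_steps_sampled_before_learning_starts=100,
        target_network_update_freq=15,
        td_error_loss_fn="mse",
    )
    .rl_module(
        model_config=DefaultModelConfig(
            fcnet_hiddens=[30],
            fcnet_activation="relu",
            use_lstm=True,
            max_seq_len=20,
            lstm_cell_size=64,
        )
    )
)
# sketch_algo = sketch_config.build()  # not verified; this is what I would try next
# sketch_algo.train()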

Is there anything I’m missing in the code?

Thank you