How to define a multi-agent team whose agents take actions at random steps

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty in completing my task, but I can work around it.

Assume that each agent in the team observes only at random steps (as part of its behavior). This is somewhat similar to the example code in “SometimesZeroAgentsMultiAgent”, but differs in that all agents must step in order to reach a non-action-taking (or unavailable) mode. So I changed the example code as follows, but it throws: ValueError: Data from episode 688144930894468330 does not show any agent interactions. Hint: Make sure for at least one timestep in the episode, env.step() returns non-empty values. I'm not sure what I am doing wrong.
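
To make the intended behavior concrete, here is a toy sketch (illustration only, not the actual env below): every agent advances each tick, but only the agents whose availability flag comes up True return an observation and are therefore asked for an action on the next tick.

import numpy as np

# Toy illustration only (not the repro env): every agent "steps" each tick,
# but only the agents that come up as available return an observation and
# will therefore be asked for an action on the next tick.
agent_ids = [0, 1]

for t in range(5):
    obs = {}
    for aid in agent_ids:
        # Availability is decided by the agent itself, at random.
        if np.random.rand() > 0.5:
            obs[aid] = 0  # this agent observes (and will act next tick)
    print(f"t={t}: observing agents = {sorted(obs)}")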

Any help on this would be really appreciated!

Reproduction code:


import argparse
import gymnasium as gym
import numpy as np
import random

import os
from ray.rllib.env.multi_agent_env import MultiAgentEnv

class MockEnv(gym.Env):
    """Mock environment for testing purposes.
    Observation=0, reward=1.0, episode-len is configurable.
    Actions are ignored.
    """

    def __init__(self, episode_length, config=None):
        self.episode_length = episode_length
        self.config = config
        self.i = 0
        self.observation_space = gym.spaces.Discrete(1)
        self.action_space = gym.spaces.Discrete(2)
        self.available = True
        self.unavailable_count = 0

    def reset(self, *, seed=None, options=None):
        self.i = 0
        self.available = True
        return 0, {}
        
    def step(self, action):
        self.i += 1

        # Agent-defined random availability: after each step the agent decides
        # whether it will be available (i.e. observing/acting) on the next step.
        self.available = np.random.rand() > 0.5

        terminated = truncated = self.i >= self.episode_length
        return 0, 1.0, terminated, truncated, {}


class SometimesZeroAgentsMultiAgent(MultiAgentEnv):
    """Multi-agent env in which sometimes, no agent acts."""

    def __init__(self, num=2):
        super().__init__()
        self.num_agents = num
        self._agent_ids = set(range(self.num_agents))
        self.agents = [MockEnv(25) for _ in range(self.num_agents)]
        self._observations = {}
        self._infos = {}
        self.terminateds = set()
        self.truncateds = set()
        self.observation_space = gym.spaces.Discrete(2)
        self.action_space = gym.spaces.Discrete(2)
        self._avail = set()
        self.action_requested = set()
        
    def reset(self, *, seed=None, options=None):
        self.terminateds = set()
        self.truncateds = set()
        self._observations = {}
        self._infos = {}
        for aid in self._get_random_agents():
            self._observations[aid], self._infos[aid] = self.agents[aid].reset()
        return self._observations, self._infos

    def step(self, action_dict):
        rew, terminated, truncated = {}, {}, {}

        """every agent must step"""
        for aid in self._agent_ids:
            if action_dict.get(aid) is not None:
                (
                    self._observations[aid],
                    rew[aid],
                    terminated[aid],
                    truncated[aid],
                    self._infos[aid],
                ) = self.agents[aid].step(action_dict[aid])
            else:
                (_,rew[aid],terminated[aid], truncated[aid], _ ) = self.agents[aid].step(None)
            

            if terminated[aid]:
                self.terminateds.add(aid)
            if truncated[aid]:
                self.truncateds.add(aid)
                    
        terminated["__all__"] = len(self.terminateds) == self.num_agents
        truncated["__all__"] = len(self.truncateds) == self.num_agents

        """get the availability of the agents"""
        aid_avialabe = set()
        for aid in self._agent_ids:
            if self.agents[aid].available:
                aid_avialabe.add(aid)
        
        obs = {}
        infos = {}

        """Here we only require actions for the available agents"""
        for aid in aid_avialabe:
            if aid not in self._observations:
                self._observations[aid] = self.observation_space.sample()
                self._infos[aid] = {"fourty-two": 42}
            obs[aid] = self._observations.pop(aid)
            infos[aid] = self._infos.pop(aid)
            
        """ Override some of the rewards."""
        for aid in self._get_random_agents():
            rew[aid] = np.random.rand()

        return obs, rew, terminated, truncated, infos

    def _get_random_agents(self):
        num_observing_agents = np.random.randint(self.num_agents)
        aids = np.random.permutation(self.num_agents)[:num_observing_agents]
        return {
            aid
            for aid in aids
            if aid not in self.terminateds and aid not in self.truncateds
        }


"""Training script"""

import argparse
import gymnasium as gym
import os

import ray
from ray import air, tune
from multi_ag_env import SometimesZeroAgentsMultiAgent
from ray.tune.registry import get_trainable_cls
from ray.tune.registry import register_env

def get_cli_args():
    """Create CLI parser and return parsed arguments"""
    parser = argparse.ArgumentParser()

    # general args
    parser.add_argument(
        "--run", type=str, default="PPO", help="The RLlib-registered algorithm to use."
    )
    parser.add_argument("--num-cpus", type=int, default=0)
    parser.add_argument(
        "--framework",
        choices=["tf", "tf2", "torch"],
        default="torch",
        help="The DL framework specifier.",
    )
    parser.add_argument("--eager-tracing", action="store_true")
    parser.add_argument(
        "--stop-iters", type=int, default=100, help="Number of iterations to train."
    )
    parser.add_argument(
        "--stop-timesteps",
        type=int,
        default=100000,
        help="Number of timesteps to train.",
    )
    parser.add_argument(
        "--stop-reward",
        type=float,
        default=200.0,
        help="Reward at which we stop training.",
    )
    parser.add_argument(
        "--local-mode",
        action="store_true",
        help="Init Ray in local mode for easier debugging.",
    )

    args = parser.parse_args()
    print(f"Running with following CLI args: {args}")
    return args


if __name__ == "__main__":
    args = get_cli_args()

    ray.init(
        num_cpus=args.num_cpus or None,
        local_mode=args.local_mode,
        log_to_driver=args.local_mode,
    )

    stop = {
        "training_iteration": args.stop_iters,
        "timesteps_total": args.stop_timesteps,
        "episode_reward_mean": args.stop_reward,
    }
    
    register_env(
        "sometimes_zero_agents", lambda _: SometimesZeroAgentsMultiAgent(num=2)
    )

    # TODO (Artur): in PPORLModule vf_share_layers = True is broken in tf2. fix it.
    vf_share_layers = not bool(os.environ.get("RLLIB_ENABLE_RL_MODULE", False))
    config = (
        get_trainable_cls(args.run)
        .get_default_config()
        .environment(env="sometimes_zero_agents")
        .resources(
            # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
            num_gpus=int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        )
        .training(train_batch_size=1024, model={"vf_share_layers": vf_share_layers})
        .rollouts(num_rollout_workers=5, rollout_fragment_length="auto")
        .framework(args.framework, eager_tracing=args.eager_tracing)
        .multi_agent(
            # Use a simple set of policy IDs. Spaces for the individual policies
            # will be inferred automatically using reverse lookup via the
            # `policy_mapping_fn` and the env provided spaces for the different
            # agents. Alternatively, you could use:
            # policies: {main0: PolicySpec(...), main1: PolicySpec}
            policies={"main0", "main1", "main2", "main3", "main4"},
            # Simple mapping fn, mapping agent0 to main0 and agent1 to main1.
            policy_mapping_fn=(lambda aid, episode, worker, **kw: f"main{aid}"),
            # Only train main0.
            policies_to_train=["main0"],
        )
    )

    results = tune.Tuner(
        args.run,
        run_config=air.RunConfig(
            stop=stop,
        ),
        param_space=config,
    ).fit()

    if not results:
        raise ValueError(
            "No results returned from tune.Tuner().fit(). Something must have gone wrong."
        )
    ray.shutdown()
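
For completeness, I also stepped the env by hand outside of RLlib (a quick standalone sketch that just imports the class above; it is not part of the repro) to count how many steps of an episode actually return a non-empty observation dict; my reading of the error below is that RLlib expects at least one such step per episode:

# Standalone sanity check (illustration only, not part of the repro): roll the
# env manually and count how many steps return a non-empty observation dict.
from multi_ag_env import SometimesZeroAgentsMultiAgent

env = SometimesZeroAgentsMultiAgent(num=2)
obs, infos = env.reset()
non_empty_steps = int(bool(obs))
for _ in range(100):
    # Only the agents that just received an observation are asked to act.
    actions = {aid: env.action_space.sample() for aid in obs}
    obs, rew, terminated, truncated, infos = env.step(actions)
    non_empty_steps += int(bool(obs))
    if terminated["__all__"] or truncated["__all__"]:
        break
print("Steps with at least one observing agent:", non_empty_steps)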

Error:

ray.exceptions.RayTaskError(ValueError): ray::RolloutWorker.apply() (pid=278561, ip=192.168.1.188, repr=<ray.rllib.evaluation.rollout_worker._modify_class.<locals>.Class object at 0x7f2af52e4340>)
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/utils/actor_manager.py", line 185, in apply
    raise e
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/utils/actor_manager.py", line 176, in apply
    return func(self, *args, **kwargs)
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/execution/rollout_ops.py", line 86, in <lambda>
    lambda w: w.sample(), local_worker=False, healthy_only=True
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/evaluation/rollout_worker.py", line 915, in sample
    batches = [self.input_reader.next()]
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/evaluation/sampler.py", line 92, in next
    batches = [self.get_data()]
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/evaluation/sampler.py", line 277, in get_data
    item = next(self._env_runner)
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/evaluation/env_runner_v2.py", line 323, in run
    outputs = self.step()
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/evaluation/env_runner_v2.py", line 349, in step
    active_envs, to_eval, outputs = self._process_observations(
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/evaluation/env_runner_v2.py", line 667, in _process_observations
    self._handle_done_episode(
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/evaluation/env_runner_v2.py", line 820, in _handle_done_episode
    self.end_episode(env_id, episode_or_exception)
  File "/home/malintha/anaconda3/envs/marl/lib/python3.9/site-packages/ray/rllib/evaluation/env_runner_v2.py", line 945, in end_episode
    raise ValueError(msg)
ValueError: Data from episode 27926846350290456 does not show any agent interactions. Hint: Make sure for at least one timestep in the episode, env.step() returns non-empty values.

Thanks in advance!