NaNs in reward fields in `results` dict

How severe does this issue affect your experience of using Ray?
High: It blocks me to complete my task.

I’m new to Ray and I’m at an impasse with respect to getting usable results. I am trying to create a simple multi-agent economic model where agents can buy, sell, or hold some asset using PettingZoo (and therefore PettingZooEnv). I have a simple environment that returns a reward based on the agent’s action and the current price of the asset, and a simple default policy that returns a random action (only one agent is trainable).

My code (it’s long, apologies in advance):

from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector
from gymnasium.spaces import Box, Discrete
import numpy as np
from ray import tune, air
import os
import ray
from ray.rllib.policy.policy import Policy
from ray.rllib.env import PettingZooEnv
from ray.tune.registry import register_env
from ray.rllib.utils import check_env
from ray.rllib.algorithms.ppo import PPO, PPOConfig
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.policy.policy import PolicySpec
from scipy.stats import norm
from math import sqrt


ray.init()


class BrownianMotion:
    def __init__(self, x0, dt, delta):
        """
        Initialize the Brownian motion generator.

        Arguments:
        x0 : float or numpy array
            The initial condition(s) (i.e., position(s)) of the Brownian motion.
        dt : float
            The time step.
        delta : float
            Delta determines the "speed" of the Brownian motion.
        """
        self.x0 = np.asarray(x0)
        self.dt = dt
        self.delta = delta
        self.current_position = np.copy(np.float64(self.x0))

    def next(self):
        """
        Generate the next step in the Brownian motion.

        Returns:
        A numpy array representing the next position in the Brownian motion.
        """
        r = norm.rvs(size=self.x0.shape, scale=self.delta * sqrt(self.dt))
        self.current_position += r
        return self.current_position


class Agent:
    def __init__(self, id):
        self.id = id
        # self.action_space = None
        # self.observation_space = None
        self.amount_of_asset = 0
        self.buy_price = 0
        self.total_profit = 0
        self.steps = 0

    def step(self, action, price):
        """
        Is this even necessary?
        """
        print(f"agent {self.id} stepping with action {action}")
        if action == 0:
            self.buy(1, price)

        elif action == 1:
            self.sell(1, price)

        elif action == 2:
            self.hold()

        self.steps += 1

    # raise NotImplementedError

    def reset(self):
        """
        This method should be implemented to reset the agent's state.
        """
        self.amount_of_asset = 0
        print(f"resetting agent {self.id}")
        # raise NotImplementedError

    def render(self):
        """
        This method should be implemented to render the agent's current state.
        """
        print(f"rendering agent {self.id}:\n\tamount of asset: {self.amount_of_asset}")
        # raise NotImplementedError

    def buy(self, amount, price):
        """
        This method should be implemented to handle the "buy" action.
        """
        if self.amount_of_asset < 1:
            self.amount_of_asset += amount
            self.buy_price = price
            print(f"agent {self.id} buying {amount}")
        else:
            print(f"agent {self.id} buying {amount} (already has asset)")

        # raise NotImplementedError

    def sell(self, amount, price):
        """
        This method should be implemented to handle the "sell" action.
        """
        if self.amount_of_asset >= amount:
            self.amount_of_asset -= amount
            self.buy_price = 0
            self.total_profit += self.buy_price - price
            print(f"agent {self.id} selling {amount}")
        else:
            self.amount_of_asset = 0
            print(f"agent {self.id} selling {amount} (not enough asset)")
        # raise NotImplementedError

    def hold(self):
        """
        This method should be implemented to handle the "hold" action.
        """
        print(f"agent {self.id} holding with {self.amount_of_asset} asset")
        # raise NotImplementedError


class RandomAction(Policy):
    """Pick a random move each time."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exploration = self._create_exploration()

    def compute_actions(
        self,
        obs_batch,
        state_batches=None,
        prev_action_batch=None,
        prev_reward_batch=None,
        info_batch=None,
        episodes=None,
        **kwargs,
    ):
        def random_action(x):
            return np.random.randint(0, 3)

        return [random_action(x) for x in obs_batch], [], {}

    def learn_on_batch(self, samples):
        pass

    def get_weights(self):
        pass

    def set_weights(self, weights):
        pass


class MARL(AECEnv):
    def __init__(self, num_agents=10):
        self._num_agents = num_agents
        self._none = 4  # num actions (buy, sell, hold, None)

        self.metadata = {"render.modes": ["human"]}
        self.possible_agents = [f"agent_{i}" for i in range(num_agents)]
        self.agents = self.possible_agents[:]
        self.actual_agents = {
            agent: Agent(i) for i, agent in enumerate(self.possible_agents)
        }

        # observations are the price of the asset, the buy price of the agent, and the total profit of the agent
        # actions are buy, sell, hold and None
        self.observation_spaces = dict(
            zip(
                self.agents,
                [Box(low=-1e18, high=1e18, shape=(3,)) for _ in range(num_agents)],
            )
        )
        self.action_spaces = dict(
            zip(self.agents, [Discrete(4) for _ in range(num_agents)])
        )
        self.terminations = dict(zip(self.agents, [False for _ in range(num_agents)]))
        self.terminations.update({"__all__": False})  # ugh
        self.truncations = dict(zip(self.agents, [False for _ in range(num_agents)]))
        self.rewards = dict(zip(self.agents, [0 for _ in range(num_agents)]))
        self._cumulative_rewards = dict(
            zip(self.agents, [0 for _ in range(num_agents)])
        )
        self.infos = dict(zip(self.agents, [{} for _ in range(num_agents)]))
        self._agent_selector = agent_selector(self.agents)
        self.market = None  # Create a market object here

        self._price_process = BrownianMotion(1000, 1, 0.2)
        self._asset_price = 1000
        print(self.terminations)

    def step(self, action):
        # Update environment state, calculate reward, check if episode is over, etc.
        current_agent = self.agent_selection
        print(f"agent {current_agent} : observation {self.observations[current_agent]}")

        self.actual_agents[current_agent].step(action, self._asset_price)

        ##### I thought this would work but it doesn't #####
        # if self.actual_agents[current_agent].steps >= 10:
        #     # self.agents.remove(current_agent)
        #     self.terminations[current_agent] = True
        # # self.terminations["__all__"] = len(self.agents) == 0
        # self.terminations["__all__"] = all(self.terminations.values())


        self.observations = {
            agent: np.array(
                [
                    self._asset_price,
                    self.actual_agents[agent].buy_price,
                    self.actual_agents[agent].total_profit,
                ],
                dtype=np.float32,
            )
            for agent in self.agents
        }
        self.rewards[current_agent] = self.actual_agents[current_agent].total_profit # TODO: this is not correct but simple enough for testing

        self.agent_selection = self._agent_selector.next()  # Determine the next agent
        self._asset_price = self._price_process.next()  # get the next price

        self._accumulate_rewards()

    def reset(self, seed=None, options=None):
        # Reset logic
        self.agents = self.possible_agents[:]
        self._agent_selector.reinit(self.agents)
        self.agent_selection = self._agent_selector.next()

        for k, v in self.rewards.items():
            self.rewards[k] = 0
        for k, v in self._cumulative_rewards.items():
            self._cumulative_rewards[k] = 0
        for k, v in self.terminations.items():
            self.terminations[k] = False
        for k, v in self.truncations.items():
            self.truncations[k] = False
        for k, v in self.infos.items():
            self.infos[k] = {}

        # self.state = {agent: self. for agent in self.agents}
        self._asset_price = self._price_process.next()
        self.observations = {
            agent: np.array(
                [
                    self._asset_price,
                    self.actual_agents[agent].buy_price,
                    self.actual_agents[agent].total_profit,
                ],
                dtype=np.float32,
            )
            for agent in self.agents
        }

    def observe(self, agent):
        # observation logic to update the observations for each agent
        return self.observations[agent]

    def render(self, mode="human"):
        l = []
        for agent in self.agents:
            l.append(agent.render())
        return l

    def state():
        return np.zeros((1, 1))


# Wrap the environment with the PettingZooEnv class
def env_creator(env_config):
    num_agents = env_config.get("num_agents", 10)  # Default to 1 agent if not specified
    env = MARL(num_agents)
    return PettingZooEnv(env)


register_env("my_pettingzoo_env", env_creator)
check_env(env_creator({"num_agents": 10}))


def select_policy(agent_id, episode, **kwargs):
    if agent_id == "player_0":
        return "learned"
    else:
        return "default_policy"


config = (
    (PPOConfig())
    .environment("my_pettingzoo_env")
    .framework("tf")
    .rollouts(
        num_rollout_workers=0,
        num_envs_per_worker=4,
    )
    .multi_agent(
        policies={
            "default_policy": PolicySpec(policy_class=RandomAction),
            "learned": PolicySpec(
                config=AlgorithmConfig.overrides(
                    model={"use_lstm": True},
                    framework_str="tf",
                )
            ),
        },
        policies_to_train=["learned"],
    )
    .resources(num_gpus=int(os.environ.get("RLLIB_NUM_GPUS", "0")))
)
config.train_batch_size = 1000

# run it
stop = {
    # "training_iteration": 1000,
    "timesteps_total": 100,
    # "episode_reward_mean": 10000,
}
results = tune.Tuner(
    "PPO", param_space=config, run_config=air.RunConfig(stop=stop, verbose=1)
).fit()
# runs but returns nans

# algo = config.build() # runs but returns nan in sampler_results rewards keys

# appears most promising (easy to understand) but returns nans
# results = tune.run(
#     "PPO",
#     stop={"timesteps_total": 10},
#     config=config,
#     num_samples=5,  # Set the number of trials here
# )

{'20765_00000': {'custom_metrics': {},
  'episode_media': {},
  'info': {'learner': {},
   'num_env_steps_sampled': 128,
   'num_env_steps_trained': 128,
   'num_agent_steps_sampled': 92,
   'num_agent_steps_trained': 92},
  'sampler_results': {'episode_reward_max': nan,
   'episode_reward_min': nan,
   'episode_reward_mean': nan,
   'episode_len_mean': nan,
...

From my reading I think this has to do with not ending the episode but I’m not sure how to do that correctly. I tried setting the __all__ termination to True but that didn’t work. I also tried removing the agent from the list of agents but that didn’t work either (modulo 0 error). I’m not sure what else to try. Any help would be greatly appreciated.

I had this happen in a custom environment, there were a couple things that you might check.

  1. Check that your environment is properly resetting. I had some weird reset bug in mine that just caused every episode after a certain occurrence to only last 1 step and never returned any rewards.
  2. Check to make sure that you don’t have any values in your observation vectors that would return nan’s. Remember these algorithms just inputs and outputs - if you have something like an infinite value or a value divided by zero in your observation space then the algorithm you use will likely spit out nan. Even it it only spits out a single nan value, it’ll corrupt the rest of your metric calculations.
  3. If your environment is like mine and provides it’s primary reward at the end of an episode but the episode length varies, you may want to turn batch_mode to "complete_episodes". I found that to help me eliminate lots of noise in training. Though, notably, this won’t help you specifically with the nan issue I don’t think.

Hope these help!

1 Like

Thanks for this, Brendan.

I ended up just migrating it out of a PZ environment and in a further simplified form using MultiAgentEnv, it seems to work just fine barring my other struggles. Thanks for the heads up about complete_episodes, I’m sure I’ll run into that at some point.