How severely does this issue affect your experience of using Ray?
High: it blocks me from completing my task.
I’m new to Ray and I’ve hit an impasse getting usable results. I’m trying to build a simple multi-agent economic model in which agents can buy, sell, or hold an asset, using PettingZoo (and therefore PettingZooEnv). I have a simple environment that returns a reward based on the agent’s action and the current price of the asset, and a simple default policy that returns a random action (only one agent is trainable).
My code (it’s long, apologies in advance):
from pettingzoo import AECEnv
from pettingzoo.utils import agent_selector
from gymnasium.spaces import Box, Discrete
import numpy as np
from ray import tune, air
import os
import ray
from ray.rllib.policy.policy import Policy
from ray.rllib.env import PettingZooEnv
from ray.tune.registry import register_env
from ray.rllib.utils import check_env
from ray.rllib.algorithms.ppo import PPO, PPOConfig
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.policy.policy import PolicySpec
from scipy.stats import norm
from math import sqrt
ray.init()
class BrownianMotion:
    def __init__(self, x0, dt, delta):
        """
        Initialize the Brownian motion generator.
        Arguments:
        x0 : float or numpy array
            The initial condition(s) (i.e., position(s)) of the Brownian motion.
        dt : float
            The time step.
        delta : float
            Delta determines the "speed" of the Brownian motion.
        """
        self.x0 = np.asarray(x0)
        self.dt = dt
        self.delta = delta
        self.current_position = np.copy(np.float64(self.x0))

    def next(self):
        """
        Generate the next step in the Brownian motion.
        Returns:
        A numpy array representing the next position in the Brownian motion.
        """
        r = norm.rvs(size=self.x0.shape, scale=self.delta * sqrt(self.dt))
        self.current_position += r
        return self.current_position
class Agent:
    def __init__(self, id):
        self.id = id
        # self.action_space = None
        # self.observation_space = None
        self.amount_of_asset = 0
        self.buy_price = 0
        self.total_profit = 0
        self.steps = 0

    def step(self, action, price):
        """
        Is this even necessary?
        """
        print(f"agent {self.id} stepping with action {action}")
        if action == 0:
            self.buy(1, price)
        elif action == 1:
            self.sell(1, price)
        elif action == 2:
            self.hold()
        self.steps += 1
        # raise NotImplementedError

    def reset(self):
        """
        This method should be implemented to reset the agent's state.
        """
        self.amount_of_asset = 0
        print(f"resetting agent {self.id}")
        # raise NotImplementedError

    def render(self):
        """
        This method should be implemented to render the agent's current state.
        """
        print(f"rendering agent {self.id}:\n\tamount of asset: {self.amount_of_asset}")
        # raise NotImplementedError

    def buy(self, amount, price):
        """
        This method should be implemented to handle the "buy" action.
        """
        if self.amount_of_asset < 1:
            self.amount_of_asset += amount
            self.buy_price = price
            print(f"agent {self.id} buying {amount}")
        else:
            print(f"agent {self.id} buying {amount} (already has asset)")
        # raise NotImplementedError

    def sell(self, amount, price):
        """
        This method should be implemented to handle the "sell" action.
        """
        if self.amount_of_asset >= amount:
            self.amount_of_asset -= amount
            # record the profit (sell price minus buy price) before clearing
            # the stored buy price
            self.total_profit += price - self.buy_price
            self.buy_price = 0
            print(f"agent {self.id} selling {amount}")
        else:
            self.amount_of_asset = 0
            print(f"agent {self.id} selling {amount} (not enough asset)")
        # raise NotImplementedError
    def hold(self):
        """
        This method should be implemented to handle the "hold" action.
        """
        print(f"agent {self.id} holding with {self.amount_of_asset} asset")
        # raise NotImplementedError
class RandomAction(Policy):
    """Pick a random move each time."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exploration = self._create_exploration()

    def compute_actions(
        self,
        obs_batch,
        state_batches=None,
        prev_action_batch=None,
        prev_reward_batch=None,
        info_batch=None,
        episodes=None,
        **kwargs,
    ):
        def random_action(x):
            return np.random.randint(0, 3)

        return [random_action(x) for x in obs_batch], [], {}

    def learn_on_batch(self, samples):
        pass

    def get_weights(self):
        pass

    def set_weights(self, weights):
        pass
class MARL(AECEnv):
    def __init__(self, num_agents=10):
        self._num_agents = num_agents
        self._none = 4  # num actions (buy, sell, hold, None)
        self.metadata = {"render.modes": ["human"]}
        self.possible_agents = [f"agent_{i}" for i in range(num_agents)]
        self.agents = self.possible_agents[:]
        self.actual_agents = {
            agent: Agent(i) for i, agent in enumerate(self.possible_agents)
        }
        # observations are the price of the asset, the buy price of the agent, and the total profit of the agent
        # actions are buy, sell, hold and None
        self.observation_spaces = dict(
            zip(
                self.agents,
                [Box(low=-1e18, high=1e18, shape=(3,)) for _ in range(num_agents)],
            )
        )
        self.action_spaces = dict(
            zip(self.agents, [Discrete(4) for _ in range(num_agents)])
        )
        self.terminations = dict(zip(self.agents, [False for _ in range(num_agents)]))
        self.terminations.update({"__all__": False})  # ugh
        self.truncations = dict(zip(self.agents, [False for _ in range(num_agents)]))
        self.rewards = dict(zip(self.agents, [0 for _ in range(num_agents)]))
        self._cumulative_rewards = dict(
            zip(self.agents, [0 for _ in range(num_agents)])
        )
        self.infos = dict(zip(self.agents, [{} for _ in range(num_agents)]))
        self._agent_selector = agent_selector(self.agents)
        self.market = None  # Create a market object here
        self._price_process = BrownianMotion(1000, 1, 0.2)
        self._asset_price = 1000
        print(self.terminations)

    def step(self, action):
        # Update environment state, calculate reward, check if episode is over, etc.
        current_agent = self.agent_selection
        print(f"agent {current_agent} : observation {self.observations[current_agent]}")
        self.actual_agents[current_agent].step(action, self._asset_price)
        ##### I thought this would work but it doesn't #####
        # if self.actual_agents[current_agent].steps >= 10:
        #     # self.agents.remove(current_agent)
        #     self.terminations[current_agent] = True
        #     # self.terminations["__all__"] = len(self.agents) == 0
        #     self.terminations["__all__"] = all(self.terminations.values())
        self.observations = {
            agent: np.array(
                [
                    self._asset_price,
                    self.actual_agents[agent].buy_price,
                    self.actual_agents[agent].total_profit,
                ],
                dtype=np.float32,
            )
            for agent in self.agents
        }
        self.rewards[current_agent] = self.actual_agents[current_agent].total_profit  # TODO: this is not correct but simple enough for testing
        self.agent_selection = self._agent_selector.next()  # Determine the next agent
        self._asset_price = self._price_process.next()  # get the next price
        self._accumulate_rewards()

    def reset(self, seed=None, options=None):
        # Reset logic
        self.agents = self.possible_agents[:]
        self._agent_selector.reinit(self.agents)
        self.agent_selection = self._agent_selector.next()
        for k, v in self.rewards.items():
            self.rewards[k] = 0
        for k, v in self._cumulative_rewards.items():
            self._cumulative_rewards[k] = 0
        for k, v in self.terminations.items():
            self.terminations[k] = False
        for k, v in self.truncations.items():
            self.truncations[k] = False
        for k, v in self.infos.items():
            self.infos[k] = {}
        # self.state = {agent: self. for agent in self.agents}
        self._asset_price = self._price_process.next()
        self.observations = {
            agent: np.array(
                [
                    self._asset_price,
                    self.actual_agents[agent].buy_price,
                    self.actual_agents[agent].total_profit,
                ],
                dtype=np.float32,
            )
            for agent in self.agents
        }

    def observe(self, agent):
        # observation logic to update the observations for each agent
        return self.observations[agent]
    def render(self, mode="human"):
        l = []
        for agent in self.agents:
            # self.agents holds string ids; the Agent objects live in actual_agents
            l.append(self.actual_agents[agent].render())
        return l

    def state(self):
        return np.zeros((1, 1))
# Wrap the environment with the PettingZooEnv class
def env_creator(env_config):
    num_agents = env_config.get("num_agents", 10)  # Default to 10 agents if not specified
    env = MARL(num_agents)
    return PettingZooEnv(env)
register_env("my_pettingzoo_env", env_creator)
check_env(env_creator({"num_agents": 10}))
def select_policy(agent_id, episode, **kwargs):
    # agent ids in this env are "agent_0" ... "agent_9"
    if agent_id == "agent_0":
        return "learned"
    else:
        return "default_policy"
config = (
    PPOConfig()
    .environment("my_pettingzoo_env")
    .framework("tf")
    .rollouts(
        num_rollout_workers=0,
        num_envs_per_worker=4,
    )
    .multi_agent(
        policies={
            "default_policy": PolicySpec(policy_class=RandomAction),
            "learned": PolicySpec(
                config=AlgorithmConfig.overrides(
                    model={"use_lstm": True},
                    framework_str="tf",
                )
            ),
        },
        # map agent_0 to the trainable policy, everyone else to the random one
        policy_mapping_fn=select_policy,
        policies_to_train=["learned"],
    )
    .resources(num_gpus=int(os.environ.get("RLLIB_NUM_GPUS", "0")))
)
config.train_batch_size = 1000
# run it
stop = {
    # "training_iteration": 1000,
    "timesteps_total": 100,
    # "episode_reward_mean": 10000,
}
results = tune.Tuner(
    "PPO", param_space=config, run_config=air.RunConfig(stop=stop, verbose=1)
).fit()
# runs but returns nans
# algo = config.build() # runs but returns nan in sampler_results rewards keys
# appears most promising (easy to understand) but returns nans
# results = tune.run(
#     "PPO",
#     stop={"timesteps_total": 10},
#     config=config,
#     num_samples=5,  # Set the number of trials here
# )
This is the result dict I get back (truncated); all of the episode reward stats come out as NaN:

{'20765_00000': {'custom_metrics': {},
                 'episode_media': {},
                 'info': {'learner': {},
                          'num_env_steps_sampled': 128,
                          'num_env_steps_trained': 128,
                          'num_agent_steps_sampled': 92,
                          'num_agent_steps_trained': 92},
                 'sampler_results': {'episode_reward_max': nan,
                                     'episode_reward_min': nan,
                                     'episode_reward_mean': nan,
                                     'episode_len_mean': nan,
...
From my reading I think this happens because the episode never ends, but I'm not sure how to end it correctly. I tried setting the __all__ termination to True, but that didn't work. I also tried removing the agent from the list of agents, but that only produced a modulo-by-zero error. I'm not sure what else to try; any help would be greatly appreciated.
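For reference, this is roughly what I think step() is supposed to look like based on my reading of the PettingZoo custom-environment docs: truncate every agent after some step limit, and handle already-finished agents with _was_dead_step() so the wrapper can see the episode end. This is only a sketch of my understanding, not code I have verified against my setup (max_cycles would be a new attribute I'd set in __init__, e.g. self.max_cycles = 10):

    # sketch only (untested); max_cycles is a new attribute I'd add in __init__
    def step(self, action):
        agent = self.agent_selection
        # standard PettingZoo pattern: a terminated/truncated agent must receive a
        # None action, and _was_dead_step() removes it from self.agents
        if self.terminations[agent] or self.truncations[agent]:
            self._was_dead_step(action)
            return
        self.actual_agents[agent].step(action, self._asset_price)
        # ... update self.observations and self.rewards exactly as above ...
        if self.actual_agents[agent].steps >= self.max_cycles:
            # time-limit style ending: truncate every agent; once they are all
            # flagged (and removed via _was_dead_step) the episode should be over
            self.truncations = {a: True for a in self.agents}
        self.agent_selection = self._agent_selector.next()
        self._asset_price = self._price_process.next()
        self._accumulate_rewards()

Is that the right way to end an episode here, or is something still missing on the RLlib side? I also wonder whether the extra "__all__" key I stuffed into self.terminations confuses the PettingZooEnv wrapper, since as far as I can tell that key is an RLlib MultiAgentEnv convention and not part of the PettingZoo API.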