Hello everyone,
I am training several agents to set optimal prices in a market environment. The agents compete against each other: the agent that sets a lower price gets a higher share of the demand in the market. Hypothesis: the agents will learn to collude and develop strategies to raise their prices in parallel.
When I execute the training, I get the following error: `IndexError: index 0 is out of bounds for axis 0 with size 0`
This is the environment I built:

```python
import numpy as np
import gymnasium as gym
from scipy.optimize import fsolve
from itertools import product
from ray.rllib.env import MultiAgentEnv
import ray
from ray.tune import registry
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.algorithms.dqn import DQNConfig, DQN
from gym.spaces import Discrete, Box
from ray import tune
from ray import air
class PricingStrategyEnv(MultiAgentEnv):
    def __init__(self):
        super().__init__()
        # Initialize parameters
        self.n_agents = 3
        self.c = 1  # Marginal cost for calculating profit
        self.a = 5  # Perceived quality of a product, needed for the demand function
        self.a0 = 0  # A parameter in the demand function
        self.mu = 0.25  # Price sensitivity
        self.k = 15  # Number of possible prices
        # self.tmax = 1e7  # Maximum time steps, currently not used
        self._agent_ids = [0, 1, 2]

        # Derived properties
        self.sdim, self.s0 = self.init_state()
        self.p_minmax = self.compute_p_competitive_monopoly()
        self.A = self.init_actions()
        self.PI = self.init_PI()

        self.action_space = Discrete(self.k)  # Discrete action space for each agent
        # self.observation_space = Box(low=np.min(self.PI), high=np.max(self.PI), shape=(self.n_agents,), dtype=np.float32)
        self.observation_space = Box(self.k, self.k)

        # Initialize state and tracking variables
        self.state = self.s0
        self.current_step = 0

    def demand(self, p):
        e = np.exp((self.a - p) / self.mu)
        d = e / (np.sum(e) + np.exp(self.a0 / self.mu))
        return d

    def foc(self, p):
        d = self.demand(p)
        zero = 1 - (p - self.c) * (1 - d) / self.mu
        return np.squeeze(zero)

    def foc_monopoly(self, p):
        d = self.demand(p)
        d1 = np.flip(d)
        p1 = np.flip(p)
        zero = 1 - (p - self.c) * (1 - d) / self.mu + (p1 - self.c) * d1 / self.mu
        return np.squeeze(zero)

    def compute_p_competitive_monopoly(self):
        p0 = np.ones((1, self.n_agents)) * 3 * self.c
        p_competitive = fsolve(self.foc, p0)
        p_monopoly = fsolve(self.foc_monopoly, p0)
        return p_competitive, p_monopoly

    def init_actions(self):
        a = np.linspace(min(self.p_minmax[0]), max(self.p_minmax[1]), self.k - 2)
        delta = a[1] - a[0]
        A = np.linspace(min(a) - delta, max(a) + delta, self.k)
        return A

    def init_state(self):
        sdim = (self.k, self.k, self.k)  # for 3 agents
        s0 = np.zeros(len(sdim)).astype(int)
        return sdim, s0

    def compute_profits(self, p):
        d = self.demand(p)
        pi = (p - self.c) * d
        return pi

    def init_PI(self):
        PI = np.zeros(self.sdim + (self.n_agents,))
        for s in product(*[range(i) for i in self.sdim]):  # Reason for adding self.k above
            p = np.asarray(self.A[np.asarray(s)])
            PI[s] = self.compute_profits(p)
        return PI

    def step(self, action_dict):
        print("Current Step:", self.current_step)
        print("Received actions:", action_dict)
        actions = []
        for agent_id, action in action_dict.items():
            actual_price = self.A[action]  # Map action index to price
            actions.append(actual_price)
        # Compute prices, demands, and profits
        prices = np.array(actions)
        demands = self.demand(prices)
        profits = self.compute_profits(prices)
        # Construct rewards. Rewards are just the same as the profit that an agent gets
        rewards = {str(agent_id): profits[agent_id] for agent_id in range(self.n_agents)}
        print("Rewards:", rewards)
        # Update past prices and rewards
        for i, price in enumerate(prices):
            self.past_prices[i].append(price)
            self.past_rewards[i].append(rewards[str(i)])
        self.current_step += 1
        done = self.current_step >= self.tmax
        # Construct observations for all agents
        observations = {}
        for agent_id in range(self.n_agents):
            observations[agent_id] = self.observe(agent_id)
        print("Observations:", observations)
        info = {}
        return observations, rewards, done, info

    def reset(self, **kwargs):
        super().reset(seed=kwargs.get('seed'))  # Pass 'seed' to Gym's reset function
        self.state = self.s0.copy()
        self.current_step = 0
        # Initialize past price and reward tracking
        self.past_prices = [[0.0] for _ in range(self.n_agents)]
        self.past_rewards = [[0.0] for _ in range(self.n_agents)]
        observations = {}
        agent_ids = list(range(self.n_agents))  # Get a list of agent IDs
        for agent_id in agent_ids:
            observations[agent_id] = self.observe(agent_id)
        print("Initial Observations:", observations)
        info = {}  # Create an empty info dictionary
        return observations, info

    def observe(self, agent_id):
        return {
            "own_price": self.past_prices[agent_id][-1] if self.past_prices[agent_id] else 0.0,
            "competitor_prices": [self.past_prices[i][-1] for i in range(self.n_agents) if i != agent_id],
            "last_reward": self.past_rewards[agent_id][-1] if self.past_rewards[agent_id] else 0.0,
        }

    def close(self):
        pass

    def render(self, mode='human'):
        print(f"Step: {self.current_step}, State: {self.state}")
```
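To connect the code to the hypothesis above: `demand()` implements a logit demand, so the agent with the lowest price receives the largest demand share. Here is a minimal standalone sketch of that mechanic with the same parameter values (a = 5, a0 = 0, mu = 0.25, c = 1), independent of the environment class:

```python
import numpy as np

# Standalone sketch of the logit demand used in PricingStrategyEnv (parameter values taken from __init__)
a, a0, mu, c = 5, 0, 0.25, 1

def demand(p):
    e = np.exp((a - p) / mu)
    return e / (np.sum(e) + np.exp(a0 / mu))

prices = np.array([1.5, 1.6, 1.7])   # agent 0 undercuts its two competitors
shares = demand(prices)              # agent 0 ends up with the largest share
profits = (prices - c) * shares      # same profit formula as compute_profits()
print("shares:", shares)
print("profits:", profits)
```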
I want to train with PPO, but this does not work, and I do not know how to interpret the error. This is how I train:
```python
registry.register_env("PricingStrategyEnv", lambda config: PricingStrategyEnv())
config = (
    PPOConfig()
    .environment("PricingStrategyEnv")
    .rollouts(num_rollout_workers=2)
    .framework("torch")
    .training(
        lr_schedule=[(0, 0.01)],   # Learning rate schedule
        clip_param=0.2,            # PPO clip parameter
        shuffle_sequences=True,    # Shuffle sequences in the batch
    )
    .evaluation(evaluation_num_workers=1)
)

algo = config.build()  # Build the algorithm

for _ in range(5):
    print(algo.train())  # Train the algorithm

algo.evaluate()  # Evaluate the algorithm
```
When I execute the training, I get the following error: `IndexError: index 0 is out of bounds for axis 0 with size 0`
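(Side note: the `tune` and `air` imports are not used in the training snippet above. A Tune-based launch of the same config would look roughly like the sketch below; this is an assumption on my part and not the code path that produced the error.)

```python
# Sketch (assumption): launching the same PPO config via Ray Tune instead of algo.train()
tuner = tune.Tuner(
    "PPO",
    param_space=config.to_dict(),                              # reuse the PPOConfig built above
    run_config=air.RunConfig(stop={"training_iteration": 5}),  # stop after 5 training iterations
)
results = tuner.fit()
```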
Before training, the environment can be tested with the following script. The output looks okay to me:
```python
# Testing the environment
env = PricingStrategyEnv()
obs, info = env.reset()
for _ in range(3):  # Iterate through the step function three times
    action_dict = {agent_id: env.action_space.sample() for agent_id in range(env.n_agents)}
    obs, rewards, done, info = env.step(action_dict)
    if done:
        break  # Stop iterating if the episode is done
```
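In addition, here is a small sketch (not part of my original scripts) that I would use to compare what `reset()` returns with the declared `observation_space`, since, as far as I understand, RLlib checks observations against the declared spaces:

```python
# Sketch: compare what reset() returns with the declared observation_space
env = PricingStrategyEnv()
obs, info = env.reset()
print("Declared observation_space:", env.observation_space)
for agent_id, agent_obs in obs.items():
    try:
        ok = env.observation_space.contains(agent_obs)
    except (TypeError, ValueError):  # contains() may raise for incompatible observation types
        ok = False
    print(agent_id, type(agent_obs), ok)
```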