I am a beginner with RLlib. Imitating the format of the sample code, I wrote the custom multi-agent environment below, but when I try to run it with RL algorithms I get different errors in different environments (Google Colab, Spyder, …). I have tried modifying the code several times based on those error messages, but it still fails. Could someone please help me see which part of my code is wrong?
My code:
from gym import Env, logger
from gym.spaces import Discrete, Tuple, Box
from ray.rllib.env import MultiAgentEnv
from gym.utils import colorize, seeding
import sys
from contextlib import closing
import numpy as np
from io import StringIO
import matplotlib.pyplot as plt
import pickle
import pandas as pd
from scipy import optimize
import warnings
class BertrandCompetitionContinuousEnv(MultiAgentEnv):
    metadata = {'render.modes': ['human']}

    def __init__(
        self,
        num_agents = 2,
        c_i = 1,
        a_minus_c_i = 1,
        a_0 = 0,
        mu = 0.25,
        delta = 0.95,
        xi = 0.1,
        k = 1,
        max_steps=200,
        sessions=1,
    ):
        super(BertrandCompetitionContinuousEnv, self).__init__()
        self.num_agents = num_agents
        # Length of Memory
        self.k = k
        # Marginal Cost
        self.c_i = c_i
        # Product Quality Indexes
        a = np.array([c_i + a_minus_c_i] * num_agents)
        self.a = a
        # Product Quality Index: Outside Good
        self.a_0 = a_0
        # Index of Horizontal Differentiation
        self.mu = mu

        # Nash Equilibrium Price
        def nash_func(p):
            ''' Derivative for demand function '''
            denominator = np.exp(a_0 / mu)
            for i in range(num_agents):
                denominator += np.exp((a[i] - p[i]) / mu)
            function_list = []
            for i in range(num_agents):
                term = np.exp((a[i] - p[i]) / mu)
                first_term = term / denominator
                second_term = (np.exp((2 * (a[i] - p[i])) / mu) * (-c_i + p[i])) / ((denominator ** 2) * mu)
                third_term = (term * (-c_i + p[i])) / (denominator * mu)
                function_list.append((p[i] - c_i) * (first_term + second_term - third_term))
            return function_list

        # Finding root of derivative for demand function
        nash_sol = optimize.root(nash_func, [2] * num_agents)
        self.pN = nash_sol.x[0]
        print('Nash Price:', self.pN)

        # Monopoly Equilibrium Price
        def monopoly_func(p):
            return -(p[0] - c_i) * self.demand(self.a, p, self.mu, 0)

        monopoly_sol = optimize.minimize(monopoly_func, 0)
        self.pM = monopoly_sol.x[0]
        print('Monopoly Price:', self.pM)

        self.low_price = self.pN - xi * (self.pM - self.pN)
        self.high_price = self.pM + xi * (self.pM - self.pN)
        act_space = Box(np.array([self.low_price]), np.array([self.high_price]), dtype=np.float32)

        # MultiAgentEnv Action and Observation Space
        self.agents = ['agent_' + str(i) for i in range(num_agents)]
        self.observation_space = {}
        self.action_space = {}
        if k > 0:
            self.numeric_low = np.array([self.low_price] * (k * num_agents))
            numeric_high = np.array([self.high_price] * (k * num_agents))
            obs_space = Box(self.numeric_low, numeric_high, dtype=np.float32)
        else:
            self.numeric_low = np.array([self.low_price] * num_agents)
            numeric_high = np.array([self.high_price] * num_agents)
            obs_space = Box(self.numeric_low, numeric_high, dtype=np.float32)
        for agent in self.agents:
            self.observation_space[agent] = obs_space
            self.action_space[agent] = act_space

        self.reward_range = (-float('inf'), float('inf'))
        self.current_step = None
        self.max_steps = max_steps
        self.sessions = sessions
        self.action_history = {}
        for agent in self.agents:
            if agent not in self.action_history:
                self.action_history[agent] = [self.action_space[agent].sample()[0]]
        self.reset()

    def demand(self, a, p, mu, agent_idx):
        ''' Demand as a function of product quality indexes, price, and mu. '''
        return np.exp((a[agent_idx] - p[agent_idx]) / mu) / (np.sum(np.exp((a - p) / mu)) + np.exp(self.a_0 / mu))

    def step(self, actions_dict):
        ''' MultiAgentEnv Step '''
        actions_list = np.array(list(actions_dict.values())).flatten()
        for i in range(self.num_agents):
            self.action_history[self.agents[i]].append(actions_list[i])
        if self.k > 0:
            obs_agents = np.array([self.action_history[self.agents[i]][-self.k:] for i in range(self.num_agents)], dtype=object).flatten()
            obs = dict(zip(self.agents, [obs_agents for i in range(self.num_agents)]))
        else:
            obs = dict(zip(self.agents, [self.numeric_low for _ in range(self.num_agents)]))
        rew = np.array([0.0] * self.num_agents)
        self.prices = actions_list[:self.num_agents]
        for i in range(self.num_agents):
            rew[i] = (self.prices[i] - self.c_i) * self.demand(self.a, self.prices, self.mu, i)
            # reward[i] = (self.prices[i] - self.c_i) * demand[i]
        rew = dict(zip(self.agents, rew))
        terminated = {'__all__': self.current_step == self.max_steps}
        truncated = {'__all__': self.current_step == self.max_steps}
        info = dict(zip(self.agents, [{} for _ in range(self.num_agents)]))
        self.current_step += 1
        return obs, rew, terminated, truncated, info

    def one_step(self):
        step_actions_dict = {}
        for agent in self.agents:
            step_actions_dict[agent] = self.action_history[agent][-1]
        observation, _, _, _, _ = self.step(step_actions_dict)
        return observation

    def deviate(self, direction='down'):
        deviate_actions_dict = {}
        if direction == 'down':
            # First agent deviates to lowest price
            deviate_actions_dict[self.agents[0]] = self.low_price + 0.1
        elif direction == 'up':
            # First agent deviates to highest price
            deviate_actions_dict[self.agents[0]] = self.high_price - 0.1
        for agent in range(1, self.num_agents):
            # All other agents remain at previous price (large assumption)
            deviate_actions_dict[self.agents[agent]] = self.action_history[self.agents[agent]][-1]
        observation, _, _, _, _ = self.step(deviate_actions_dict)
        return observation

    def reset(self, *, seed=None, options=None):
        self.current_step = 0
        # Reset to random action
        random_action = np.random.uniform(self.low_price, self.high_price, size=self.num_agents)
        for i in range(random_action.size):
            self.action_history[self.agents[i]].append(random_action[i])
        if self.k > 0:
            obs_agents = np.array([self.action_history[self.agents[i]][-self.k:] for i in range(self.num_agents)], dtype=object).flatten()
            observation = dict(zip(self.agents, [obs_agents for i in range(self.num_agents)]))
        else:
            observation = dict(zip(self.agents, [self.numeric_low for _ in range(self.num_agents)]))
        return observation
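A small loop like the following can be used to exercise the class by hand, outside RLlib (just a sanity-check sketch that samples random prices from the per-agent Box spaces defined above; it is not part of the training script):

# Sanity-check sketch: drive the environment directly with random prices.
env = BertrandCompetitionContinuousEnv(num_agents=2, k=1, max_steps=5)
obs = env.reset()
for _ in range(5):
    # One random price per agent, drawn from that agent's Box action space.
    actions = {agent: env.action_space[agent].sample()[0] for agent in env.agents}
    obs, rew, terminated, truncated, info = env.step(actions)
    print(rew, terminated['__all__'])

The RLlib driver code is below.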
import gym, ray
from ray.rllib.algorithms import ppo
from ray.tune.registry import register_env
from ray import tune
from ray.tune.logger import pretty_print
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.algorithms.ppo import PPOConfig
ray.init(ignore_reinit_error=True)
def env_creator(env_config):
    return BertrandCompetitionContinuousEnv(env_config)  # return an env instance

register_env("my_env", env_creator)
config = PPOConfig()
config = config.training(gamma=0.9, lr=0.01, kl_coeff=0.3)
config = config.resources(num_gpus=0)
config = config.rollouts(num_rollout_workers=1)
print(config.to_dict())
algo = config.build(env="my_env")
algo.train()
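One thing I am not sure about is whether I also need a .multi_agent() section in the config so that each agent gets its own policy. What I have in mind is roughly the sketch below (the policy ids 'agent_0' and 'agent_1' just mirror the env's agent names; I don't know if this is the right way to declare them):

# Sketch only: one policy per env agent, with ids mirroring the agent names.
config = config.multi_agent(
    policies={'agent_0', 'agent_1'},
    policy_mapping_fn=lambda agent_id, episode, worker, **kwargs: agent_id,
)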