Hi, my code works great when running PPO with StochasticSampling exploration.
However, when I change the algorithm from PPO to DDPG, or the exploration from StochasticSampling to EpsilonGreedy, I run into errors. For DDPG I first get, at line 138 in step:
self.reward = (self.cons ** (1 - self.gamma) / (1 - self.gamma)).item()
AttributeError: 'float' object has no attribute 'item'
Here self.cons = action is the action chosen by the agent's policy. Sometimes it appears to be an ndarray, sometimes a float. The .item() is necessary for PPO to run.
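For what it's worth, I considered making the conversion shape-agnostic instead of calling .item() directly; a minimal sketch (the helper name to_scalar is mine, not anything from RLlib), assuming the action arrives as either a plain float or a 1-element ndarray:

import numpy as np

def to_scalar(action):
    # np.asarray accepts both a Python float and an ndarray of any shape;
    # reshape(-1)[0] then extracts the single element in either case.
    return float(np.asarray(action).reshape(-1)[0])

But I would rather understand why the type changes between algorithms in the first place.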
If I remove the .item(), DDPG instead fails with this traceback:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/trial_runner.py", line 718, in _process_trial
results = self.trial_executor.fetch_result(trial)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/ray_trial_executor.py", line 688, in fetch_result
result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
return func(*args, **kwargs)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/worker.py", line 1495, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::DDPG.train_buffered() (pid=99176, ip=10.16.78.15)
File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 451, in ray._raylet.execute_task.function_executor
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/_private/function_manager.py", line 563, in actor_method_executor
return method(__ray_actor, *args, **kwargs)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/trainable.py", line 173, in train_buffered
result = self.train()
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 608, in train
raise e
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 594, in train
result = Trainable.train(self)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/trainable.py", line 232, in train
result = self.step()
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer_template.py", line 173, in step
res = next(self.train_exec_impl)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 756, in __next__
return next(self.built_iterator)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 843, in apply_filter
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 843, in apply_filter
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 843, in apply_filter
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 1075, in build_union
item = next(it)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 756, in __next__
return next(self.built_iterator)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
for item in it:
[Previous line repeated 2 more times]
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/replay_ops.py", line 91, in gen_replay
item = local_buffer.replay()
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/replay_buffer.py", line 430, in replay
samples[policy_id] = replay_buffer.sample(
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/replay_buffer.py", line 216, in sample
batch = self._encode_sample(idxes)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/replay_buffer.py", line 100, in _encode_sample
out = SampleBatch.concat_samples([self._storage[i] for i in idxes])
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/policy/sample_batch.py", line 150, in concat_samples
out[k] = concat_aligned(
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/utils/memory.py", line 70, in concat_aligned
np.concatenate(items, out=output, axis=1 if time_major else 0)
File "<__array_function__ internals>", line 5, in concatenate
ValueError: all the input arrays must have same number of dimensions, but the array at index 0 has 1 dimension(s) and the array at index 159 has 2 dimension(s)
Again I suspect this has something to do with the action changing shape between steps (sometimes a 1-D ndarray, sometimes 2-D), so the replay buffer ends up concatenating arrays with different numbers of dimensions.
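One workaround I considered is forcing a consistent shape on the action at the top of step() before anything gets stored; a sketch of my own guess (not something from the RLlib docs), assuming the action arrives as a float, a (1,) array, or a (1, 1) array:

import numpy as np

def normalize_action(action):
    # Whatever the exploration produces, return a float32 array of shape (1,),
    # so every sample stored in the replay buffer has the same dimensionality.
    return np.asarray(action, dtype=np.float32).reshape(-1)

Is something like this reasonable, or is there a config option that handles it?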
With EpsilonGreedy exploration for PPO I instead get this error:
results = self.trial_executor.fetch_result(trial)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/ray_trial_executor.py", line 688, in fetch_result
result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
return func(*args, **kwargs)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/worker.py", line 1495, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::PPO.train_buffered() (pid=3104, ip=10.16.78.15)
File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 451, in ray._raylet.execute_task.function_executor
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/_private/function_manager.py", line 563, in actor_method_executor
return method(__ray_actor, *args, **kwargs)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/trainable.py", line 173, in train_buffered
result = self.train()
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 608, in train
raise e
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 594, in train
result = Trainable.train(self)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/trainable.py", line 232, in train
result = self.step()
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer_template.py", line 173, in step
res = next(self.train_exec_impl)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 756, in __next__
return next(self.built_iterator)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 843, in apply_filter
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 843, in apply_filter
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
for item in it:
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 791, in apply_foreach
result = fn(item)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/train_ops.py", line 206, in __call__
self.optimizers[policy_id].load_data(
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/multi_gpu_impl.py", line 218, in load_data
sess.run([t.init_op for t in self._towers], feed_dict=feed_dict)
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/tensorflow/python/client/session.py", line 967, in run
result = self._run(None, fetches, feed_dict, options_ptr,
File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/tensorflow/python/client/session.py", line 1164, in _run
raise ValueError(
ValueError: Cannot feed value of shape (3968,) for Tensor 'policy0/action:0', which has shape '(?, 1)'
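This looks like the same shape problem from the other direction: the batch of actions is flat, shape (3968,), while the placeholder expects (?, 1). If I understand correctly, giving the actions a trailing dimension of 1 would match the placeholder; a sketch of what I mean (hypothetical values, not a verified fix):

import numpy as np

batch_actions = np.zeros(3968, dtype=np.float32)  # the shape the feed apparently has
batch_actions = batch_actions.reshape(-1, 1)      # shape (3968, 1), matching '(?, 1)'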
My code is attached below:
import numpy as np
import gym
from gym import spaces
from gym.utils import seeding, EzPickle
from gym.envs.registration import EnvSpec
from ray.rllib.env.multi_agent_env import MultiAgentEnv
import ray
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env
from ray.rllib.agents import ppo
from ray.rllib.agents import ddpg
import ray.tune as tune
INITIAL_ASSET_HOLDINGS = 1
BORROW_LIM = 0
R_VALUE = 1.03
DELTA = 0.01
W_VALUE = 0.98
GAMMA = 2.0
AGENT_NUM = 1
N = 5
BETA = 0.95
ALPHA = 0.33
Z = 1.0
np.random.seed(2020)
# alternative for RLlib
class AiyagariEnvironment(gym.Env):
"""An environment for value function sampling from a basic RA GE model with capital"""
# resets state to initial value
# def u(cons):
# util = cons**(1-GAMMA)/(1-GAMMA)
# return util
    # idea: pass assets to the multi-agent env, then return the interest rate back to the environment.
metadata = {"render.modes": ["human"]}
def __init__(self):
super(AiyagariEnvironment, self).__init__()
self.reward_range = (0, 100000)
self.seed()
# consumption space bounds [borrow_lim, inf)
self.action_space = spaces.Box(
low=np.array([0]), high=np.array([100000]), dtype=np.float32
)
        # observation space -- all variables the agent observes before making a new
        # decision. Since we assume r is fixed here, this includes assets, prices,
        # and income. Because assets act as a summary statistic in this model we only
        # provide current-period values; we could extend this to multi-period later.
self.observation_space = spaces.Box(
low=np.array([BORROW_LIM, -10, 0]),
high=np.array([100000000, 100000000, 100000000]),
dtype=np.float32,
)
self.assets = INITIAL_ASSET_HOLDINGS
self.beta = BETA
self.epsilon = 10e-3
self.borrow_lim = BORROW_LIM + self.epsilon
self.gamma = GAMMA
self.price = R_VALUE - DELTA
self.W = W_VALUE
self.current_step = 0
self.cons = 0
self.sav = 0
self.shock: float = self.np_random.exponential(1)
self.income = self.W * self.shock
self.pre_cons_net_worth = self.assets * self.price + self.income
self.reward = 0
        self.obs = np.array([self.assets, self.price, self.income])
def seed(self, seed=None):
self.np_random, seed = seeding.np_random(seed)
return [seed]
def reset(self):
self.assets = INITIAL_ASSET_HOLDINGS
self.price = R_VALUE - DELTA
self.W = W_VALUE
self.current_step = 0
self.sav = 0
self.cons = 0
self.reward = 0
self.shock = self.np_random.exponential(1)
self.income = self.W * self.shock
self.pre_cons_net_worth = self.assets * self.price + self.income
        self.obs = np.array([self.assets, self.price, self.income])
# shifted exponential for time being, can impose own distribution with custom sampling later on.
# for time being will use default distribution for sampling.
return self.obs
# updating function
@property
def n(self):
return AGENT_NUM
def step(self, cons, R, W):
self.current_step += 1
self.cons = cons
self.price = R - DELTA
self.delta = DELTA
self.W = W
self.shock = self.np_random.exponential(1)
self.income = self.W * self.shock
self.pre_cons_net_worth = (self.price) * self.assets + self.income
# if consumption greater or equal than 0:
if self.cons in self.action_space:
# savings are capital income + labor income - cons
self.sav = self.pre_cons_net_worth - self.cons
# assets next period are post-depreciation assets today + capital
self.assets_next = self.sav + (1 - self.delta) * self.assets
# if assets next period below borrowing limit set equal to borrowing limit
if self.assets_next <= self.borrow_lim:
                # adjust savings and consumption so the agent is hand-to-mouth (PHTM) at the limit.
self.assets_next = self.borrow_lim
self.sav = self.borrow_lim - (1 - self.delta) * self.assets
self.cons = self.pre_cons_net_worth - self.sav
else:
self.assets_next = self.sav + (1 - self.delta) * self.assets
else:
raise ValueError(
"Received invalid action={} which is not part of the action space".format(
self.cons
)
)
        # stacks the ndarray self.assets with newly created ndarrays for self.price
        # and self.income into a single observation ndarray.
        # we want observations to be past observed events, not the future choice of
        # assets; otherwise observations become endogenous, which is harder. It makes
        # no difference except whether time 0 or time t+1 is observed this period.
self.obs = np.hstack((self.assets, np.array(self.price), np.array(self.income)))
self.assets = self.assets_next
done = self.cons < 0
self.done = done
if self.cons > 0:
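            # NOTE: this is "line 138" from the DDPG traceback above; under PPO
            # self.cons arrives as a 1-element ndarray so .item() works, but under
            # DDPG it is sometimes already a plain float.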
self.reward = (self.cons ** (1 - self.gamma) / (1 - self.gamma)).item()
else:
self.reward = -1
return self.obs, self.reward, self.done, {}
def render(self, mode="human", close=False):
# work on render to make graph.
results = str(
f"Step: {self.current_step}\n"
f"Assets: {self.assets}\n"
f"Income: {self.income}\n"
f"Consumption: {self.cons}\n"
f"Savings: {self.sav}\n"
f"Net worth: {self.pre_cons_net_worth}\n"
f"Interest Rate: {self.price}\n"
f"Wage Rate: {self.W}\n"
f"Utility: {self.reward}\n"
)
return results
class AiyagariMultiAgentEnv(MultiAgentEnv):
def __init__(self, num):
        self.timestep_limit = 100  # overrides the MultiAgentEnv default so the config can choose a time limit
self.agents = {str(i): AiyagariEnvironment() for i in range(0, num)}
self.dones = set()
self.rew = {str(i): 0 for i in range(0, num)}
self.timesteps = 0
        # the observation needs length 7 here because aggregate states are added for each agent to track
self.observation_space = gym.spaces.Box(
low=np.array([BORROW_LIM, -10, 0, 0, 0, 0, 0]),
high=np.array(
[
10000000,
100000000,
100000000,
100000000,
100000000,
100000000,
100000000,
]
),
dtype=np.float32,
)
self.action_space = gym.spaces.Box(
low=np.array([0]), high=np.array([100000]), dtype=np.float32
)
self.resetted = False
self.num = num
def reset(self):
# resets accumulated rewards to nothing
self.rew = {str(i): 0 for i in range(0, self.num)}
self.timesteps = 0
self.resetted = True
self.dones = set()
        dict_agents = {str(i): np.zeros(7) for i in range(self.num)}
# initial holdings
self.K = sum(self.agents[str(i)].assets for i in range(self.num))
self.N = self.num
self.R = Z * (1 - ALPHA) * (self.N / self.K) ** (ALPHA)
self.W = Z * (ALPHA) * (self.K / self.N) ** (1 - ALPHA)
agg_obs_list = [self.K, self.N, self.R, self.W]
for i in range(self.num):
dict_agents[str(i)][0:3] = self.agents[str(i)].reset()
dict_agents[str(i)][3:7] = np.array(agg_obs_list)
return dict_agents
def step(self, action_dict):
self.timesteps += 1
# dict_agents = {str(i): np.zeros(7) for i, a in enumerate(self.agents)}
keylist = list(str(i) for i in range(0, self.num))
obs, rew_temp, done, info = (
dict.fromkeys(keylist),
dict.fromkeys(keylist),
dict.fromkeys(keylist),
dict.fromkeys(keylist),
)
        obs_temp_list = dict.fromkeys(keylist)
obs_temp = np.zeros(7)
for i, action in action_dict.items():
# get observations which is tomorrow's capital earnings. Use to construct tomorrow prices. then feedback in.
obs_temp[0:3], rew_temp[str(i)], done[str(i)], info[str(i)] = self.agents[
str(i)
].step(action, self.R, self.W)
# if max(done)==1:
# print('Agent hit 0. Restarting!')
# done[str(i)]=True
            # .copy() needed: slicing returns a view, so without it every agent
            # would end up sharing the same obs_temp buffer
            obs_temp_list[i] = obs_temp[0:7].copy()
# append aggregate observations to each i
        # if any per-agent done is None, reset all entries to False
        if any(v is None for v in done.values()):
            done = dict.fromkeys(done, False)
        # sets all entries to True if a single entry is True
        done = dict.fromkeys(done, bool(max(done.values())))
for i, action in action_dict.items():
if done[str(i)]:
self.dones.add(str(i))
# adds discounted rewards to get episode discounted sum
self.rew = {i: self.rew.get(i) + rew_temp.get(i) for i in set(self.rew)}
# construct and append aggregate states
try:
self.K = sum(obs_val[0] for obs_val in obs_temp_list.values())
        except Exception:
print("Compare 1")
print(
{
i: self.agents[str(i)].step(action, self.R, self.W)[0]
for i, action in action_dict.items()
}
)
print("Compare 2")
print(obs_temp_list)
print("Action Dict")
print(action_dict)
print(self.agents[str(0)])
self.N = self.num
self.R = Z * (1 - ALPHA) * (self.N / self.K) ** (ALPHA)
self.W = Z * (ALPHA) * (self.K / self.N) ** (1 - ALPHA)
for i in range(0, self.num):
obs_temp_list[str(i)][3:7] = [self.K, self.N, self.R, self.W]
obs[str(i)] = obs_temp_list[str(i)]
        # ends the episode for all agents once timestep_limit is exceeded. Consider
        # replacing with mixing instead, and adding separate policies.
done["__all__"] = len(self.dones) == len(self.agents)
if self.timesteps >= self.timestep_limit:
done = dict.fromkeys(done, True)
done["__all__"] == True
return obs, self.rew, done, info
def render(self, mode="human", close=True):
# TODO: work on nice render
results_n = []
for i in range(self.num):
# results += env.render(mode, close)
results = self.agents[str(i)].render(mode, close)
results_n.append(results)
return results_n
if __name__ == "__main__":
env = AiyagariMultiAgentEnv(5)
obs = env.reset()
    for idx, item in enumerate(env.render()):
        print(f"Agent: {idx + 1} \n")
        print(item)
print(env.action_space)
tune.register_env("my_env", lambda config: AiyagariMultiAgentEnv(5))
ray.init()
policy_lookup = {str(i): "policy" + str(i) for i in env.agents.keys()}
import pprint as pp
pp.pprint(policy_lookup)
policies_individual = {
"policy" + str(i): (None, env.observation_space, env.action_space, {})
for i in env.agents.keys()
}
    def policy_mapping_fn(agent_id: str):
        assert agent_id in env.agents.keys(), f"ERROR: invalid agent ID {agent_id}!"
        return policy_lookup[agent_id]
# runs training per individual
tune_config_individual = {
"env": "my_env",
"num_gpus": 0,
"num_workers": 2,
"multiagent": {
"policies": policies_individual,
"policy_mapping_fn": policy_mapping_fn,
},
"horizon": 100,
#"shuffle_sequences": False,
'exploration_config': {'type': 'EpsilonGreedy'}
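        # for reference, the variant that runs fine:
        # 'exploration_config': {'type': 'StochasticSampling'},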
}
analysis = tune.run(
"PPO",
stop={"training_iteration": 10000},
mode="max",
config=tune_config_individual,
metric="episode_reward_mean",
checkpoint_freq=10,
checkpoint_at_end=True,
)
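For the DDPG runs I only swap the trainer string in the same call (and drop the exploration_config override from the config dict); roughly:

analysis = tune.run(
    "DDPG",
    stop={"training_iteration": 10000},
    mode="max",
    config=tune_config_individual,
    metric="episode_reward_mean",
    checkpoint_freq=10,
    checkpoint_at_end=True,
)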