Errors with EpsilonGreedy option for PPO and with DDPG, but works with PPO + StochasticSampling

Hi, my code works great when running PPO with StochasticSampling exploration.

However, when I try to switch the algorithm from PPO to DDPG, or the exploration from StochasticSampling to EpsilonGreedy, I run into errors. For DDPG I first get, in line 138, in step:

  self.reward = (self.cons ** (1 - self.gamma) / (1 - self.gamma)).item()
AttributeError: 'float' object has no attribute 'item'

Here self.cons = action is the action chosen by the agent's policy. Sometimes it seems to be an ndarray, sometimes a float, and the .item() is necessary for PPO to run.
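To illustrate what I mean (the values are made up; only the type and shape matter):

import numpy as np

# Under PPO + StochasticSampling the env seems to receive a (1,)-shaped array, so .item() works:
action = np.array([0.37], dtype=np.float32)
action.item()  # returns a Python float

# Under DDPG it sometimes arrives as a plain Python float, so .item() raises:
action = 0.37
# action.item()  # AttributeError: 'float' object has no attribute 'item'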
If I remove the .item(), DDPG instead returns this error:

  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/trial_runner.py", line 718, in _process_trial
    results = self.trial_executor.fetch_result(trial)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/ray_trial_executor.py", line 688, in fetch_result
    result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/worker.py", line 1495, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::DDPG.train_buffered() (pid=99176, ip=10.16.78.15)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 451, in ray._raylet.execute_task.function_executor
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/_private/function_manager.py", line 563, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/trainable.py", line 173, in train_buffered
    result = self.train()
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 608, in train
    raise e
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 594, in train
    result = Trainable.train(self)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/trainable.py", line 232, in train
    result = self.step()
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer_template.py", line 173, in step
    res = next(self.train_exec_impl)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 756, in __next__
    return next(self.built_iterator)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 843, in apply_filter
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 843, in apply_filter
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 843, in apply_filter
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 1075, in build_union
    item = next(it)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 756, in __next__
    return next(self.built_iterator)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
    for item in it:
  [Previous line repeated 2 more times]
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/replay_ops.py", line 91, in gen_replay
    item = local_buffer.replay()
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/replay_buffer.py", line 430, in replay
    samples[policy_id] = replay_buffer.sample(
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/replay_buffer.py", line 216, in sample
    batch = self._encode_sample(idxes)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/replay_buffer.py", line 100, in _encode_sample
    out = SampleBatch.concat_samples([self._storage[i] for i in idxes])
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/policy/sample_batch.py", line 150, in concat_samples
    out[k] = concat_aligned(
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/utils/memory.py", line 70, in concat_aligned
    np.concatenate(items, out=output, axis=1 if time_major else 0)
  File "<__array_function__ internals>", line 5, in concatenate
ValueError: all the input arrays must have same number of dimensions, but the array at index 0 has 1 dimension(s) and the array at index 159 has 2 dimension(s)

which I again suspect has something to do with the action changing shape between a plain float and an ndarray.
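
One workaround I have been considering is to coerce whatever the policy sends into a plain float at the top of AiyagariEnvironment.step(), so the reward computation and the stored transitions always see the same type and shape. This is only a sketch; the helper name _to_scalar is made up and is not in the code below:

import numpy as np

def _to_scalar(action):
    # Accepts a plain float, a 0-d array, or a shape-(1,) array and returns a Python float.
    return float(np.asarray(action, dtype=np.float32).reshape(-1)[0])

# Inside step() this would replace the raw assignment and the .item() call:
#     self.cons = _to_scalar(cons)
#     self.reward = self.cons ** (1 - self.gamma) / (1 - self.gamma)

I have not verified whether this also resolves the replay-buffer dimension mismatch, so I would still like to understand where the inconsistency comes from.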

With EpsilonGreedy exploration for PPO I instead get this error:

    results = self.trial_executor.fetch_result(trial)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/ray_trial_executor.py", line 688, in fetch_result
    result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/worker.py", line 1495, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::PPO.train_buffered() (pid=3104, ip=10.16.78.15)
  File "python/ray/_raylet.pyx", line 501, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 451, in ray._raylet.execute_task.function_executor
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/_private/function_manager.py", line 563, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/trainable.py", line 173, in train_buffered
    result = self.train()
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 608, in train
    raise e
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer.py", line 594, in train
    result = Trainable.train(self)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/tune/trainable.py", line 232, in train
    result = self.step()
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/agents/trainer_template.py", line 173, in step
    res = next(self.train_exec_impl)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 756, in __next__
    return next(self.built_iterator)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 843, in apply_filter
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 843, in apply_filter
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 783, in apply_foreach
    for item in it:
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/util/iter.py", line 791, in apply_foreach
    result = fn(item)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/train_ops.py", line 206, in __call__
    self.optimizers[policy_id].load_data(
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/ray/rllib/execution/multi_gpu_impl.py", line 218, in load_data
    sess.run([t.init_op for t in self._towers], feed_dict=feed_dict)
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/tensorflow/python/client/session.py", line 967, in run
    result = self._run(None, fetches, feed_dict, options_ptr,
  File "/Users/brandonkaplowitz/opt/anaconda3/envs/py39/lib/python3.9/site-packages/tensorflow/python/client/session.py", line 1164, in _run
    raise ValueError(
ValueError: Cannot feed value of shape (3968,) for Tensor 'policy0/action:0', which has shape '(?, 1)'
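
For reference, the only difference between the working run and this failing one is the exploration_config entry in the Tune config at the bottom of my script, roughly (StochasticSampling is, as far as I understand, also the PPO default):

"exploration_config": {"type": "StochasticSampling"}   # works
"exploration_config": {"type": "EpsilonGreedy"}        # fails with the shape error above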

My code is attached below:

import numpy as np
import gym
from gym import spaces
from gym.utils import seeding, EzPickle
from gym.envs.registration import EnvSpec
from ray.rllib.env.multi_agent_env import MultiAgentEnv
import ray
from ray.tune.logger import pretty_print
from ray.tune.registry import register_env
from ray.rllib.agents import ppo
from ray.rllib.agents import ddpg
import ray.tune as tune

INITIAL_ASSET_HOLDINGS = 1
BORROW_LIM = 0
R_VALUE = 1.03
DELTA = 0.01
W_VALUE = 0.98
GAMMA = 2.0
AGENT_NUM = 1
N = 5
BETA = 0.95
ALPHA = 0.33
Z = 1.0
np.random.seed(2020)

# alternative for raylib


class AiyagariEnvironment(gym.Env):
    """An environment for value function sampling from a basic RA GE model with capital"""

    # resets state to initial value
    #  def u(cons):
    #      util = cons**(1-GAMMA)/(1-GAMMA)
    #      return util
    # idea pass assets to multiagent, and then return interest rate back to environment.
    metadata = {"render.modes": ["human"]}

    def __init__(self):
        super(AiyagariEnvironment, self).__init__()
        self.reward_range = (0, 100000)
        self.seed()
        # consumption space bounds [borrow_lim, inf)
        self.action_space = spaces.Box(
            low=np.array([0]), high=np.array([100000]), dtype=np.float32
        )
        # Observation space -- all variables the agent observes before making a new decision.
        # Since we assume r is fixed here, this includes assets, prices, and income. Because assets
        # act as a summary statistic in this model, we only provide current-period assets, prices,
        # and income. This could be extended to multi-period observations if we wanted.
        self.observation_space = spaces.Box(
            low=np.array([BORROW_LIM, -10, 0]),
            high=np.array([100000000, 100000000, 100000000]),
            dtype=np.float32,
        )
        self.assets = INITIAL_ASSET_HOLDINGS
        self.beta = BETA
        self.epsilon = 10e-3
        self.borrow_lim = BORROW_LIM + self.epsilon
        self.gamma = GAMMA
        self.price = R_VALUE - DELTA
        self.W = W_VALUE
        self.current_step = 0
        self.cons = 0
        self.sav = 0
        self.shock: float = self.np_random.exponential(1)
        self.income = self.W * self.shock
        self.pre_cons_net_worth = self.assets * self.price + self.income
        self.reward = 0
        self.obs = np.array([self.assets, self.price, self.income])
        self.current_step = 0

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def reset(self):
        self.assets = INITIAL_ASSET_HOLDINGS
        self.price = R_VALUE - DELTA
        self.W = W_VALUE
        self.current_step = 0
        self.sav = 0
        self.cons = 0
        self.reward = 0
        self.shock = self.np_random.exponential(1)
        self.income = self.W * self.shock
        self.pre_cons_net_worth = self.assets * self.price + self.income
        self.obs = np.array([self.assets, self.price, self.income])
        self.current_step = 0
        # shifted exponential for time being, can impose own distribution with custom sampling later on.
        # for time being will use default distribution for sampling.
        return self.obs

    # updating function
    @property
    def n(self):
        return AGENT_NUM

    def step(self, cons, R, W):
        self.current_step += 1
        self.cons = cons
        self.price = R - DELTA
        self.delta = DELTA
        self.W = W
        self.shock = self.np_random.exponential(1)
        self.income = self.W * self.shock
        self.pre_cons_net_worth = (self.price) * self.assets + self.income
        # if consumption greater or equal than 0:
        if self.cons in self.action_space:
            # savings are capital income + labor income - cons
            self.sav = self.pre_cons_net_worth - self.cons
            # assets next period are post-depreciation assets today + capital
            self.assets_next = self.sav + (1 - self.delta) * self.assets
            # if assets next period below borrowing limit set equal to borrowing limit
            if self.assets_next <= self.borrow_lim:
                # adjust savings and consumption appropriately so that PHTM.
                self.assets_next = self.borrow_lim
                self.sav = self.borrow_lim - (1 - self.delta) * self.assets
                self.cons = self.pre_cons_net_worth - self.sav
            else:
                self.assets_next = self.sav + (1 - self.delta) * self.assets
        else:
            raise ValueError(
                "Received invalid action={} which is not part of the action space".format(
                    self.cons
                )
            )
        # np.hstack takes ndarray self.assets and newly created ndarrays for self.price and
        # self.income and returns a single ndarray. We want observations to be past observed
        # events and not also the future choice of assets, as otherwise observations become
        # endogenous, which is a bit harder. The only difference is whether time 0 or
        # time t+1 is observed this period.
        self.obs = np.hstack((self.assets, np.array(self.price), np.array(self.income)))
        self.assets = self.assets_next

        done = self.cons < 0
        self.done = done
        if self.cons > 0:
            self.reward = (self.cons ** (1 - self.gamma) / (1 - self.gamma)).item()
        else:
            self.reward = -1
        return self.obs, self.reward, self.done, {}

    def render(self, mode="human", close=False):
        # work on render to make graph.
        results = str(
            f"Step: {self.current_step}\n"
            f"Assets: {self.assets}\n"
            f"Income: {self.income}\n"
            f"Consumption: {self.cons}\n"
            f"Savings: {self.sav}\n"
            f"Net worth: {self.pre_cons_net_worth}\n"
            f"Interest Rate: {self.price}\n"
            f"Wage Rate: {self.W}\n"
            f"Utility: {self.reward}\n"
        )
        return results


class AiyagariMultiAgentEnv(MultiAgentEnv):
    def __init__(self, num):
        self.timestep_limit = 100  # overrides the default built into MultiAgentEnv so the config can choose the time limit
        self.agents = {str(i): AiyagariEnvironment() for i in range(0, num)}
        self.dones = set()
        self.rew = {str(i): 0 for i in range(0, num)}
        self.timesteps = 0
        # needs to be 7 entries here, since aggregate states were added for the agents to track
        self.observation_space = gym.spaces.Box(
            low=np.array([BORROW_LIM, -10, 0, 0, 0, 0, 0]),
            high=np.array(
                [
                    10000000,
                    100000000,
                    100000000,
                    100000000,
                    100000000,
                    100000000,
                    100000000,
                ]
            ),
            dtype=np.float32,
        )
        self.action_space = gym.spaces.Box(
            low=np.array([0]), high=np.array([100000]), dtype=np.float32
        )
        self.resetted = False
        self.num = num

    def reset(self):
        # resets accumulated rewards to nothing
        self.rew = {str(i): 0 for i in range(0, self.num)}
        self.timesteps = 0
        self.resetted = True
        self.dones = set()
        dict_agents = {str(i): np.zeros(7) for i, a in enumerate(self.agents)}
        # initial holdings

        self.K = sum(self.agents[str(i)].assets for i in range(self.num))
        self.N = self.num
        self.R = Z * (1 - ALPHA) * (self.N / self.K) ** (ALPHA)
        self.W = Z * (ALPHA) * (self.K / self.N) ** (1 - ALPHA)
        agg_obs_list = [self.K, self.N, self.R, self.W]
        for i in range(self.num):
            dict_agents[str(i)][0:3] = self.agents[str(i)].reset()
            dict_agents[str(i)][3:7] = np.array(agg_obs_list)
        return dict_agents

    def step(self, action_dict):
        self.timesteps += 1
        # dict_agents = {str(i): np.zeros(7) for i, a in enumerate(self.agents)}
        keylist = list(str(i) for i in range(0, self.num))
        obs, rew_temp, done, info = (
            dict.fromkeys(keylist),
            dict.fromkeys(keylist),
            dict.fromkeys(keylist),
            dict.fromkeys(keylist),
        )
        obs_temp_list = {}
        obs = dict.fromkeys(keylist)
        obs_temp_list = dict.fromkeys(keylist)
        obs_temp = np.zeros(7)

        for i, action in action_dict.items():
            # get observations which is tomorrow's capital earnings. Use to construct tomorrow prices. then feedback in.
            obs_temp[0:3], rew_temp[str(i)], done[str(i)], info[str(i)] = self.agents[
                str(i)
            ].step(action, self.R, self.W)
            # if max(done)==1:
            #    print('Agent hit 0. Restarting!')
            #    done[str(i)]=True
            obs_temp_list[i] = obs_temp[0:7]
            # append aggregate observations to each i
        # If any done value is None, set every entry to False.
        # Then set every entry to True if any single entry is True.
        for v in done.values():
            if v is None:
                done = dict.fromkeys(done, False)

        done = dict.fromkeys(done, bool(max(done.values())))
        for i, action in action_dict.items():
            if done[str(i)]:
                self.dones.add(str(i))

        # adds discounted rewards to get episode discounted sum
        self.rew = {i: self.rew.get(i) + rew_temp.get(i) for i in set(self.rew)}
        # construct and append aggregate states

        try:
            self.K = sum(obs_val[0] for obs_val in obs_temp_list.values())
        except:
            print("Compare 1")
            print(
                {
                    i: self.agents[str(i)].step(action, self.R, self.W)[0]
                    for i, action in action_dict.items()
                }
            )
            print("Compare 2")
            print(obs_temp_list)
            print("Action Dict")
            print(action_dict)
            print(self.agents[str(0)])
        self.N = self.num
        self.R = Z * (1 - ALPHA) * (self.N / self.K) ** (ALPHA)
        self.W = Z * (ALPHA) * (self.K / self.N) ** (1 - ALPHA)
        for i in range(0, self.num):
            obs_temp_list[str(i)][3:7] = [self.K, self.N, self.R, self.W]
            obs[str(i)] = obs_temp_list[str(i)]
        # kills all agents after timestep_limit exceeded. Consider replacing with mixing instead. Add separate policies
        done["__all__"] = len(self.dones) == len(self.agents)

        if self.timesteps >= self.timestep_limit:
            done = dict.fromkeys(done, True)
            done["__all__"] == True
        return obs, self.rew, done, info

    def render(self, mode="human", close=True):
        # TODO: work on nice render
        results_n = []
        for i in range(self.num):
            # results += env.render(mode, close)
            results = self.agents[str(i)].render(mode, close)
            results_n.append(results)
        return results_n


if __name__ == "__main__":

    env = AiyagariMultiAgentEnv(5)
    obs = env.reset()
    for idx, items in enumerate(env.render()):
        print(f"Agent: {idx + 1} \n")
        print(items)
    print(env.action_space)

    tune.register_env("my_env", lambda config: AiyagariMultiAgentEnv(5))

    ray.init()

    policy_lookup = {str(i): "policy" + str(i) for i in env.agents.keys()}
    import pprint as pp

    pp.pprint(policy_lookup)

    policies_individual = {
        "policy" + str(i): (None, env.observation_space, env.action_space, {})
        for i in env.agents.keys()
    }

    def policy_mapping_fn(agent_id: str):
        assert agent_id in [
            i for i in env.agents.keys()
        ], "ERROR: invalid agent ID {agent_id}!"
        return policy_lookup[agent_id]

    # runs training per individual
    tune_config_individual = {
        "env": "my_env",
        "num_gpus": 0,
        "num_workers": 2,
        "multiagent": {
            "policies": policies_individual,
            "policy_mapping_fn": policy_mapping_fn,
        },
        "horizon": 100,
        #"shuffle_sequences": False,
        "exploration_config": {"type": "EpsilonGreedy"},
    }
    analysis = tune.run(
        "PPO",
        stop={"training_iteration": 10000},
        mode="max",
        config=tune_config_individual,
        metric="episode_reward_mean",
        checkpoint_freq=10,
        checkpoint_at_end=True,
    )
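
For the DDPG error at the top, the only change I make is the trainable passed to tune.run; roughly (the rest of the config stays the same):

analysis = tune.run(
    "DDPG",
    stop={"training_iteration": 10000},
    mode="max",
    config=tune_config_individual,
    metric="episode_reward_mean",
    checkpoint_freq=10,
    checkpoint_at_end=True,
)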