Here is a script that reproduces the issue. It creates a simple custom env and a custom logging callback, then starts a Tune run. I also added a small print(custom_metrics) in the summarize_episodes function in metrics.py, just before the min/max/mean aggregation is applied to the custom metrics. Running this gives output like the following:
...
(pid=1893929) Logging state0=5.3611773754530025 at time 300
(pid=1893929) Logging state0=2.3788934510806534 at time 400
(pid=1893944) defaultdict(<class 'list'>, {'time': [400, 400, 200, 200], 'state0': [2.3788934510806534, 2.3788934510806534, 1.582037767340426, 1.582037767340426]})
...
Every episode's value appears twice in the collected lists, which is not the behaviour I would expect. The effect can also be seen in the min/mean/max values of the custom metrics in the TensorBoard interface.
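For context, the print sits roughly at the point where summarize_episodes collects all episode.custom_metrics values into lists before reducing them to min/mean/max. The sketch below only mirrors that structure; the real code in ray/rllib/evaluation/metrics.py differs in detail between Ray versions:

import collections
import numpy as np

def summarize_custom_metrics_sketch(episodes):
    # Collect every episode's custom_metrics values into lists
    # (this matches the defaultdict(list) visible in the output above).
    custom_metrics = collections.defaultdict(list)
    for episode in episodes:
        for k, v in episode.custom_metrics.items():
            custom_metrics[k].append(v)

    print(custom_metrics)  # <-- the debug print described above

    # Reduce each list to min/mean/max; these summaries are what end up
    # in the training results and in TensorBoard.
    summary = {}
    for k, v_list in custom_metrics.items():
        summary[k + "_min"] = float(np.min(v_list))
        summary[k + "_mean"] = float(np.mean(v_list))
        summary[k + "_max"] = float(np.max(v_list))
    return summary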
from typing import Dict
import numpy as np
import tensorflow as tf
import gym
import ray
import ray.tune as tune
from ray.rllib.agents.callbacks import DefaultCallbacks
from ray.rllib.env import BaseEnv
from ray.rllib.evaluation import MultiAgentEpisode, RolloutWorker
from ray.rllib.policy import Policy
from ray.rllib.policy.sample_batch import SampleBatch

class TestEnv(gym.Env):
    def __init__(self, config={}):
        self.action_space = gym.spaces.Discrete(2)
        self.observation_space = gym.spaces.Box(-100, 100, shape=(3,))

    def reset(self):
        self.time = 0
        self.state = np.random.rand(3)
        self.reward = np.sum(np.abs(1 - self.state))
        return self.state

    def step(self, action):
        self.time += 1
        if action == 0:
            self.state = np.abs(self.state)
        else:
            self.state /= 2
        self.state += np.random.rand(3)
        self.state = np.clip(self.state, -100, 100)  # assign the clipped result back
        self.reward = np.sum(np.abs(1 - self.state))
        return self.state, self.reward, False, {}

class LoggingCallbacks(DefaultCallbacks):
    def on_sample_end(self, *, worker: RolloutWorker, samples: SampleBatch,
                      **kwargs):
        pass

    def on_train_result(self, *, trainer, result: dict, **kwargs):
        pass

    def on_postprocess_trajectory(
            self, *, worker: RolloutWorker, episode: MultiAgentEpisode,
            agent_id: str, policy_id: str, policies: Dict[str, Policy],
            postprocessed_batch: SampleBatch,
            original_batches: Dict[str, SampleBatch], **kwargs):
        pass

    def on_episode_start(self, *, worker: RolloutWorker, base_env: BaseEnv,
                         policies: Dict[str, Policy],
                         episode: MultiAgentEpisode, env_index: int, **kwargs):
        pass

    def on_episode_step(self, *, worker: RolloutWorker, base_env: BaseEnv,
                        episode: MultiAgentEpisode, env_index: int, **kwargs):
        pass

    def on_episode_end(self, *, worker: RolloutWorker, base_env: BaseEnv,
                       policies: Dict[str, Policy], episode: MultiAgentEpisode,
                       env_index: int, **kwargs):
        # Record the current env state as custom metrics once per episode.
        env = base_env.get_unwrapped()[env_index]
        print("Logging state0={} at time {}".format(env.state[0], env.time))
        episode.custom_metrics["time"] = env.time
        episode.custom_metrics["state0"] = env.state[0]

tune.run(
    "PPO",
    config={
        "env": TestEnv,
        "env_config": {},
        "callbacks": LoggingCallbacks,
        "soft_horizon": True,
        "no_done_at_end": True,
        "horizon": 100,  # Decides length of episodes
        "train_batch_size": 200,  # Decides how often stuff is logged (will do min/max/avg over each episode datapoint)
        "rollout_fragment_length": 200,
        "num_workers": 1,
    },
    stop={
        "timesteps_total": 1000,
    },
    verbose=1,
)
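The summarized values can also be inspected without TensorBoard by training with a PPOTrainer directly and printing result["custom_metrics"]; a minimal sketch, assuming the same config as above and the ray.rllib.agents.ppo module path of the Ray version used here:

ray.init(ignore_reinit_error=True)
from ray.rllib.agents.ppo import PPOTrainer

trainer = PPOTrainer(config={
    "env": TestEnv,
    "callbacks": LoggingCallbacks,
    "soft_horizon": True,
    "no_done_at_end": True,
    "horizon": 100,
    "train_batch_size": 200,
    "rollout_fragment_length": 200,
    "num_workers": 1,
})
result = trainer.train()
# The min/mean/max summaries built from the (duplicated) lists above,
# e.g. time_min / time_mean / time_max.
print(result["custom_metrics"])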