Possible to access default logger from environment?

Just started looking at ray and it seems cool, though it is hard to wrap your head around how everything is supposed to interconnect when new to the library.

I would like to log state and actions for the trials to visualise what my agent is doing for debugging purposes and thought I would try to use the internal TBXLogger for that. Though I’m not sure how I would go about this since I can’t figure out how to access the loggers created by tune inside my environment (or maybe this is not at all how you are supposed to use it).

My setup is pretty much just trying to run a simple custom environment with PPO.

class TestEnv(gym.Env):
...
    def step(self, action):
        # Maybe log actions/state from here if I can access the loggers?
...
tune.run("PPO", config={"env": TestEnv})

Hey @albheim I’m recategorizing your question under Ray Tune, since it seems like your question is about the tune logger specifically.

Hmm, this is a RLlib specific question. Consider using one of the RLlib callbacks (@sven1977 will know more)!

That seems like what I’m after, will give it a try as soon as I have time and come back if I don’t get it to work. Only found the Logging callbacks previously and they seem to not contain the information I want and not run often enough.

So it kinda allow me to do what I want. Subclassing the DefaultCallbacks as shown in this example allows me to access the state and any other parameters from my environment for each step and save that in user_data and in on_episode_end I can calculate mean of the episode and save that as a custom_metric. This will then get reported in tensorflow for each episode.

Right now I just use the on_episode_end and sample the metrics there. I have a continuing environment with a soft horizon set, so it will sample every soft horizon then and I can control the sampling interval with the soft horizon.

Hey @albheim , thanks for asking this question.
Yes, the example (ray/custom_metrics_and_callbacks.py at master · ray-project/ray · GitHub) shows some good ways to store environment data during the episodes and then have that data logged via TB.
Let us know, if you need more pointers with this.

I looked through the code in metrics.py, specifically the summarize_episodes function which seems to take the average over all previous episodes recorded, this is not what I had expected. This would make me assume that the max measurement would be always increasing (cumulative max over time should not be able to decrease), but looking at what is actually plotted in TB that is not the case.

What is the intended behaviour here?

I would like it to report the min/mean/max over the episodes that occured since last time something was reported. Is this possible in any way or is the easiest way for me to just edit summarize_episodes myself?

Testing a small example where I have a time that increments by one for each step in my environment, and I log that in custom_metrics in on_episode_end with horizon=100 and train_batch_size=200, the custom metric vector in summarize_episodes that aggregate all the metrics from different episodes into lists will contain the values [200, 200] after the first 200 steps (I would expect [100, 200]). And the TB plot of min/mean/max shows the min/mean/max of the lists [200, 200], [200, 200, 400, 400], … it seems like.

Found it said smoothed history of episodes and dug a little deeper to find the metric_ops file where I saw how it handled the smoothing, so all of that is fine now.

Still a bit confused about the second part though. It seems to add an episode for each horizon steps but somehow it adds the value of the second episode in each rollout in both episodes. I tried printing the values next to it and it prints 100, 200, 300, 400… as the time goes on but it saves the values 200, 200, 400, 400…

@albheim, hmm, strange. Maybe it’s doing max over 2 steps (100 200 → 200; 300 400 → 400). Could you post a self-sufficient reproduction script that makes this more clear and shows the issue with the summarization?

Here is a script that reproduces the issue. It creates a simple custom env, a custom logging callback and starts a tune run.

I also added a small print(custom_metrics) in the summarize_episodes method in metrics.py, just before the min/max/mean is applied to the custom_metrics. Running this then gives output like this:

...
(pid=1893929) Logging state0=5.3611773754530025 at time 300
(pid=1893929) Logging state0=2.3788934510806534 at time 400
(pid=1893944) defaultdict(<class 'list'>, {'time': [400, 400, 200, 200], 'state0': [2.3788934510806534, 2.3788934510806534, 1.582037767340426, 1.582037767340426]})
...

which is not what I would think is the expected behaviour. The effect can also be seen in the min/mean/max values of the custom metrics in the tensorboard interface.

from typing import Dict

import numpy as np
import tensorflow as tf
import gym

import ray
import ray.tune as tune
from ray.rllib.agents.callbacks import DefaultCallbacks
from ray.rllib.env import BaseEnv
from ray.rllib.evaluation import MultiAgentEpisode, RolloutWorker
from ray.rllib.policy import Policy
from ray.rllib.policy.sample_batch import SampleBatch

class TestEnv(gym.Env):
    def __init__(self, config={}):
        self.action_space = gym.spaces.Discrete(2)
        self.observation_space = gym.spaces.Box(-100, 100, shape=(3,))
        
    def reset(self):
        self.time = 0
        self.state = np.random.rand(3)
        self.reward = np.sum(np.abs(1 - self.state))
        return self.state

    def step(self, action):
        self.time += 1
        if action == 0:
            self.state = np.abs(self.state)
        else:
            self.state /= 2
        self.state += np.random.rand(3)
        np.clip(self.state, -100, 100)
        self.reward = np.sum(np.abs(1 - self.state))
        return self.state, self.reward, False, {}


class LoggingCallbacks(DefaultCallbacks):
    def on_sample_end(self, *, worker: RolloutWorker, samples: SampleBatch,
                      **kwargs):
        pass
    def on_train_result(self, *, trainer, result: dict, **kwargs):
        pass
    def on_postprocess_trajectory(
            self, *, worker: RolloutWorker, episode: MultiAgentEpisode,
            agent_id: str, policy_id: str, policies: Dict[str, Policy],
            postprocessed_batch: SampleBatch,
            original_batches: Dict[str, SampleBatch], **kwargs):
        pass
    def on_episode_start(self, *, worker: RolloutWorker, base_env: BaseEnv,
                         policies: Dict[str, Policy],
                         episode: MultiAgentEpisode, env_index: int, **kwargs):
        pass
    def on_episode_step(self, *, worker: RolloutWorker, base_env: BaseEnv,
                        episode: MultiAgentEpisode, env_index: int, **kwargs):
        pass
    def on_episode_end(self, *, worker: RolloutWorker, base_env: BaseEnv,
                       policies: Dict[str, Policy], episode: MultiAgentEpisode,
                       env_index: int, **kwargs):
        env = base_env.get_unwrapped()[env_index]
        print("Logging state0={} at time {}".format(env.state[0], env.time))

        episode.custom_metrics["time"] = env.time
        episode.custom_metrics["state0"] = env.state[0]


tune.run(
    "PPO", 
    config={
        "env": TestEnv,
        "env_config": {
        },
        "callbacks": LoggingCallbacks,
        "soft_horizon": True,
        "no_done_at_end": True,
        "horizon": 100, # Decides length of episodes
        "train_batch_size": 200, # Decides how often stuff is logged (will do min/max/avg over each episode datapoint)
        "rollout_fragment_length": 200,
        "num_workers": 1,
    }, 
    stop={
        "timesteps_total": 1000,
    }, 
    verbose=1,
    )
1 Like

Great! Let me try to debug …
Seeing this as well. Seems like later sampled episodes get prepended to the metrics, rather than appended.

Could you try hot-fixing this via changing the if block in rllib/execution/metrics_ops.py::~L82
as follows:

        if missing > 0:
            episodes = self.episode_history[-missing:] + episodes  # <--- change this line, which was causing the "flip" (newest episodes come first in the list, which is unexpected)
            assert len(episodes) <= self.min_history

I’ll also provide a PR now:

I tried it and it does indeed change the order that items are added, though this was never what I had a problem with (this does not seem to have any affect on the data logged to tensorboard).

It does not seem fix the duplication problem which is what I was concerned about, trying your fix I still get [200, 200, 400, 400] instead of [100, 200, 300, 400] which I would expect.