Log or record custom env data via RLlib

Hi folks,

Does RLlib offer pre-built functionality for logging or recording custom env data?
Or is it intended (or simply better) to do such things via custom loggers integrated into the env?

For example, I would like to track some entities during an episode (e.g. record the trace of an article through a plant). I’ve thought of doing this with custom callback functions (on_episode_start|step|end) and storing my custom env data in the episode object, maybe in episode.media as suggested here. But I’m not sure how RLlib could then be used to write that data to a log dir. Does anyone know what to do?

Hey, I had the same issue, though depending on what you’re logging the solution might look different.
Also, this setup is quite tailored to my specific reporting needs, so you can probably throw out a lot, but I’ll leave it as is for reference.

In my env step I have this:

def step(self, action):
    ...
    info = {
        "media": media,
        "data": {
            "running": {},
            "final": {},
        },
    }
    return obs, reward, done, info

The media can be pretty much anything; I used a dictionary of arrays. In data, running holds stats that are kept for each step, while final holds only a single value for the whole episode (e.g. if the env counts the total number of something since the reset). But this can be simplified and shaped in whatever way you want.
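For concreteness, a single step’s info for the article-tracing example from the question could look something like this (all keys and values here are made up, purely illustrative):

info = {
    "media": {"article_position": "station_3"},   # anything you want dumped per step
    "data": {
        "running": {"queue_length": 7},           # collected for every step
        "final": {"articles_finished": 42},       # only the last value is kept
    },
}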

Then I have the following custom callbacks:

import json
import os
import os.path as osp
from collections import defaultdict

import numpy as np

# Import paths below are for Ray 1.x; in newer versions DefaultCallbacks lives
# under ray.rllib.algorithms.callbacks.
from ray.rllib.agents.callbacks import DefaultCallbacks
from ray.tune.logger import LoggerCallback
from ray.tune.result import TIMESTEPS_TOTAL, TRAINING_ITERATION


class CustomCallbacks(DefaultCallbacks):
    def on_episode_start(
        self, *, worker, base_env, policies, episode, env_index, **kwargs
    ):
        episode.media["episode_data"] = defaultdict(list)
        episode.user_data = {"final": {}, "running": defaultdict(list)}

    def on_episode_step(
        self, *, worker, base_env, episode, env_index, **kwargs
    ):
        # Running metrics -> keep all values
        # Final metrics -> only keep the current value
        for data_type, data_subset in episode.user_data.items():
            data = episode.last_info_for().get("data", {}).get(data_type, {})
            for name, value in data.items():
                if data_type == "running":
                    data_subset[name].append(value)
                else:
                    data_subset[name] = value

        # Arbitrary episode media
        media = episode.last_info_for().get("media", {})
        for name, value in media.items():
            episode.media["episode_data"][name].append(value)

    def on_episode_end(
        self, *, worker, base_env, policies, episode, env_index, **kwargs
    ):
        for name, value in episode.media["episode_data"].items():
            episode.media["episode_data"][name] = np.array(value).tolist()

        for data_type, data_subset in episode.user_data.items():
            for name, value in data_subset.items():
                if data_type == "running":
                    episode.custom_metrics[name + "_avg"] = np.mean(value)
                    episode.custom_metrics[name + "_sum"] = np.sum(value)
                    episode.hist_data[name] = value
                else:
                    episode.custom_metrics[name] = value
                    episode.hist_data[name] = [value]


class DataLoggerCallback(LoggerCallback):
    def __init__(self):
        self._trial_continue = {}
        self._trial_local_dir = {}

    def log_trial_start(self, trial):
        trial.init_logdir()
        self._trial_local_dir[trial] = osp.join(trial.logdir, "episode_data")
        os.makedirs(self._trial_local_dir[trial], exist_ok=True)

    def log_trial_result(self, iteration, trial, result):
        if "episode_data" not in result["episode_media"]:
            return

        step = result.get(TIMESTEPS_TOTAL) or result[TRAINING_ITERATION]
        data_file = osp.join(
            self._trial_local_dir[trial], f"data-{step:08d}.json"
        )

        num_episodes = result["episodes_this_iter"]
        data = result["episode_media"]["episode_data"]
        episode_data = data[-num_episodes:]

        if "evaluation" in result:
            data = result["evaluation"]["episode_media"]["episode_data"]
            episode_data += data[-num_episodes:]

        with open(data_file, "w") as f:
            json.dump(episode_data, f)

This creates a bunch of JSON files in the trial dir based on the media, e.g.:

<log-dir>/trial-..../episode_data:
data-00040000.json  data-00180000.json  data-00320000.json  data-00460000.json  
data-00600000.json  data-00740000.json  data-00880000.json  data-01020000.json  
...
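In case you want to look at one of those files afterwards, they are plain JSON; a quick way to load one (file name taken from the listing above, structure as produced by the callbacks):

import json

with open("data-00040000.json") as f:
    episodes = json.load(f)  # one entry per episode of that iteration

print(len(episodes), list(episodes[0].keys()))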

For the scalars it creates the corresponding TensorBoard entries, e.g. ray/tune/custom_metrics/<data-key>_avg_mean. The naming is a bit weird: avg_mean means averaged over the episode and then averaged between episodes, while avg_min is averaged over the episode and the minimum between episodes (the same semantics as episode_reward_{mean,max,min}). You can change this to whatever fits your needs in CustomCallbacks.on_episode_end. Also, this adds the non-aggregated data to the histograms, not just the aggregated data to the scalars.

And one more thing: I’m using this with Tune. I’m not sure how the callbacks are handled if you’re running plain RLlib, but I think it should work regardless.
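For completeness, wiring both callbacks into a Tune run looks roughly like this; the env name and hyperparameters are just placeholders, and the exact signature may differ slightly between Ray versions:

from ray import tune

tune.run(
    "PPO",
    config={
        "env": "MyPlantEnv-v0",        # placeholder env name
        "callbacks": CustomCallbacks,  # RLlib episode callbacks (class, not instance)
        "num_workers": 2,
    },
    callbacks=[DataLoggerCallback()],  # Tune-side logger callback (instance)
    stop={"training_iteration": 100},
)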

I hope that helps; let me know if something is not clear.


@klausk55,

depending on what data you want to store (pickled objects might not work with the following), you can use RLlib’s built-in Offline API. The only thing you have to do to log the data to JSON files is to define

"output": "path/to/the/folder/of/output/files" 

in your Trainer config.

RLlib then uses the JsonWriter object to write out observations, actions, rewards, etc. for each timestep.
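If you want to read those files back into Python later, RLlib also provides a JsonReader; a minimal sketch, assuming the same folder as in the "output" setting:

from ray.rllib.offline.json_reader import JsonReader

# Path should match whatever you configured under "output".
reader = JsonReader("path/to/the/folder/of/output/files")
batch = reader.next()  # one recorded (possibly multi-agent) sample batch per call
print(batch)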

If you need to store more than the standard output, you can use the info object returned by the environment’s step() function. Simply store your additional data in a dictionary and return it from your step function, and the Offline API will record it in the JSON output file:

def step(self, action):
    ...
    info_dict = {
        "data_1": myObject1,
        "data_2": myObject2,
    }
    return obs, reward, done, info_dict

Thanks @Lars_Simon_Zehnder, also for your idea! I guess this is a simple and very fast way to store some data from the env. Provided we are talking about the same thing, I already have this in use and it generates all these output-…json files in my log_dir. In my case, they contain the data of MultiAgentBatches with the size of rollout_fragment_length.
To me, @vakker00’s suggestion is not as straightforward as yours, but it should allow for more flexibility and customization (e.g. tracking and logging something throughout an episode).

Maybe also helpful and interesting for others: