Hey, I had the same issue; the details may differ depending on what you're logging.
Also, this setup is quite tailored to my specific reporting needs, so you can probably throw out a lot, but I'll just leave it as is for reference.
In my env step I have this:
```python
def step(self, action):
    ...
    info = {
        "media": media,
        "data": {
            "running": {},
            "final": {},
        },
    }
    return obs, reward, done, info
```
The `media` can be pretty much anything; I used a dictionary of arrays. In `data` I have `running` stats, which are kept for each step, while `final` holds only a single value for the whole episode (e.g. if the env counts the total number of something since reset). But this can be simplified and shaped in whatever way you want it.
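For concreteness, a single step's `info` might look like this (all keys here are made-up examples):

```python
info = {
    "media": {"agent_positions": [[0.1, 0.2], [0.3, 0.4]]},  # any array-like payload
    "data": {
        "running": {"speed": 3.2, "collisions": 0},  # logged at every step
        "final": {"total_pickups": 17},              # only the last value is kept
    },
}
```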
Then I have the following custom callbacks:
```python
from collections import defaultdict

import numpy as np
from ray.rllib.agents.callbacks import DefaultCallbacks


class CustomCallbacks(DefaultCallbacks):
    def on_episode_start(
        self, *, worker, base_env, policies, episode, env_index, **kwargs
    ):
        episode.media["episode_data"] = defaultdict(list)
        episode.user_data = {"final": {}, "running": defaultdict(list)}

    def on_episode_step(self, *, worker, base_env, episode, env_index, **kwargs):
        # Running metrics -> keep all values
        # Final metrics -> only keep the current value
        for data_type, data_subset in episode.user_data.items():
            # Default to {} so a step without "data" doesn't crash the loop
            data = episode.last_info_for().get("data", {}).get(data_type, {})
            for name, value in data.items():
                if data_type == "running":
                    data_subset[name].append(value)
                else:
                    data_subset[name] = value
        # Arbitrary episode media
        media = episode.last_info_for().get("media", {})
        for name, value in media.items():
            episode.media["episode_data"][name].append(value)

    def on_episode_end(
        self, *, worker, base_env, policies, episode, env_index, **kwargs
    ):
        # Convert media to plain lists so they can be JSON-serialized later
        for name, value in episode.media["episode_data"].items():
            episode.media["episode_data"][name] = np.array(value).tolist()
        for data_type, data_subset in episode.user_data.items():
            for name, value in data_subset.items():
                if data_type == "running":
                    episode.custom_metrics[name + "_avg"] = np.mean(value)
                    episode.custom_metrics[name + "_sum"] = np.sum(value)
                    episode.hist_data[name] = value
                else:
                    episode.custom_metrics[name] = value
                    episode.hist_data[name] = [value]
```
```python
import json
import os
import os.path as osp

from ray.tune.logger import LoggerCallback
from ray.tune.result import TIMESTEPS_TOTAL, TRAINING_ITERATION


class DataLoggerCallback(LoggerCallback):
    def __init__(self):
        self._trial_continue = {}
        self._trial_local_dir = {}

    def log_trial_start(self, trial):
        trial.init_logdir()
        self._trial_local_dir[trial] = osp.join(trial.logdir, "episode_data")
        os.makedirs(self._trial_local_dir[trial], exist_ok=True)

    def log_trial_result(self, iteration, trial, result):
        if "episode_data" not in result["episode_media"]:
            return

        step = result.get(TIMESTEPS_TOTAL) or result[TRAINING_ITERATION]
        data_file = osp.join(
            self._trial_local_dir[trial], f"data-{step:08d}.json"
        )

        # Only keep the episodes collected in this iteration
        num_episodes = result["episodes_this_iter"]
        data = result["episode_media"]["episode_data"]
        episode_data = data[-num_episodes:]

        if "evaluation" in result:
            data = result["evaluation"]["episode_media"]["episode_data"]
            episode_data += data[-num_episodes:]

        with open(data_file, "w") as f:
            json.dump(episode_data, f)
```
This creates a bunch of JSON files in the trial dir based on the `media`, i.e.:
```
<log-dir>/trial-..../episode_data:
data-00040000.json  data-00180000.json  data-00320000.json  data-00460000.json
data-00600000.json  data-00740000.json  data-00880000.json  data-01020000.json
...
```
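If you later want to inspect one of those files, here is a minimal loading sketch (the path and filename are just examples taken from the listing above):

```python
import json
import os.path as osp

trial_dir = "<log-dir>/trial-...."  # fill in your actual trial logdir
with open(osp.join(trial_dir, "episode_data", "data-00040000.json")) as f:
    episode_data = json.load(f)  # one entry per episode, keyed by media name
```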
For the scalars it creates the corresponding TensorBoard entries, e.g. `ray/tune/custom_metrics/<data-key>_avg_mean`. The naming is a bit weird: `avg_mean` means averaged over the episode, then averaged between episodes, and `avg_min` is averaged over the episode and the minimum between episodes (same as the `episode_reward_{mean,max,min}` semantics). You can change this to whatever fits your needs in `CustomCallbacks.on_episode_end`. Also, this adds the non-aggregated data to the histograms, not just the aggregated data to the scalars.
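For instance, if a hypothetical running metric `speed` has per-step values `[1, 2, 3]` in one episode and `[3, 4, 5]` in another, `speed_avg` is 2.0 and 4.0 for those episodes, and TensorBoard ends up with `speed_avg_mean = 3.0`, `speed_avg_min = 2.0`, and `speed_avg_max = 4.0`.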
And one more thing: I'm using this with Tune. I'm not sure how the callbacks are handled if you're running plain RLlib, but I think it should work regardless.
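For reference, this is roughly how the two pieces get wired up with Tune (just a sketch; `"YourEnv-v0"` is a placeholder for your registered env):

```python
from ray import tune

tune.run(
    "PPO",
    config={
        "env": "YourEnv-v0",           # placeholder: your registered env
        "callbacks": CustomCallbacks,  # RLlib expects the class, not an instance
    },
    callbacks=[DataLoggerCallback()],  # Tune expects LoggerCallback instances
)
```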
I hope that helps, let me know if something is not clear.