Hi @mannyv and @Lars_Simon_Zehnder,

Thank you for your replies!

Yes, I noticed that `episodes_total` is the number of *completed* episodes. That is why I log `started_episode` in `on_episode_start()`: I would expect `started_episode_max` to be larger than `episodes_total`, since it also counts episodes from failed workers. However, I see the opposite: `episodes_total` is significantly greater than `started_episode_max`.

I keep `self.started_episode` as a global attribute of the callbacks because I need to preserve global training stats and data across all rollouts, so I initialize all the global PLR variables and objects in the `__init__` of `PLRCallbacks`. I also checked that these global objects are the same, with identical object ids, across the entire experiment. My question is therefore whether directly updating these global variables and objects inside the callbacks is a proper way to aggregate them.
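For completeness, this is roughly how I checked the ids (a simplified sketch of a debug helper I added to `PLRCallbacks` and called from `on_episode_start()`; the helper name is just illustrative):

```python
def _log_global_object_ids(self, worker):
    # Debug helper (illustrative): print the id() of the shared PLR
    # objects from each rollout worker so they can be compared across
    # the whole experiment.
    print(
        f"worker {worker.worker_index}: "
        f"level_store id={id(self.level_store)}, "
        f"default sampler id={id(self._default_level_sampler)}"
    )
```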
As stated in the paper, I need global objects, i.e. the level store and the level sampler, to maintain training progress and to dynamically adjust the PLR training protocol throughout the experiment. I have provided the following sample `PLRCallbacks` as a reference.
```python
import types
from collections import defaultdict
from typing import Dict

# NOTE: RLlib import paths below assume Ray 2.x; older versions expose
# DefaultCallbacks under ray.rllib.agents.callbacks.
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.env import BaseEnv
from ray.rllib.evaluation import Episode, RolloutWorker
from ray.rllib.policy import Policy

from dcd.level_replay.level_sampler import LevelSampler
from dcd.level_replay.level_store import LevelStore
...
class PLRCallbacks(DefaultCallbacks):
    def __init__(self, *args, **kwargs):
        super().__init__()
        # All these arguments will be used across all episodes during
        # this experiment.
        plr_args = types.SimpleNamespace(**kwargs)
        plr_args = make_plr_args(plr_args)  # helper from the dcd codebase
        # Set up the global PLR state.
        self.weighted_num_edits = 0
        self.latest_env_stats = defaultdict(float)
        self.is_dr = False
        self.is_training_env = True
        # use_byte_encoding
        data_info = {
            'numpy': True,
            'dtype': '<U32',
            'shape': (10, 6),
        }
        self.level_store = LevelStore(data_info=data_info)
        self.level_samplers = {'agent': LevelSampler(**plr_args)}
        self.current_level_seeds = [-1 for _ in range(plr_args['num_actors'])]
        self._default_level_sampler = self.all_level_samplers[0]
        self.base_levels = None
        self.level_replay_times = 0
        self.not_level_replay_times = 0
        self.started_episode = 0
    def __call__(self, *args, **kwargs):
        # RLlib expects a callbacks *class* it can instantiate; returning
        # self lets this pre-built instance be passed in the config instead.
        return self

    def _sample_replay_decision(self):
        return self._default_level_sampler.sample_replay_decision()
    def _reconcile_level_store_and_samplers(self):
        all_replay_seeds = set()
        for level_sampler in self.all_level_samplers:
            all_replay_seeds.update([x for x in level_sampler.seeds if x >= 0])
        self.level_store.reconcile_seeds(all_replay_seeds)

    def _update_level_samplers_with_external_unseen_sample(self, seeds, solvable=None):
        for level_sampler in self.all_level_samplers:
            level_sampler.observe_external_unseen_sample(seeds, solvable)

    def _update_plr_with_current_unseen_levels(self, levels, passible=None,
                                               reject_unsolvable_seeds=True,
                                               parent_seeds=None):
        self.current_level_seeds = \
            self.level_store.insert(levels, parent_seeds=parent_seeds)
        self._update_level_samplers_with_external_unseen_sample(
            self.current_level_seeds, solvable=passible)
    def on_episode_start(
        self,
        *,
        worker: RolloutWorker,
        base_env: BaseEnv,
        policies: Dict[str, Policy],
        episode: Episode,
        env_index: int,
        **kwargs
    ):
        # Make sure this episode has just been started (only initial obs
        # logged so far).
        assert episode.length == 0, (
            "ERROR: `on_episode_start()` callback should be called right "
            "after env reset!"
        )
        # R-PLR: sample the replay decision.
        self.level_replay = self._sample_replay_decision()
        if self.level_replay:
            # Replay: sample replay levels from the level store.
            self.current_level_seeds = [
                self._default_level_sampler.sample_replay_level()
                for _ in range(worker.num_workers)
            ]
            levels = [
                self.level_store.get_level(seed)
                for seed in self.current_level_seeds
            ]
            # Reset each sub-environment to its sampled level (assumes
            # one sampled level per sub-environment).
            for env, level in zip(worker.foreach_env(lambda env: env), levels):
                env.reset_to_level(level)
            # Set train mode for the blue agent.
            if not policies["blue"].model.training:
                policies["blue"].model.train()
            # Update level_replay_times for records.
            self.level_replay_times += 1
        else:
            # No replay: sample a new level from the level generator by
            # resetting to a random level.
            # FIXME - Assume using DR for generating the new level.
            worker.foreach_env(lambda env: env.reset())  # env.reset_random())
            # Add the new levels to the level store.
            levels = worker.foreach_env(lambda env: env.encoding.tobytes())
            self._update_plr_with_current_unseen_levels(levels=levels, parent_seeds=None)
            # OR:
            # - Sample a random level from the generator
            # - Set the env to that random level
            # Set eval mode for the blue agent (no training on unseen levels).
            if policies["blue"].model.training:
                policies["blue"].model.eval()
        self.started_episode += 1
        episode.custom_metrics["started_episode"] = self.started_episode
    def on_episode_step(
        self,
        *,
        worker: RolloutWorker,
        base_env: BaseEnv,
        policies: Dict[str, Policy],
        episode: Episode,
        env_index: int,
        **kwargs
    ):
        # Make sure this episode is ongoing.
        assert episode.length > 0, (
            "ERROR: `on_episode_step()` callback should not be called right "
            "after env reset!"
        )
        # Keep the blue agent in train mode on replay levels and in eval
        # mode otherwise.
        if self.level_replay:
            if not policies["blue"].model.training:
                policies["blue"].model.train()
        else:
            if policies["blue"].model.training:
                policies["blue"].model.eval()
    def on_episode_end(
        self,
        *,
        worker: RolloutWorker,
        base_env: BaseEnv,
        policies: Dict[str, Policy],
        episode: Episode,
        env_index: int,
        **kwargs
    ):
        # Check if there are multiple episodes in a batch, i.e.
        # "batch_mode": "truncate_episodes".
        if worker.policy_config["batch_mode"] == "truncate_episodes":
            # Make sure this episode is really done.
            assert episode.batch_builder.policy_collectors["blue"].batches[-1][
                "dones"
            ][-1], (
                "ERROR: `on_episode_end()` should only be called "
                "after episode is done!"
            )
        rollout_batch = episode.batch_builder.policy_collectors["blue"].batches[-1]
        # R-PLR: compute the PLR score S.
        # This is where we can get the advantages.
        episode.user_data["mean_score"] = rollout_batch["advantages"].mean()
        episode.user_data["max_score"] = rollout_batch["advantages"].max()
        # ===== update level sampler and level store with rollouts ===== #
        rollout_batch.actor_index = worker.worker_index - 1
        rollout_batch.seed_t = self.current_level_seeds[episode.env_id]
        rollout_batch.score = episode.user_data["mean_score"]
        rollout_batch.max_score = episode.user_data["max_score"]
        rollout_batch.num_steps = len(rollout_batch)
        rollout_batch.grounded_value = rollout_batch["rewards"].max()
        self._default_level_sampler.update_with_rollouts(rollout_batch)
        self._default_level_sampler.after_update()
        episode.custom_metrics["working_seed_buffer_size"] = \
            self._default_level_sampler.working_seed_buffer_size
        self._reconcile_level_store_and_samplers()
    @property
    def all_level_samplers(self):
        # All registered level samplers that are not None.
        return [v for v in self.level_samplers.values() if v is not None]
```
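For reference, this is roughly how I plug the instance into the trainer config (a minimal sketch with placeholder values; `plr_kwargs` stands for my PLR settings). The `__call__` override above is what lets RLlib accept a pre-built instance where it normally expects a callbacks class:

```python
# Minimal wiring sketch (placeholder values, not my full experiment config).
plr_kwargs = {}  # my PLR settings forwarded to PLRCallbacks.__init__ (elided)
plr_callbacks = PLRCallbacks(**plr_kwargs)

config = {
    "callbacks": plr_callbacks,  # instance works because __call__ returns self
    "batch_mode": "truncate_episodes",
    "num_workers": 4,
    # ...rest of the trainer config...
}
```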