Seeking recommendations for implementing Dual Curriculum Design in RLlib

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Hi Team,

I am currently trying to implement Dual Curriculum Design, in particular, * Robust PLR in RLlib. In order to estimate Regret and use it as the fitness score for the task and agent pair, the paper introduced the Positive Value Loss, which needed to access the average and max advantages as implemented in their level_sampler.

Since I would like to store the estimate Regret in the train_results from a PPO trainer, so I can access this info in a callback function to update the Level Sampler. I am currently blocked on not knowing how to get the average and max advantages from RLlib’s PPO implementation. I see that in PPO Torch Policy, the postprocess_trajectory() is returning the GAE values via compute_gae_for_sample_batch().

Can you advise me on how to get the average and max advantages from RLlib’s PPO implementation?

Thank you,
Heng

Thanks to the official rllib example custom_metrics_and_callbacks.py , I am able to gather the max and mean advantages by accessing the advantages with postprocessed_batch["advantages"] in the on_postprocess_trajectory() call back.

Now I have another straightforward question. Since * Robust PLR may switch train and eval during each episode based on the sample replay decision. To enable switch training and eval for all workers per episode, is by using policies["default_policy"].model.eval() or policies["default_policy"].model.train() should be sufficient enough for me to toggle training and eval for specific episodes?

1 Like

Hi Team,

I have implemented the PLRCallbacks(), and I have another RLLib question where I tried to understand correlation between episodes_total in the train_results and the frequency of calling on_episode_start() with multiple workers.

Since I called the sample_replay_decision() at the beginning of on_episode_start(), I noticed that when when running with more workers, there are more number of episodes_total being logged than the my custom metric started_episode_max that’s being logged at the beginning of on_episode_start(). Would you please let me know the reason behind it? This is critical for how frequent we can call level replay in the Ray PLR callback.

Here I set num_workers=1

== Status ==
Current time: 2023-03-14 18:14:15 (running for 00:00:37.25)
Memory usage on this node: 12.1/125.6 GiB 
Using FIFO scheduling algorithm.
Resources requested: 3.0/3 CPUs, 1.0/2 GPUs, 0.0/52.46 GiB heap, 0.0/26.23 GiB objects (0.0/1.0 accelerator_type:G)
Number of trials: 1/1 (1 RUNNING)
+---------------------------------+----------+-------------------+------------------+------------------------+------------------------+-----------------------+--------+------------------+------+---------------+--------------+--------------+--------------------+------------------+--------------------+-------------------+
| Trial name                      | status   | loc               | Custom Metrics   |   working_seed_buffer_ |   level_replay_times_m |   started_episode_max |   iter |   total time (s) |   ts |   reward_mean |   reward_max |   reward_min |   episode_len_mean |   episodes_total |   blue_reward_mean |   red_reward_mean |
|                                 |          |                   |                  |               size_max |                     ax |                       |        |                  |      |               |              |              |                    |                  |                    |                   |
|---------------------------------+----------+-------------------+------------------+------------------------+------------------------+-----------------------+--------+------------------+------+---------------+--------------+--------------+--------------------+------------------+--------------------+-------------------|
| PPO_DCD_78223_00000 | RUNNING  | 10.1.60.61:669871 |                  |                      2 |                      3 |                     3 |      2 |          23.1688 | 2000 |       5.33107 |      8.22913 |      2.43302 |                999 |                2 |            5.87292 |         -0.541841 |
+---------------------------------+----------+-------------------+------------------+------------------------+------------------------+-----------------------+--------+------------------+------+---------------+--------------+--------------+--------------------+------------------+--------------------+-------------------+


Here I set num_workers=10

== Status ==
Current time: 2023-03-14 18:27:44 (running for 00:00:43.37)
Memory usage on this node: 19.1/125.6 GiB 
Using FIFO scheduling algorithm.
Resources requested: 12.0/12 CPUs, 1.0/2 GPUs, 0.0/52.42 GiB heap, 0.0/26.21 GiB objects (0.0/1.0 accelerator_type:G)
Number of trials: 1/1 (1 RUNNING)
+---------------------------------+----------+-------------------+------------------+------------------------+------------------------+-----------------------+--------+------------------+-------+---------------+--------------+--------------+--------------------+------------------+--------------------+-------------------+
| Trial name                      | status   | loc               | Custom Metrics   |   working_seed_buffer_ |   level_replay_times_m |   started_episode_max |   iter |   total time (s) |    ts |   reward_mean |   reward_max |   reward_min |   episode_len_mean |   episodes_total |   blue_reward_mean |   red_reward_mean |
|                                 |          |                   |                  |               size_max |                     ax |                       |        |                  |       |               |              |              |                    |                  |                    |                   |
|---------------------------------+----------+-------------------+------------------+------------------------+------------------------+-----------------------+--------+------------------+-------+---------------+--------------+--------------+--------------------+------------------+--------------------+-------------------|
| PPO_DCD_56bf2_00000 | RUNNING  | 10.1.60.61:765156 |                  |                      2 |                      3 |                     3 |      2 |          12.9763 | 20000 |       3.63211 |       12.149 |     -3.37321 |                992 |               20 |            4.65237 |          -1.02026 |
+---------------------------------+----------+-------------------+------------------+------------------------+------------------------+-----------------------+--------+------------------+-------+---------------+--------------+--------------+--------------------+------------------+--------------------+-------------------+


1 Like

Hi @heng2j,

good to see you here. I have not yet worked with PLR, but try to give some thoughts in regard to the metrics.
If you use multiple workers you might want to check how exactly the different callbacks entities are collecting the metric data and how it is later on aggregated. It looks to me that each worker now counts individually and does have the same number of started episodes than the worker in the single worker setup.

Let me know, if this could help.

Best, Simon

1 Like

Hey @Lars_Simon_Zehnder!

Good to see you here as well! I hope everything is going well for you!

I have tried to let the single-worker experiment run for a more extended period (10M time steps). Even in this experiment, the number of episodes_total and started_episode_max also diverged. Eventually, the number of episodes_total was almost double of started_episode_max.

I will take a deeper dive to debug this. And I will keep you posted. Once again. Thank you Simon for your help!

Hi @heng2j,

I think there are two points that might be relevant.

The first is that episodes_total is the number of completed episodes not the number of started episodes.

The second is that each rollout worker runs in a seperate process and each callback in a rollout worker has its own independent instance. So if you are taking episode stats in a callback then each rollout worker will only have a count of the number of episodes that have been stated by that rollout worker.
If you want an aggregated sum across the workers then you will need to do that your self potentially in the on_sample_end callback.

2 Likes

Hey @heng2j,

thanks, all well on my side. Let’s update these days.

In regard to @mannyv 's answer: you can check the different callback instances by calling id() on the callbacks and see that they are different.

His hint about the ended episodes counted in total_episodes is imo crucial as the episodes that started might still not be finished. You might also want to think about if and how you want to account for crashed workers and their unfinished episodes when aggregating the metrics from the individual workers.

When aggregating the individual ones you could take a look into the on_train_result()-callback as it makes available the algorithm with its attribute workers with their corresponding callbacks attribute. At this time the result dict also contains episode data (rollout metrics) and gives you e.g. faulty_episodes; but tbh I have never tried it.

1 Like

Hi @mannyv and @Lars_Simon_Zehnder ,

Thank you for your replies!

Yes, I noticed that episodes_total is the number of completed episodes. Therefore I logged started_episode in on_episode_start(), so I should expect more started_episode_max than episodes_total, including failed episodes from failed workers. However, I see the opposite where the number of episodes_total is significantly greater than started_episode_max. And I set self.started_episode as part of the global attribute for the callbacks since I will need to preserve global training stats and data across all rollouts. Therefore, I have set all the global PLR variables and objects in the init of PLRCallbacks. And I checked that the global objects were the same with identical object ids across the entire experiment. Therefore, I would like to know whether I am doing proper aggregation by directly updating these global variables and objects in the callback or not.

As stated in the paper, I need global objects, i.e. level store and level sampler, to maintain training progress, and dynamically adjust the PLR training protocol throughout the experiment. I have provided the following sample PLRCallbacks as a reference.


from dcd.level_replay.level_sampler import LevelSampler
from dcd.level_replay.level_store import LevelStore

...

class PLRCallbacks(DefaultCallbacks):

    
    def __init__(
        self,
        *args,
        **kwargs,
    ):

        # All these arguments will be used across all episodes during this experiment
        plr_args = types.SimpleNamespace(**kwargs)
        plr_args = make_plr_args(plr_args)

        # Set up PLR
        self.level_store = None
        self.level_samplers = {}
        self.current_level_seeds = None
        self.weighted_num_edits = 0
        self.latest_env_stats = defaultdict(float)
        self.is_dr = False
        self.is_training_env = True
        # use_byte_encoding
        data_info = {
            'numpy': True,
            'dtype': '<U32',
            'shape': (10, 6)
        }
        self.level_store = LevelStore(data_info=data_info)
        self.level_samplers = {}
        self.level_samplers['agent'] = LevelSampler(**plr_args) 
        self.current_level_seeds = [-1 for i in range(plr_args["num_actors"])]
        self._default_level_sampler = self.all_level_samplers[0]
        self.base_levels = None
        self.level_replay_times = 0
        self.not_level_replay_times = 0
        self.started_episode = 0

        super().__init__()


    def __call__(self, *args, **kwargs):
        return self

    def _sample_replay_decision(self):
        return self._default_level_sampler.sample_replay_decision()

    def _reconcile_level_store_and_samplers(self):
        all_replay_seeds = set()
        for level_sampler in self.all_level_samplers:
            all_replay_seeds.update([x for x in level_sampler.seeds if x >= 0])
        self.level_store.reconcile_seeds(all_replay_seeds)

    def _update_level_samplers_with_external_unseen_sample(self, seeds, solvable=None):
        level_samplers = self.all_level_samplers
        for level_sampler in level_samplers:
            level_sampler.observe_external_unseen_sample(seeds, solvable)

    def _update_plr_with_current_unseen_levels(self, levels, passible=None, reject_unsolvable_seeds=True, parent_seeds=None):
        self.current_level_seeds = \
            self.level_store.insert(levels, parent_seeds=parent_seeds)

        self._update_level_samplers_with_external_unseen_sample(
            self.current_level_seeds, solvable=passible)


    def on_episode_start(
        self,
        *,
        worker: RolloutWorker,
        base_env: BaseEnv,
        policies: Dict[str, Policy],
        episode: Episode,
        env_index: int,
        **kwargs
    ):
        # Make sure this episode has just been started (only initial obs
        # logged so far).
        assert episode.length == 0, (
            "ERROR: `on_episode_start()` callback should be called right "
            "after env reset!"
        )

        # R-PLR 
        # Sample replay-decision 
        self.level_replay = False
        self.level_replay = self._sample_replay_decision()

        if self.level_replay:
            # Yes replay
            # - Sample a replay level from level store
            self.current_level_seeds = [self._default_level_sampler.sample_replay_level() for _ in range(worker.num_workers)]
            levels = [self.level_store.get_level(seed) for seed in self.current_level_seeds]

            #  Set env to level 
            for level in levels:
                for env in worker.foreach_env( lambda env: env):
                     env.reset_to_level(level)
            
            #  Set train for blue agent
            if not policies["blue"].model.training:
                policies["blue"].model.train()

            # Update level_replay_times for records
            self.level_replay_times += 1
                
        else:
            # No replay
            # Sample level from level generator by reset to random level
            # FIXME - Assume using DR for generating new level
            worker.foreach_env( lambda env: env.reset()) # env.reset_random())

            # Add new level to level store 
            levels = worker.foreach_env( lambda env: env.encoding.tobytes())
            self._update_plr_with_current_unseen_levels(levels=levels, parent_seeds=None)

            # OR 
            #   - Sample random level from generator
            #       - Set env to random 

            #  Set no train for blue agent
            if policies["blue"].model.training:
               policies["blue"].model.eval()

        
        self.started_episode += 1
        episode.custom_metrics["started_episode"] = self.started_episode
        episode.custom_metrics["started_episode"] = self.started_episode




    def on_episode_step(
        self,
        *,
        worker: RolloutWorker,
        base_env: BaseEnv,
        policies: Dict[str, Policy],
        episode: Episode,
        env_index: int,
        **kwargs
    ):
        # Make sure this episode is ongoing.
        assert episode.length > 0, (
            "ERROR: `on_episode_step()` callback should not be called right "
            "after env reset!"
        )

        if self.level_replay:
            if not policies["blue"].model.training:
                policies["blue"].model.train()
        else:
            if policies["blue"].model.training:
                policies["blue"].model.eval()


    def on_episode_end(
        self,
        *,
        worker: RolloutWorker,
        base_env: BaseEnv,
        policies: Dict[str, Policy],
        episode: Episode,
        env_index: int,
        **kwargs
    ):
        # Check if there are multiple episodes in a batch, i.e.
        # "batch_mode": "truncate_episodes".
        if worker.policy_config["batch_mode"] == "truncate_episodes":
            # Make sure this episode is really done.
            assert episode.batch_builder.policy_collectors["blue"].batches[
                -1
            ]["dones"][-1], (
                "ERROR: `on_episode_end()` should only be called "
                "after episode is done!"
            )

        rollout_batch = episode.batch_builder.policy_collectors["blue"].batches[-1]


        # R-PLR
        # - Compute PLR score S
        # This is where we can get the advantages
        episode.user_data["mean_score"] = rollout_batch["advantages"].mean()
        episode.user_data["max_score"] = rollout_batch["advantages"].max()


        # ========= update level sampler and level store  with rollouts ========= # 
        rollout_batch.actor_index = worker.worker_index - 1
        rollout_batch.seed_t =  self.current_level_seeds[episode.env_id]
        rollout_batch.score = episode.user_data["mean_score"]
        rollout_batch.max_score = episode.user_data["max_score"]
        rollout_batch.num_steps = len(rollout_batch)
        rollout_batch.grounded_value = rollout_batch["rewards"].max() 

        self._default_level_sampler.update_with_rollouts(rollout_batch)
        self._default_level_sampler.after_update()

        episode.custom_metrics["working_seed_buffer_size"] = self._default_level_sampler.working_seed_buffer_size

        self._reconcile_level_store_and_samplers()


    @property
    def all_level_samplers(self):
        if len(self.level_samplers) == 0:
            return []

        return list(filter(lambda x: x is not None, [v for _, v in self.level_samplers.items()]))



Hi @heng2j,

Which part are you considering to have global information because I do not see anything here that will be global across the rollout workers. I may be missing something.

As far as I understand each of those callbacks is a seperate instance in each rollout worker so mutating a variable in the callback of worker 1 will not be observed inside the callbacks of other workers.

If you need a truly globally synchronized data then your best bet might be to create a ray actor to hold that information and then pass a handle to that actor in the env config so that each callback can access the remote ray actor.

One more thing
I am not sure what you are intending with the code snippet below but I do not think this is doing what you think it is doing. During the rollout phase when these callbacks are used whether the model is in training or eval mode will have little effect. It will change how dropout and batchnorm behave but that is about it.

What exactly are you trying to do there?

policies["blue"].model.train()

https://pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module.train

2 Likes

Hi @mannyv,

Yeah, for the global information, I thought the variables and objects I defined inside init of PLRCallbacks could be referenced globally across the entire experiment. As you suggested, I would have created a remote ray actor to hold all the global information there and allow each callback to access this remote ray actor.

For setting policies["blue"].model.train()/eval(), I was trying to set the training protocol for each episode dynamically. In Robust PLR , at the beginning of each episode, PLR will determine if we will run the episode with a newly generated scenario or restore to a scenario that Robust PLR previously saved in the Level Store. If we run the episode with a newly generated scenario, we only perform eval without updating the policy on this new scenario. And we will calculate a fitness score for this new scenario based on the returns from the evaluation. The new scenario and its fitness score will then be saved in the PLR Level store.
On the other hand, if PLR decides to restore a previously stored scenario, aka replay, we will perform training to update the policy from the replayed trajectories. So, for policies["blue"].model.train()/eval(), I was trying to toggle the training protocol based on the replay decision. And that’s what I was trying to ask in my 2nd post: Seeking recommendations for implementing Dual Curriculum Design in RLlib - #2 by heng2j

Hi @heng2j ,

regrettably, using global variables in Ray is not that straightforward as different actors manage their own states and run in individual processes (so unless we program a global state update it will not happen, see also the antipattern).

In your case each RolloutWorker will have its Callbacks entity and will call it on e.g. episode end. As these are different ones for each RolloutWorker they all hold their own state and there will be no sync of states unless you explicitly program it into your code. Note, the algorithm object has its own callback entity itself - so when you call on_train_result() the callback is not any one of the callbacks that has collected custom metrics in the RolloutWorkers.

If you want to manage a global state you can do it only at a point where you have access to all RolloutWorkers and their corresponding callbacks. on_train_result() is for example such a point. At this point you could collect the metrics from all RolloutWorkers and aggregate them. Also, your metric might not be an episode metric (RolloutMetrics; as it counts over more than a single episode) but instead part of the LearnerStatsDict (I do not know exactly what you want to measure, but you might want to sum the single metrics instead of averaging them). The latter is the result parameter in the on_train_result() method.

1 Like

Thank you so much Simon for your detailed responses! I will redesign the PLRCallback and the PLR Actor this week based on the recommendations you and @mannyv provided.

1 Like

Hi @mannyv,

As I mentioned above, for alternating training and eval on an episodic basis, any recommendation for implementing this in RLLib?

Hi @mannyv ,

My colleague pointed me to this code snippet where post processing of trajectory per episode seems can be skipped by providing {"training_enabled" : False} in the info dict.

Just want to double check with you to see if this is right approach to switching between training and eval per episode.

Thank you,
Heng