Persisting values across callbacks

I’d like to know if I can add an attribute to my DefaultCallbacks subclass and have it persist across multiple on_episode_end calls.

What I want is a metric that averages a value over the last 100 episodes. However, there are two things I need in this metric:

  1. If the model has completed fewer than 100 episodes, I still want to sum them up and divide by 100.
  2. At some points in training, I need to “reset” this list of 100 episodes as if the model were starting again.

So, my thought was “let’s make a custom_metric.” If I add a collections.deque with maxlen=100 to my DefaultCallbacks and append to it at every on_episode_end, I can check it in the on_train_result callback (where I need it) and add it to the results dictionary. However, I do not know whether, when Ray parallelizes across multiple workers, there will be multiple instances of this callback object. Will I be able to trust this value? Is the on_episode_end callback called in a “centralized” way?
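Roughly what I have in mind is something like this (just a sketch; the class name and the reset hook are placeholders, and the import path may differ by RLlib version):

from collections import deque

# Older RLlib releases use ray.rllib.agents.callbacks instead of
# ray.rllib.algorithms.callbacks.
from ray.rllib.algorithms.callbacks import DefaultCallbacks


class RollingAverageCallbacks(DefaultCallbacks):
    def __init__(self):
        super().__init__()
        # deque(maxlen=100) silently drops the oldest entry when full.
        self.last_episode_returns = deque(maxlen=100)

    def on_episode_end(self, *, episode, **kwargs):
        self.last_episode_returns.append(episode.total_reward)

    def on_train_result(self, *, result=None, **kwargs):
        # Point 1: always divide by 100, even with fewer than 100 episodes so far.
        result["custom_metrics"]["return_avg_last_100"] = (
            sum(self.last_episode_returns) / 100.0)

    def reset_window(self):
        # Point 2: call this whenever training should behave as if starting over.
        self.last_episode_returns.clear()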

Hey @BrunoBSM, custom callback objects are not centralized, meaning each worker (num_workers) will have its own instance of your provided class and will thus make its own on_episode_end calls.

However, you can use episode.hist_data (build lists with per-timestep values) and episode.custom_metrics (put per-episode metrics here).

In the on_train_result method (only called on your “learner”, meaning centralized), you should then see these metrics in the result argument.

See our example script in ray.rllib.examples.custom_metrics_and_callbacks.py for a complete, working example.
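A minimal sketch along those lines (the metric name is a placeholder, and the exact import path and callback signatures differ slightly between RLlib versions):

from ray.rllib.algorithms.callbacks import DefaultCallbacks  # ray.rllib.agents.callbacks on older versions


class MyCallbacks(DefaultCallbacks):
    def on_episode_end(self, *, episode, **kwargs):
        # Runs on every rollout worker. Anything stored in episode.custom_metrics
        # (or episode.hist_data) is collected from the workers for you.
        episode.custom_metrics["episode_return"] = episode.total_reward

    def on_train_result(self, *, result=None, **kwargs):
        # Runs centrally on the learner. RLlib reduces each custom metric over
        # the iteration's episodes into _mean/_min/_max entries.
        print(result["custom_metrics"].get("episode_return_mean"))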


Hi @sven1977,
I have a question on a related point you mentioned above, namely that “custom callback objects are not centralized”.
I’m interested in visualising the trajectories of individual workers during the training process (tune.run()).
I managed to log the trajectory to the console using the on_postprocess_trajectory method of the callback class.
But, as you said, since callbacks are not centralised, I’m unable to append the trajectories separately for each worker and save them to a file. At the end of training, the callback that gets triggered belongs to the PPOTrainer (not a RolloutWorker), so I cannot fetch the trajectory dataframe for each of the three workers, because they are saved in different instances of the callback class. Is there a better/cleaner way of doing what I need?

Below is the custom on_postprocess_trajectory callback I have defined:

def on_postprocess_trajectory(self,
        *,
        worker: RolloutWorker,
        episode: Episode,
        agent_id: AgentID,
        policy_id: PolicyID,
        policies: Dict[PolicyID, Policy],
        postprocessed_batch: SampleBatch,
        original_batches: Dict[AgentID, Tuple[Policy, SampleBatch]],
        **kwargs):
    # Method of my DefaultCallbacks subclass. Assumes logging, numpy as np,
    # pandas as pd and ray.rllib.utils.spaces.space_utils are imported at
    # module level, and that self.df0..self.df3 are initialised as empty
    # DataFrames in __init__.

    traj0, traj1, traj2, traj3 = [], [], [], []
    episode_id = episode.episode_id
    worker_id = worker.worker_index

    try:
        logging.info(f"postprocessed_batch {postprocessed_batch}")
        logging.info(f"post_processed_next_obs shape {np.shape(postprocessed_batch['new_obs'])}")
    except KeyError:
        logging.warning("postprocessed_batch has no 'new_obs' key")

    # The space bounds are constant, so look them up once per call instead of
    # once per timestep. The obs bounds come from the policy's observation_space.
    action_lows = policies[policy_id].action_space.low    # kept from the original; not used below,
    action_highs = policies[policy_id].action_space.high  # unsquash_action handles the rescaling
    obs_low = policies[policy_id].observation_space.low
    obs_high = policies[policy_id].observation_space.high

    # Map the normalized [-1, 1] actions back into the env's action range.
    unsquashed_actions = space_utils.unsquash_action(
        postprocessed_batch["actions"], policies[policy_id].action_space_struct)

    for i in range(len(postprocessed_batch["obs"])):
        step = {
            "episode_id": episode_id,
            "worker id": worker_id,
            "state": {
                "volume": obs_low[0] + postprocessed_batch["obs"][i][0] * (obs_high[0] - obs_low[0]),
                "ca": obs_low[1] + postprocessed_batch["obs"][i][1] * (obs_high[1] - obs_low[1]),
                "rtime": obs_low[2] + postprocessed_batch["obs"][i][2] * (obs_high[2] - obs_low[2]),
            },
            "action": {"qa": unsquashed_actions[i][0], "qs": unsquashed_actions[i][1]},
            "reward": postprocessed_batch["rewards"][i],
            "next_state": {
                "volume": obs_low[0] + postprocessed_batch["new_obs"][i][0] * (obs_high[0] - obs_low[0]),
                "ca": obs_low[1] + postprocessed_batch["new_obs"][i][1] * (obs_high[1] - obs_low[1]),
                "rtime": obs_low[2] + postprocessed_batch["new_obs"][i][2] * (obs_high[2] - obs_low[2]),
            },
            "terminated?": postprocessed_batch["terminateds"][i],
            "truncated?": postprocessed_batch["truncateds"][i],
        }
        # Each rollout worker has its own callback instance, so only the branch
        # matching this worker's index ever runs in a given process.
        if worker_id == 1:
            traj1.append(step)
        elif worker_id == 2:
            traj2.append(step)
        elif worker_id == 3:
            traj3.append(step)
        elif worker_id == 0:
            traj0.append(step)

    # Accumulate this batch into the per-worker dataframe and dump it to disk
    # once enough steps have been collected (DataFrame.append is deprecated,
    # so pd.concat is used instead).
    if worker_id == 1:
        self.df1 = pd.concat([self.df1, pd.DataFrame(traj1)], ignore_index=True)
        if self.df1.shape[0] >= 13300:
            self.df1.to_csv('trajectory1.txt', sep='\t', index=False)
            print(f"this is the current dataframe for worker {worker_id} \n", self.df1)
    elif worker_id == 2:
        self.df2 = pd.concat([self.df2, pd.DataFrame(traj2)], ignore_index=True)
        if self.df2.shape[0] >= 13300:
            self.df2.to_csv('trajectory2.txt', sep='\t', index=False)
            print(f"this is the current dataframe for worker {worker_id} \n", self.df2)
    elif worker_id == 3:
        self.df3 = pd.concat([self.df3, pd.DataFrame(traj3)], ignore_index=True)
        if self.df3.shape[0] >= 13300:
            self.df3.to_csv('trajectory3.txt', sep='\t', index=False)
            print(f"this is the current dataframe for worker {worker_id} \n", self.df3)
    elif worker_id == 0:
        self.df0 = pd.concat([self.df0, pd.DataFrame(traj0)], ignore_index=True)
        self.df0.to_csv('trajectory0.txt', sep='\t', index=False)
        print(f"this is the current dataframe for worker {worker_id} \n", self.df0)