Hi @sven1977 ,
I had a query on a related topic you mentioned for the above concern - which is “custom callback objects are not centralized”.
I’m interested in visualising the trajectory of individual workers during the training process (tune.run()).
I got the trajectory logged into the console by using the on_postprocess_trajectory method in the callback class.
But as you said, since callbacks are not centralised, I’m unable to append the trajectory separately for each worker and save it to a file. This is because, at the end of the training, the callbacks triggered are those of the PPOTrainer (not the RolloutWorker), and I’m unable to fetch the trajectory dataframe for each of the three workers, as they are being saved in a different instance of the callback method. Is there a better/cleaner way of doing what I need?
Below is the custom post_trajectory callback I have defined
def on_postprocess_trajectory(self,
                              *,
                              worker: RolloutWorker,
                              episode: Episode,
                              agent_id: AgentID,
                              policy_id: PolicyID,
                              policies: Dict[PolicyID, Policy],
                              postprocessed_batch: SampleBatch,
                              original_batches: Dict[AgentID, Tuple[Policy, SampleBatch]],
                              **kwargs):
    """Accumulate each rollout worker's postprocessed trajectory into a DataFrame.

    Every transition in ``postprocessed_batch`` is mapped back to environment
    units — observations rescaled by the observation-space bounds, actions
    unsquashed via the policy's action-space struct — and appended to the
    per-worker DataFrame ``self.df0`` .. ``self.df3``.  Workers 1-3 flush
    their DataFrame to ``trajectory<N>.txt`` once it holds at least
    ``FLUSH_ROWS`` rows; worker 0 (the local worker) flushes on every call.

    NOTE(review): the original code read an undefined ``obs_space``; it is
    taken here from ``policies[policy_id].observation_space`` — confirm this
    is the space the batch observations were normalized against, and that the
    observation layout really is (volume, ca, rtime).
    """
    FLUSH_ROWS = 13300  # rows accumulated before a remote worker's file is written.

    episode_id = episode.episode_id
    worker_id = worker.worker_index  # int: 0 = local worker, 1..N = remote workers.

    try:
        logging.info(f"post_processed_next_obs {postprocessed_batch}")
        logging.info(
            f"post_processed_next_obs {np.shape(postprocessed_batch['new_obs'])}")

        policy = policies[policy_id]
        # Loop-invariant: bounds and unsquashed actions are computed once for
        # the whole batch (the original recomputed them per transition).
        obs_low = policy.observation_space.low
        obs_high = policy.observation_space.high
        obs_span = obs_high - obs_low
        unsquashed_actions = space_utils.unsquash_action(
            postprocessed_batch["actions"], policy.action_space_struct)

        def _denorm(obs_row):
            # Map a normalized observation row back to environment units.
            return {
                "volume": obs_low[0] + obs_row[0] * obs_span[0],
                "ca": obs_low[1] + obs_row[1] * obs_span[1],
                "rtime": obs_low[2] + obs_row[2] * obs_span[2],
            }

        steps = [
            {
                "episode_id": episode_id,
                "worker id": worker_id,
                "state": _denorm(postprocessed_batch["obs"][i]),
                "action": {"qa": unsquashed_actions[i][0],
                           "qs": unsquashed_actions[i][1]},
                "reward": postprocessed_batch["rewards"][i],
                "next_state": _denorm(postprocessed_batch["new_obs"][i]),
                "terminated?": postprocessed_batch["terminateds"][i],
                "truncated?": postprocessed_batch["truncateds"][i],
            }
            for i in range(len(postprocessed_batch["obs"]))
        ]

        if worker_id not in (0, 1, 2, 3):
            return

        # One attribute per worker replaces the four copy-pasted branches.
        # DataFrame.append was removed in pandas 2.0; pd.concat with
        # ignore_index=True also avoids the duplicate-index rows that
        # append's default produced.
        attr = f"df{worker_id}"
        df = pd.concat([getattr(self, attr), pd.DataFrame(steps)],
                       ignore_index=True)
        setattr(self, attr, df)

        # Worker 0 flushes every call; remote workers flush at the threshold.
        if worker_id == 0 or df.shape[0] >= FLUSH_ROWS:
            df.to_csv(f'trajectory{worker_id}.txt', sep='\t', index=False)
            print(f"this is the current dataframe for worker {worker_id} \n", df)
    except Exception:
        # Best-effort bookkeeping: a failure here must never kill the
        # rollout worker, but it should be visible in the logs (the pasted
        # original's `try` had no except clause at all).
        logging.exception("on_postprocess_trajectory failed")