Reducing rendering frequency with EnvRenderCallback

How severely does this issue affect your experience of using Ray?

  • Low: It annoys or frustrates me for a moment.

Hi,

I am trying to record and log renders of my custom environment with the new API stack, and I managed to get the example in env_rendering_and_recording.py working.
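
For context, this is roughly how I wire the callback (defined further below) into my config. This is only a sketch; the env ID, the EnvRunner count, and the choice of PPO are placeholders for my actual setup:

from ray.rllib.algorithms.ppo import PPOConfig

config = (
    PPOConfig()
    # Placeholder; my real custom env is registered under a different ID.
    .environment("MyCustomEnv-v0")
    # Depending on the Ray version, the new API stack may have to be enabled
    # explicitly.
    .api_stack(
        enable_rl_module_and_learner=True,
        enable_env_runner_and_connector_v2=True,
    )
    .env_runners(num_env_runners=2)
    .callbacks(EnvRenderCallback)
)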

I wanted to reduce the frequency of rendering; ideally, it should happen only once every X iterations (with X a parameter).
Looking at the code, this should be possible just by introducing a flag (`_record_new_video` in the code below) that tracks whether a video has already been recorded in the current iteration, and resetting that flag inside the `on_train_result` method.
Yet this does not work. It seems the state cannot be shared between the `on_episode_end` and `on_train_result` methods, presumably because `on_episode_end` runs inside the callback copies on the remote EnvRunner processes, while `on_train_result` runs on the main Algorithm process.

How would I achieve this? Here is my current, non-working attempt:

from typing import Optional, Sequence

import gymnasium
import numpy as np

from ray.rllib.algorithms.algorithm import Algorithm
from ray.rllib.algorithms.callbacks import DefaultCallbacks
from ray.rllib.utils.images import resize
from ray.rllib.utils.metrics.metrics_logger import MetricsLogger


class EnvRenderCallback(DefaultCallbacks):
    """A custom callback to render the environment.

    This can be used to create videos of the episodes for some or all EnvRunners
    and some or all env indices (in a vectorized env). These videos can then
    be sent to e.g. WandB, as shown in the env_rendering_and_recording.py example.

    We override the `on_episode_step` method to create a single render image
    per timestep and temporarily store it in the Episode object.
    """

    def __init__(self, env_runner_indices: Optional[Sequence[int]] = None):
        super().__init__()
        # Only render and record on certain EnvRunner indices?
        self.env_runner_indices = env_runner_indices
        # Flag that controls whether a new video should be recorded.
        self._record_new_video = True
        # Number of episodes seen so far.
        self._episodes_seen = 0

    def on_episode_step(
        self,
        *,
        episode,
        env_runner,
        metrics_logger,
        env,
        env_index,
        rl_module,
        **kwargs,
    ) -> None:
        """On each env.step(), we add the render image to our Episode instance.

        Note that this would work with MultiAgentEpisodes as well.
        """
        if (
            self.env_runner_indices is not None
            and env_runner.worker_index not in self.env_runner_indices
        ):
            return

        # Skip recording if this episode will not be rendered
        if not self._record_new_video:
            return

        # If we have a vector env, only render the sub-env at index 0.
        if isinstance(env.unwrapped, gymnasium.vector.VectorEnv):
            image = env.envs[0].render()
        # Render the gym.Env.
        else:
            image = env.render()

        # Render images can be fairly large (e.g. 400x600 (h x w) for CartPole).
        # We downsize them here to a very small dimension (to save space and
        # bandwidth).
        # IMPORTANT: Not resizing the images often results in OOM errors.
        image = resize(image, 128, 256)
        # For WandB videos, we need to put channels first.
        image = np.transpose(image, axes=[2, 0, 1])
        # Add the single-step render image as temporary data to our Episode
        # object. Once the episode is done, `on_episode_end()` below compiles
        # the video from all logged images and logs it via the EnvRunner's
        # `MetricsLogger.log_value()` API.
        episode.add_temporary_timestep_data("render_images", image)

    def on_episode_end(
        self,
        *,
        episode,
        env_runner,
        metrics_logger,
        env,
        env_index,
        rl_module,
        **kwargs,
    ) -> None:
        # Skip if we should not render this episode
        if not self._record_new_video:
            return

        # Count finished episodes (not used further in this snippet).
        self._episodes_seen += 1

        # Pull all images from the temp. data of the episode.
        images = episode.get_temporary_timestep_data("render_images")
        # `images` is now a list of 3D ndarrays

        # Create a video from the images by simply stacking them AND
        # adding an extra B=1 dimension. Note that Tune's WandB logger currently
        # knows how to log the different data types by the following rules:
        # array is shape=3D -> An image (c, h, w).
        # array is shape=4D -> A batch of images (B, c, h, w).
        # array is shape=5D -> A video (1, L, c, h, w), where L is the length of the
        # video.
        # -> Make our video ndarray a 5D one.
        video = np.expand_dims(np.stack(images, axis=0), axis=0)
        self._video = video

        # Log the video of this episode to the EnvRunner's `MetricsLogger`.
        metrics_logger.log_value(
            "episode_videos",
            self._video,
            # Do not reduce the videos (across the various parallel EnvRunners).
            # This would not make sense (mean over the pixels?). Instead, we want to
            # log all best videos of all EnvRunners per iteration.
            reduce=None,
            # B/c we do NOT reduce over the video data (mean/min/max), we need to
            # make sure the list of videos in our MetricsLogger does not grow
            # infinitely and gets cleared after each `reduce()` operation, meaning
            # every time, the EnvRunner is asked to send its logged metrics.
            clear_on_reduce=True,
        )

        # Don't record another video until the flag is reset again (which is
        # what `on_train_result` below is supposed to do, but fails to).
        self._record_new_video = False

    def on_train_result(
        self,
        *,
        algorithm: "Algorithm",
        metrics_logger: Optional[MetricsLogger] = None,
        result: dict,
        **kwargs,
    ) -> None:
        # Intended to re-enable recording for the next iteration. However, this
        # method presumably runs on the main Algorithm process, so the flag set
        # here never reaches the callback copies living on the remote EnvRunners.
        self._record_new_video = True
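
The only workaround I could come up with so far is to avoid cross-process state entirely and reset the flag on the EnvRunner itself, via `on_sample_end`, which (unlike `on_train_result`) runs on the EnvRunner process. Below is a minimal, untested sketch; the `ThrottledEnvRenderCallback` name, the `render_every` parameter, and the `_sample_cycles` counter are my own inventions, and note that one sample cycle does not necessarily correspond to exactly one training iteration:

class ThrottledEnvRenderCallback(EnvRenderCallback):
    """Sketch: re-enable video recording every `render_every` sample cycles."""

    def __init__(self, render_every: int = 10, **kwargs):
        super().__init__(**kwargs)
        self._render_every = render_every
        self._sample_cycles = 0
        # Don't record right away; wait for the first flag reset below.
        self._record_new_video = False

    def on_sample_end(self, *, samples, **kwargs) -> None:
        # This runs on the EnvRunner process itself, so `self` here is the same
        # callback instance that `on_episode_step` and `on_episode_end` see.
        self._sample_cycles += 1
        if self._sample_cycles % self._render_every == 0:
            self._record_new_video = True

If constructor arguments are needed, the callback could presumably be passed as e.g. `functools.partial(ThrottledEnvRenderCallback, render_every=10)` to `.callbacks(...)`. But is there a cleaner, intended way to do this, e.g. broadcasting state from `on_train_result` to all EnvRunners?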