Off-policy algorithms start doing the same action

How severely does this issue affect your experience of using Ray?

  • None: Just asking a question out of curiosity

Hi all,

I’ve been trying various algorithms to see what would be best suited for a task, and I’ve noticed that any off-policy algos I use (SAC, APEX_DDPG, TD3) start doing the same action constantly, regardless of input, after about 50k steps. Most of the time the actions are at the ends of the range (two actions spanning from -1 to 1, and the action will be +1, +1 or -1, -1).

I’m using the default config parameters except each fcnet is 4 layers of 1024.

Is this expected behaviour without changing any of the hyperparameters or do I need to start looking for a bug in my code?

Thanks in advance,
Theo

Hi @theo ,

You’ll need to give more context. Most importantly, what environment are you using?
Can you post a reproduction script?

Hi @arturn ,

I’m using an external simulator which plugs into the policy client/server objects. The code for the client and the server is below. If it’s just a case of hyperparameter tuning then that’s great, as I was expecting to do that anyway; I just wanted to get a baseline using the default configs. My worry is that something else is going wrong.

Client:

from ray.rllib.env.policy_client import PolicyClient

from AcEnv import AcEnv

IP = "<REDACTED>"
PORT = 29200
ADDR = (IP, PORT)
SIZE = 1024
FORMAT = "utf-8"

def main(car_num, port, address="localhost", inference_mode="local", no_train=False, off_policy=False, stop_reward=9999):
    env = AcEnv(car_num, IP, PORT, SIZE, FORMAT)

    client = PolicyClient(
        f"http://{address}:{port}", inference_mode=inference_mode
    )

    # Start a new episode.
    obs = env.reset()
    eid = client.start_episode(training_enabled=not no_train)

    step_count = 1
    rewards = 0.0
    reward_list = []
    while True:
        # Compute an action randomly (off-policy) and log it.
        if off_policy:
            action = env.action_space.sample()
            client.log_action(eid, obs, action)
        # Compute an action locally or remotely (on server).
        # No need to log it here, as the action is logged automatically.
        else:
            action = client.get_action(eid, obs)

        step_count += 1

        # Perform a step in the external simulator (env).
        obs, reward, done, info = env.step(action)
        rewards += reward
        reward_list.append(reward)

        # Log next-obs, rewards, and infos.
        client.log_returns(eid, reward, info=info)

        # if step_count % 10 == 0:  # For debugging the new feature space
        #     print()
        #     print()
        #     print(obs)

        # Reset the episode if done.
        if done or step_count >= 660:  # Around 1 minute
            #print("Total reward:", rewards)
            if rewards >= stop_reward:
                #print("Target reward achieved, exiting")
                exit(0)

            rewards = 0.0
            step_count = 0
            #print(f"###################Car {car_num}###########################")
            #print(f"Reward list: {reward_list}")
            #print(f"################End of Car {car_num}#######################")

            # End the old episode.
            client.end_episode(eid, obs)

            # Start a new episode.
            obs = env.reset()
            eid = client.start_episode(training_enabled=not no_train)
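
# Hypothetical entry point, added here purely for illustration (it is not
# part of the snippet above): each simulator car would run its own client,
# e.g. connecting to the first policy-server worker. Values are placeholders.
if __name__ == "__main__":
    main(car_num=0, port=9900, address="localhost", inference_mode="local")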

Server:

from __future__ import division

import argparse
from math import floor

import os

import numpy as np
import ray
from gymnasium.spaces import Box
from ray import air, tune
from ray.rllib.algorithms import AlgorithmConfig
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.algorithms.registry import get_algorithm_class
from ray.rllib.algorithms.sac import SACConfig
from ray.rllib.env.policy_server_input import PolicyServerInput
from ray.rllib.algorithms.callbacks import DefaultCallbacks

from ray.rllib.evaluation import Episode, RolloutWorker
from ray.rllib.policy import Policy
from ray.rllib.policy.sample_batch import SampleBatch
from typing import Dict, Tuple

from ray.tune.registry import get_trainable_cls

SERVER_ADDRESS = "localhost"
# In this example, the user can run the policy server with
# n workers, opening up listen ports 9900 - 990n (n = num_workers - 1)
# to each of which different clients may connect.
SERVER_BASE_PORT = 9900  # + worker-idx - 1

CHECKPOINT_FILE = "last_checkpoint_{}.out"

class CustomCallbacks(DefaultCallbacks):
    def on_postprocess_trajectory(
        self,
        *,
        worker: RolloutWorker,
        episode: Episode,
        agent_id: str,
        policy_id: str,
        policies: Dict[str, Policy],
        postprocessed_batch: SampleBatch,
        original_batches: Dict[str, Tuple[Policy, SampleBatch]],
        **kwargs
    ) -> None:
        # print("Starting custom callback")
        # Set reward for each step to the reward of the next step.#
        for i in range(len(postprocessed_batch["rewards"]) -1):
            # print(postprocessed_batch["rewards"][i])
            postprocessed_batch["rewards"][i] = postprocessed_batch["rewards"][i + 1]



def get_cli_args():
    """Create CLI parser and return parsed arguments"""
    parser = argparse.ArgumentParser()

    # Example-specific args.
    parser.add_argument(
        "--port",
        type=int,
        default=SERVER_BASE_PORT,
        help="The base-port to use (on localhost). " f"Default is {SERVER_BASE_PORT}.",
    )
    parser.add_argument("--log-level", type=int, default=30)
    parser.add_argument("--verbose", type=int, default=0, help="Verbose mode.")
    parser.add_argument(
        "--callbacks-verbose",
        action="store_true",
        help="Activates info-messages for different events on "
        "server/client (episode steps, postprocessing, etc..).",
    )
    parser.add_argument(
        "--num-workers",
        type=int,
        default=4,
        help="The number of workers to use. Each worker will create "
        "its own listening socket for incoming experiences.",
    )
    parser.add_argument(
        "--no-restore",
        action="store_true",
        help="Do not restore from a previously saved checkpoint (location of "
        "which is saved in `last_checkpoint_[algo-name].out`).",
    )

    # General args.
    parser.add_argument(
        "--run",
        default="PPO",
        choices=["SAC", "TD3", "APEX_DDPG", "PPO", "APPO", "A2C", "A3C"],  # Add the rest here
        help="The RLlib-registered algorithm to use.",
    )
    parser.add_argument("--layer-size", type=int, default=0)
    parser.add_argument("--layer-count", type=int, default=0)
    parser.add_argument("--n-step", type=int, default=1)
    parser.add_argument("--num-cpus", type=int, default=4)
    parser.add_argument("--num-cpus-for-local", type=int, default=1)
    parser.add_argument("--num-gpus", type=int, default=1)
    parser.add_argument("--num-gpus-per-worker", type=float, default=0)
    parser.add_argument(
        "--framework",
        choices=["tf", "tf2", "torch"],
        default="tf",
        help="The DL framework specifier.",
    )
    parser.add_argument(
        "--use-lstm",
        action="store_true",
        help="Whether to auto-wrap the model with an LSTM. Only valid option for "
        "--run=[IMPALA|PPO|R2D2]",
    )
    parser.add_argument(
        "--stop-iters", type=int, default=99999, help="Number of iterations to train."
    )
    parser.add_argument(
        "--stop-timesteps",
        type=int,
        default=1000000,
        help="Number of timesteps to train.",
    )
    parser.add_argument(
        "--stop-reward",
        type=float,
        default=99999.9,
        help="Reward at which we stop training.",
    )
    parser.add_argument(
        "--as-test",
        action="store_true",
        help="Whether this script should be run as a test: --stop-reward must "
        "be achieved within --stop-timesteps AND --stop-iters.",
    )
    parser.add_argument(
        "--no-tune",
        action="store_true",
        help="Run without Tune using a manual train loop instead. Here,"
        "there is no TensorBoard support.",
    )
    parser.add_argument(
        "--local-mode",
        action="store_true",
        help="Init Ray in local mode for easier debugging.",
    )

    args = parser.parse_args()
    print(f"Running with following CLI args: {args}")
    return args


if __name__ == "__main__":
    args = get_cli_args()
    ray.init(log_to_driver=False, logging_level=args.log_level)

    # `InputReader` generator (returns None if no input reader is needed on
    # the respective worker).
    def _input(ioctx):
        # We are remote worker or we are local worker with num_workers=0:
        # Create a PolicyServerInput.
        if ioctx.worker_index > 0 or ioctx.worker.num_workers == 0:
            return PolicyServerInput(
                ioctx,
                SERVER_ADDRESS,
                args.port + ioctx.worker_index - (1 if ioctx.worker_index > 0 else 0),
            )
        # No InputReader (PolicyServerInput) needed.
        else:
            return None

    # Algorithm config. Note that this config is sent to the client only in case
    # the client needs to create its own policy copy for local inference.
    config = get_trainable_cls(args.run).get_default_config()

    # if args.num_cpus_for_local <= 0 or args.num_cpus_for_local > args.num_cpus:
    #     raise ValueError(
    #         "num_cpus_for_local must be > 0 and <= num_cpus: "
    #         f"{args.num_cpus_for_local} vs. {args.num_cpus}"
    #     )

    if args.num_workers > 0:
        worker_cpus = args.num_cpus / (args.num_workers + 1)
        if worker_cpus > 1:
            worker_cpus = floor(worker_cpus)
    else:
        worker_cpus = args.num_cpus

    (config
        # Indicate that the Algorithm we setup here doesn't need an actual env.
        # Allow spaces to be determined by user (see below).
        .environment(
            env=None,
            observation_space=Box(
                np.array(
                    (([-1e3] * 6) + ([-1.] * 3)) * 40 + [-np.inf] * 3 + [-np.inf] * 3 + [-1.] * 3  # +-1000 is an overestimate from visual inspection, hopefully it will be fine
                    + [0.] + [0.] + [0.] + [-1.] + [0.] * 4 + [0.] * 4 + [0.] * 4,
                    dtype=np.float32
                ),
                np.array(
                    (([1e3] * 6) + ([1.] * 3)) * 40 + [np.inf] * 3 + [np.inf] * 3 + [1.] * 3
                    + [np.inf] + [1.] + [1.] + [1.] + [1.] * 4 + [np.inf] * 4 + [np.inf] * 4,
                    dtype=np.float32
                ),
                dtype=np.float32
            ),
            action_space=Box(low=-1., high=1., shape=(2,)),
        )
        # DL framework to use.
        .framework(args.framework)
        # Create a "chatty" client/server or not.
        .callbacks(CustomCallbacks)
        # Use the `PolicyServerInput` to generate experiences.
        .offline_data(input_=_input)
        # Use n worker processes to listen on different ports.
        .rollouts(
            num_rollout_workers=args.num_workers,
            recreate_failed_workers=True,
        )
        # Allocate resources
        .resources(
            num_gpus=args.num_gpus,
            num_cpus_per_worker=worker_cpus,
            num_gpus_per_worker=args.num_gpus_per_worker,
        )
        # Disable OPE, since the rollouts are coming from online clients.
        .evaluation(off_policy_estimation_methods={})
        # Set to INFO so we'll see the server's actual address:port.
        # .debugging(log_level="WARN")
     )

    if args.layer_size > 0 and args.layer_count > 0:
        config.model.update(
            {
                "fcnet_hiddens": [args.layer_size] * args.layer_count,
            }
        )

    if args.run == "SAC":
        if args.layer_size > 0 and args.layer_count > 0:
            config.q_model_config.update({
                "fcnet_hiddens": [args.layer_size] * args.layer_count,
            })
            config.policy_model_config.update({
                "fcnet_hiddens": [args.layer_size] * args.layer_count,
            })

    checkpoint_path = CHECKPOINT_FILE.format(args.run)
    # Attempt to restore from checkpoint, if possible.
    if not args.no_restore and os.path.exists(checkpoint_path):
        checkpoint_path = open(checkpoint_path).read()
    else:
        checkpoint_path = None


    stop = {
        "training_iteration": args.stop_iters,
        "timesteps_total": args.stop_timesteps,
        "episode_reward_mean": args.stop_reward,
    }

    tune.Tuner(
        args.run, param_space=config, run_config=air.RunConfig(stop=stop, verbose=args.verbose),
    ).fit()

I still don’t know the dynamics of your environment, but 4 layers of 1024 neurons is already plenty of capacity for the problems we solve ourselves as part of our tests.

Just to make sure: the learnt solutions are not by any chance close to optimal already? AcEnv sounds a little bit like a thermostat, and simple control tasks can be learnt quickly.
If so, you can probably move on to hyperparameter tuning already.

If the actions make no sense at all, it can be a sign of a number of things, from unstable training to nasty mistakes where the timesteps of some values are off by one, etc.

The environment is a racing simulator. The solutions are nowhere near optimal, but I’m currently doing various runs with different algorithms and a range of learning rates for each algo. By tomorrow evening I should have some data on a couple of the off-policy algorithms, which will hopefully shed some light on it.

timesteps of some values are off by one

What do you mean by this?

What do you mean by this?

For example, that you are training on states that are shifted by one, or that your environment does not return the current observation but the last one.
Stuff like that does not surface easily, because the values all have the correct shape and are within their bounds.
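
(To illustrate the kind of bug meant here: a hypothetical wrapper that hands back the previous observation from step(). Everything has the right shape and stays in bounds, so nothing obviously breaks, but the agent always trains on stale data.)

class StaleObsEnv:
    """Hypothetical off-by-one bug: step() returns the observation from the
    PREVIOUS step rather than the current one."""

    def __init__(self, inner_env):
        self.env = inner_env

    def reset(self):
        self._last_obs = self.env.reset()
        return self._last_obs

    def step(self, action):
        obs, reward, done, info = self.env.step(action)
        stale_obs, self._last_obs = self._last_obs, obs  # the off-by-one bug
        return stale_obs, reward, done, info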

So the observations returned are definitely the correct ones. I am sending rewards one action late, but I’ve corrected that with the on_postprocess_trajectory() callback. I’ve manually checked these values after the callback is executed and they are associated with the correct actions. Am I correct in assuming that this callback is run before the data is used for training?
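
(For concreteness, a simplified sketch of that kind of check; a small debug helper, not the exact code I ran, using the standard "actions" and "rewards" SampleBatch columns. It can be called at the end of on_postprocess_trajectory() and the output compared by hand against the simulator logs.)

from ray.rllib.policy.sample_batch import SampleBatch

def print_action_reward_pairs(batch: SampleBatch) -> None:
    # Print each action next to its (shifted) reward so the pairing can be
    # verified manually.
    for i in range(len(batch["rewards"])):
        print(i, batch["actions"][i], batch["rewards"][i])

# e.g. inside CustomCallbacks.on_postprocess_trajectory():
#     print_action_reward_pairs(postprocessed_batch)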

The image below shows PPO vs APEX_DDPG with the same network sizes and activation function, each using the best learning rate found between 1e-3 and 1e-8 (learning rates were varied by factors of 10).
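
(For reference, the sweep itself can be expressed directly as a Tune grid search over the server config. A minimal, standalone sketch with placeholder values; note that SAC keeps its actor/critic/entropy learning rates in a separate optimization config rather than the top-level lr.)

from ray import air, tune
from ray.rllib.algorithms.ppo import PPOConfig

# One trial per learning rate, each a factor of 10 apart (placeholder values).
sweep_config = (
    PPOConfig()
    # Stand-in env for the sketch; the real setup gets its experiences from
    # PolicyServerInput instead.
    .environment(env="Pendulum-v1")
    .training(lr=tune.grid_search([1e-3, 1e-4, 1e-5, 1e-6, 1e-7, 1e-8]))
)

tune.Tuner(
    "PPO",
    param_space=sweep_config,
    run_config=air.RunConfig(stop={"timesteps_total": 1_000_000}),
).fit()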

I appreciate all the help.

Hi @theo ,

Yes, on_postprocess_trajectory() is run before training.
Have you played with n_step, target_network_update_freq, or tau?
Can you post some metrics from your TensorBoards? How do the actor and critic losses and the Q values develop?
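
(For reference, all three can be set through the config's training() call. A minimal sketch for SAC, with placeholder values purely for illustration:)

from ray.rllib.algorithms.sac import SACConfig

tuned_config = (
    SACConfig()
    .training(
        n_step=3,                      # use 3-step returns instead of 1-step
        tau=5e-3,                      # soft-update coefficient for the target networks
        target_network_update_freq=1,  # how often the target networks are updated
    )
)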

Hi @arturn ,

I haven’t yet played around with those parameters, but they’re in my plan, as I’m doing an evaluation of different algorithms and how their hyperparameters affect results.

There doesn’t seem to be any logging of actor and critic loss for any of the APEX_DDPG runs I’ve done. Here are the mean, min and max Q values for 3 of the runs, with learning rates:
Dark Blue: 1e-5
Light Blue: 1e-6
Orange: 1e-8

Here is a small video snippet showing the behaviour I’m seeing.

As you can see, it’s just consistently driving with full throttle and full steering lock. This happens after about 2 minutes of what looks like random actions (or at least varying actions). Conversely, with A2C and PPO things look like they are working more as expected: there is some variation in the actions being performed, and after about 500k steps the car starts to stay on the track and make progress along the road.