num_gpus, rollout_workers, learner_workers, evaluation_workers: purpose + resource allocation

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Hi all,

I’ve come to realise that my understanding of certain parameters, namely num_gpus, num_rollout_workers (as well as the CPUs and GPUs per worker), num_learner_workers, and the evaluation workers, is flawed, and the documentation I could find did not help me.

So I’m hoping someone can clarify the following points:

All of this is assuming a single gpu is available and a 12core/24 thread cpu on a single machine.

  1. For the num_gpus parameter, described as “Number of GPUs to allocate to the algorithm process” → What does this mean? What is the algorithm process? What is it responsible for, and what does it do?

  2. num_rollout_workers: Am I correct in understanding that any requests for actions will be automatically load-balanced between the n rollout workers?
    2.5: When specifying num_cpus and num_gpus per rollout worker, are those resources available to each worker to perform the forward/prediction passes when serving get_action?
    2.75: If GPUs are assigned to the workers, does each one make a copy of the model on the GPU for the forward pass, thus not really needing CPUs?

  3. Is the learner worker the one that runs the training loop? So assigning it CPUs + a GPU means it will use those resources (and, if given a GPU, keep a copy of the model on the GPU) for training?
    3.25: After a training loop, are these resources released?
    3.5: Reason for the above: if you look at the code below, I assign a total of 1.0 GPU (0.25 for the algorithm?, 0.25 each for the 2 workers, and 0.25 for the learner) and 6 CPUs (2 each for the 2 workers, and 2 for the learner). However, tune.run reports utilising only 5.0 CPUs and 0.75 GPU. Why is the rest not used? And why 5.0 CPUs? My guess is 4 for the workers and 1 for the driver → in which case, how do I specify resources for the driver, and what is the difference between the driver CPU and the algorithm CPU?

  4. What is the evaluation worker? What is its purpose? Can anyone provide an explanation or documentation links, please?

Sample code below:

import ray
from ray.rllib.env import PolicyServerInput
from ray.rllib.algorithms.ppo import PPOConfig

import numpy as np
import argparse
from gymnasium.spaces import MultiDiscrete, Box

ray.init(num_cpus=9, num_gpus=1, log_to_driver=False, configure_logging=False)

ppo_config = PPOConfig()

parser = argparse.ArgumentParser(description='Optional app description')
parser.add_argument('-ip', type=str, help='IP of this device')

parser.add_argument('-checkpoint', type=str, help='location of checkpoint to restore from')

args = parser.parse_args()


def _input(ioctx):
    # We are remote worker, or we are local worker with num_workers=0:
    # Create a PolicyServerInput.
    if ioctx.worker_index > 0 or ioctx.worker.num_workers == 0:
        return PolicyServerInput(
            ioctx,
            args.ip,
            55556 + ioctx.worker_index - (1 if ioctx.worker_index > 0 else 0),
        )
    # No InputReader (PolicyServerInput) needed.
    else:
        return None


x = 320
y = 240

# kl_coeff, ->default 0.2
# ppo_config.gamma = 0.01  # vf_loss_coeff used to be 0.01??
# "entropy_coeff": 0.00005,
# "clip_param": 0.1,
ppo_config.gamma = 0.998  # default 0.99
ppo_config.lambda_ = 0.99  # default 1.0???
ppo_config.kl_target = 0.01  # default 0.01
ppo_config.rollout_fragment_length = 128
# ppo_config.train_batch_size = 8500
# ppo_config.train_batch_size = 10000
ppo_config.train_batch_size = 12000
ppo_config.sgd_minibatch_size = 512
# ppo_config.num_sgd_iter = 2  # default 30???
ppo_config.num_sgd_iter = 7  # default 30???
# ppo_config.lr = 3.5e-5  # 5e-5
ppo_config.lr = 9e-5  # 5e-5

ppo_config.model = {
    "vf_share_layers": True,

    "use_lstm": True,
    "max_seq_len": 32,
    "lstm_cell_size": 128,
    "lstm_use_prev_action": True,

    "conv_filters": [
        # 240 X 320
        [16, [5, 5], 3],
        [32, [5, 5], 3],
        [64, [5, 5], 3],
        [128, [3, 3], 2],
        [256, [3, 3], 2],
        [512, [3, 3], 2],
    ],
    "conv_activation": "relu",
    "post_fcnet_hiddens": [512],
    "post_fcnet_activation": "relu"
}
ppo_config.batch_mode = "complete_episodes"
ppo_config.simple_optimizer = True

ppo_config.env = None
ppo_config.observation_space = Box(low=0, high=1, shape=(y, x, 1), dtype=np.float32)
ppo_config.action_space = MultiDiscrete(
    [
        2,  # W
        2,  # A
        2,  # S
        2,  # D
        2,  # Space
        2,  # H
        2,  # J
        2,  # K
        2  # L
    ]
)
ppo_config.env_config = {
    "sleep": True,
    'replayOn': False
}

ppo_config.rollouts(num_rollout_workers=2, enable_connectors=False)
ppo_config.offline_data(input_=_input)

ppo_config.framework_str = 'torch'
ppo_config.log_sys_usage = False
ppo_config.compress_observations = True
ppo_config.shuffle_sequences = False
ppo_config.num_gpus = 0.25
ppo_config.num_gpus_per_worker = 0.25
ppo_config.num_cpus_per_worker = 2
ppo_config.num_learner_workers = 1
ppo_config.num_cpus_per_learner_worker = 2
ppo_config.num_gpus_per_learner_worker = 0.5

tempyy = ppo_config.to_dict()

from ray import tune

name = "" + args.checkpoint
print(f"Starting: {name}")

tune.run("PPO",
         resume='AUTO',
         config=tempyy,
         name=name, keep_checkpoints_num=None, checkpoint_score_attr="episode_reward_mean",
         max_failures=1,
         checkpoint_freq=5, checkpoint_at_end=True)

[Image for 3.5: tune.run status output showing 5.0/9 CPUs and 0.75/1 GPUs in use]

Hi @Denys_Ashikhin ,

Thanks for asking these questions.

  1. Simply speaking, the Algorithm process is the driver process: it gathers the collected samples from the rollout workers, computes the gradient update, and sends the updated weights back to the workers.
  2. There is no load balancing. Each worker owns a number of environments and steps through them.
    2.5 The *_per_worker resources are available to each worker individually, not shared.
    2.75 If workers have no GPU, the model is kept in RAM. If GPUs are assigned, each worker still needs a minimum of one CPU (they are Ray actors).
    3 Yes.
    3.25 No.
    3.5 The learner workers are only needed for multi-GPU training.

For multi-GPU training, set num_learner_workers to a value greater than 1 and set num_gpus_per_learner_worker accordingly (e.g. with 4 GPUs in total and a model that needs 2 GPUs: num_learner_workers = 2 and num_gpus_per_learner_worker = 2).

In your case, you should not set num_gpus_per_learner_worker.
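Here is a minimal sketch of the two setups (illustrative only; I am using the attribute-style config from your script, and the numbers are placeholders, not recommendations):

from ray.rllib.algorithms.ppo import PPOConfig

# Single-GPU case (yours): give the algorithm/learner its GPU share directly
# and leave num_gpus_per_learner_worker unset.
single_gpu = PPOConfig()
single_gpu.num_rollout_workers = 2
single_gpu.num_gpus = 0.5              # algorithm / local learner share of the GPU
single_gpu.num_gpus_per_worker = 0.25  # forward passes on each rollout worker

# Hypothetical multi-GPU case: 4 GPUs in total, model needs 2 GPUs per learner.
multi_gpu = PPOConfig()
multi_gpu.num_rollout_workers = 2
multi_gpu.num_learner_workers = 2          # two learner actors
multi_gpu.num_gpus_per_learner_worker = 2  # 2 GPUs each -> 4 GPUs in total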

  4. An evaluation worker is also simply a rollout worker, except that the collected samples are not used for training; only the metrics are collected.
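For example, a rough sketch of how they are enabled (parameter names as in recent Ray versions; the values are placeholders):

from ray.rllib.algorithms.ppo import PPOConfig

config = PPOConfig()
config.evaluation(
    evaluation_interval=5,           # evaluate every 5 training iterations
    evaluation_num_workers=1,        # dedicated rollout worker(s) used only for evaluation
    evaluation_duration=10,          # how much to evaluate each time
    evaluation_duration_unit="episodes",
)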

Hi @arturn,

Thanks for the response, it really clarifies some things I sort-of pieced together. I have some follow-up questions:

  1. If I have 2 rollout workers, purely CPU, how does assigning more than 1 CPU each help them? As in, what gets parallelized?
  2. If I have 2 rollout workers and batch_mode: "complete_episodes", does the algorithm process expect each worker to report the same number of episodes? That is, let’s say I have 5 external envs, 2 connect to worker A and 3 connect to worker B; rollout worker B will naturally have more episodes to push → is this an issue?
    I’m just worried it’s doing something synchronously, awaiting episodes from A then B then A then B (making B get backlogged).
  1. You’d rarely use this. Maybe if you want rollout workers to share a CPU with num_cpus_per_worker=0.x (see the sketch after this list).
  2. No, that is not the expectation. In PPO, such backlogging does not happen because sampling is synchronous. Samples from an earlier sampling policy are not kept around for another training iteration.
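For instance, a sketch of what sharing CPU cores between rollout workers could look like (the fractional value is just a placeholder):

from ray.rllib.algorithms.ppo import PPOConfig

config = PPOConfig()
config.num_rollout_workers = 4
# Fractional CPUs let several rollout workers share a core; this mostly makes
# sense when the workers spend most of their time waiting on the environment.
config.num_cpus_per_worker = 0.5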

Just to clarify: even though rollout_worker A finishes 2x as many episodes as rollout_worker B, the main driver does not wait for rollout_worker B.

As in, let’s say my train batch is 10,000 steps and each episode is 100 steps (so 100 episodes total).

If worker A has 2x as many external envs, it would finish episodes 2x as fast, meaning that optimally the train batch would contain roughly 67 worker A episodes and 33 worker B episodes.

And is that the behaviour currently in RLlib?

“Samples from an earlier sampling policy are not kept around for another training iteration.” → What happens to samples that are taken during a training loop? In my case, 1 training loop takes ~120 seconds. During this time, the rollout workers continue to generate samples → what happens to those?


@Denys_Ashikhin Is there any update on this? I too want to know about the following:

“Samples from an earlier sampling policy are not kept around for another training iteration.” → What happens to samples that are taken during a training loop? In my case, 1 training loop takes ~120 seconds. During this time, the rollout workers continue to generate samples → what happens to those?

Sadly I never heard back. However, I have a sneaking suspicion they were stuck in limbo (in memory), eating up my RAM and causing an OOM (my machine has 96 GB, so it took about a day to crash, but it still did).

I found ray-project/ray PR #31400 (“[RLlib] Issue 30139: PolicyServerInput OOMs if incoming samples cannot be handled in time (e.g. when too many clients are connected)” by sven1977), which was in response to issue #30139 (“[RLlib] PolicyServerInput memory leak”).

After applying the PR manually, it fixed my memory leak. So my guess (no clue if it’s correct) is that those samples were just stuck in the queue that was changed in the PR, and since they were from an older policy, they were just ignored until memory was exhausted and it crashed.

All of that to say that I THINK: during training, it doesn’t consume incoming/new episodes until the training step is done. Afterwards, anything from older policies is just discarded.
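Conceptually (this is just my mental model, not RLlib’s actual code), the fix behaves like a bounded queue that drops samples instead of growing without limit:

import queue

# Toy illustration of the suspected behaviour: a bounded queue of incoming
# sample batches. While the learner is busy, producers drop batches instead
# of letting the queue (and RAM) grow indefinitely.
sample_queue = queue.Queue(maxsize=8)

def offer_batch(batch):
    """Try to enqueue a batch; drop it if the learner is backed up."""
    try:
        sample_queue.put_nowait(batch)
        return True
    except queue.Full:
        return False  # overflow/stale batch is simply discarded

def learner_step():
    """Drain whatever is queued and train on it; old batches never pile up."""
    batches = []
    while not sample_queue.empty():
        batches.append(sample_queue.get_nowait())
    # ... train on `batches` with the current policy ...
    return batches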

@Denys_Ashikhin

Why are your rollout workers continuing to generate samples while the buffer is full and training is underway? Since PPO can’t use samples from an older policy, I would think that the correct response is to stop all rollout workers from sampling and wait for the trainer to perform SGD on the existing samples. Once training is complete, the new policy is distributed to all rollout workers and sampling continues. Is this what you are observing? If the rollout workers are continuing to sample while training is happening, then that seems like a bug.

Even in the case where I use policy clients with remote inference (so the policy server has the model and the clients just send observations + rewards back), the server still continues to receive observations and provide actions back, so the clients keep playing the game to train. I figured those extra observations/samples should just be dropped/ignored, but that doesn’t seem to be the case here?

So the clients have no idea whether episodes should be terminated early when the server is in the middle of a training loop.
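For reference, the client side in my setup looks roughly like this (a simplified sketch; get_initial_observation and step_game are placeholders for my actual game hooks, not RLlib functions):

from ray.rllib.env.policy_client import PolicyClient

# Remote inference: the server holds the policy; the client only sends
# observations/rewards and receives actions back.
client = PolicyClient("http://localhost:55556", inference_mode="remote")

episode_id = client.start_episode(training_enabled=True)
obs = get_initial_observation()  # placeholder for the real game hook
done = False
while not done:
    action = client.get_action(episode_id, obs)
    obs, reward, done = step_game(action)  # placeholder
    client.log_returns(episode_id, reward)
client.end_episode(episode_id, obs)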