What is ray.rllib.execution.concurrency_ops?

Hi, I find it hard to understand Concurrently in ray.rllib.execution.concurrency_ops (Ray 1.11.0). Could you please provide a figure to illustrate how it works?

Hi @GoingMyWay ,

the Concurrently operator can execute a number of LocalIterators in different modes. The most commonly used mode is round_robin with equal weights, which executes the operations in the ops list in alternation.

Almost every operation in RLlib (and Ray in general) is packed into an iterator. Take the DQN agent as an example. Its execution_plan() constructs two main operations: the store_op, which collects experiences with the workers and writes them into the replay buffer, and the replay_op, which samples batches from the replay buffer, makes some SGD steps, updates the priorities of the replay buffer and, if necessary, updates the target network.
Each of these two operations is defined via a LocalIterator object that iterates over the work to be done in each step. If you call the LocalIterator of the store_op, it executes a rollout that collects experiences in the environment (exactly max(num_workers, 1) x rollout_fragment_length) and then stores the resulting batch in the replay buffer. If you call the LocalIterator of the replay_op, it samples some batches of experiences (of size train_batch_size each) and uses them to make some SGD steps.
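To make the two operations more concrete, here is a minimal sketch in plain Python. It does not use RLlib at all: the generators, the deque used as a replay buffer, and the dictionary outputs are all stand-ins I made up for illustration. The only point is that each next() call does one unit of work.

```python
import random
from collections import deque

# Hypothetical stand-in for RLlib's replay buffer.
replay_buffer = deque(maxlen=1000)

def make_store_op(rollout_fragment_length=4):
    """Each next() 'collects' one rollout and stores it in the buffer."""
    step = 0
    while True:
        batch = list(range(step, step + rollout_fragment_length))
        step += rollout_fragment_length
        replay_buffer.extend(batch)
        yield {"stored": len(batch)}

def make_replay_op(train_batch_size=2):
    """Each next() samples one batch from the buffer and 'trains' on it."""
    while True:
        batch = random.sample(list(replay_buffer), train_batch_size)
        yield {"trained_on": batch}

store_op = make_store_op()
replay_op = make_replay_op()

store_result = next(store_op)    # one rollout is written to the buffer
replay_result = next(replay_op)  # one batch is sampled and used for training
```

Nothing happens until next() is called, which is what lets a scheduler decide how often each operation runs.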

To train your agent, you want to repeatedly collect experiences and train on new experience batches. This is what the Concurrently operator is for: it builds a union of the LocalIterators in the ops list and calls their next() functions in alternation.
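Since you asked for an illustration, the alternation can be sketched in a few lines of plain Python. This is a simplified model, not RLlib's implementation: concurrently_round_robin is a hypothetical helper, and the two ops are just iterators over strings.

```python
def concurrently_round_robin(ops):
    """Yield one item from each iterator in turn until all are exhausted."""
    active = list(ops)
    while active:
        for op in list(active):
            try:
                yield next(op)
            except StopIteration:
                # This op has nothing left; keep cycling through the others.
                active.remove(op)

store_op = iter(["store-1", "store-2", "store-3"])
replay_op = iter(["train-1", "train-2", "train-3"])

schedule = list(concurrently_round_robin([store_op, replay_op]))
# schedule == ["store-1", "train-1", "store-2", "train-2", "store-3", "train-3"]
```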


@Lars_Simon_Zehnder Thanks for the quick response and the helpful answer. When using round_robin mode, does it mean that at each training iteration the trainer first calls store_op and then calls train_op? Can I add another op to it?


I tried to add a new op. But it seems that the other ops are not called more frequently during training than the newly added one.

@GoingMyWay ,

you are almost right. During each step(), the Trainable iterates once through the store_op and then one or more times through the train_op (and a single pass through the train_op can itself use several batches to make several SGD steps).

When adding operations to the ops list of the Concurrently operator, make sure that you return the right operation output. You can control this with output_indexes. For example, the execution_plan of the DQNTrainer returns only the output of the second LocalIterator:

train_op = Concurrently(
    [store_op, replay_op],
    mode="round_robin", output_indexes=[1],
    round_robin_weights=calculate_rr_weights(config))

If you add another operation and this operation should return a value, you need to specify this in the output_indexes as well.
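Here is a small sketch of what output_indexes does, again in plain Python rather than with RLlib itself. The concurrently function, make_op, and the third "extra" op are hypothetical names for illustration. Every op is still executed on its turn, but only the outputs of the selected indexes are passed on.

```python
def concurrently(ops, output_indexes=None):
    """Round-robin over ops; yield only outputs whose op index is selected."""
    active = list(enumerate(ops))
    while active:
        for index, op in list(active):
            try:
                item = next(op)  # the op runs regardless of output_indexes
            except StopIteration:
                active.remove((index, op))
                continue
            if output_indexes is None or index in output_indexes:
                yield item

calls = []  # records which op actually ran, in order

def make_op(name, n):
    for i in range(1, n + 1):
        calls.append(name)
        yield f"{name}-{i}"

ops = [make_op("store", 2), make_op("replay", 2), make_op("extra", 2)]
outputs = list(concurrently(ops, output_indexes=[1, 2]))
# outputs == ["replay-1", "extra-1", "replay-2", "extra-2"]
# calls   == ["store", "replay", "extra", "store", "replay", "extra"]
```

If a newly added op seems to have no effect on the results, it is worth checking whether its index is missing from output_indexes.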

When using round_robin mode you can also change the round_robin_weights so that some operations run more often than others. For example, if you set [2, 1, "*"] with three operations in the ops list, Concurrently in round_robin mode will pull two items from the first LocalIterator (maybe sampling and storing), one item from the second LocalIterator (maybe training), and as many items from the third as it can deliver without blocking.
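The effect of the weights can be sketched like this. It is a simplified model under my own assumptions: weighted_round_robin is a hypothetical helper, and NOT_READY is a made-up marker an op yields when it has nothing available right now, which is how "without blocking" is modelled here.

```python
NOT_READY = object()  # hypothetical marker: "no item available right now"

def weighted_round_robin(ops, weights):
    """Pull `weight` items per turn from each op; "*" pulls until NOT_READY."""
    active = list(zip(ops, weights))
    while active:
        for op, weight in list(active):
            pulled = 0
            # Note: "*" on an op that never yields NOT_READY would not return.
            while weight == "*" or pulled < weight:
                try:
                    item = next(op)
                except StopIteration:
                    active.remove((op, weight))
                    break
                if item is NOT_READY:
                    break  # nothing available now, move on to the next op
                pulled += 1
                yield item

store_op = iter(["s1", "s2", "s3", "s4"])
train_op = iter(["t1", "t2"])
third_op = iter(["m1", "m2", NOT_READY, "m3"])

schedule = list(weighted_round_robin([store_op, train_op, third_op], [2, 1, "*"]))
# schedule == ["s1", "s2", "t1", "m1", "m2", "s3", "s4", "t2", "m3"]
```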

The DQNTrainer uses the calculate_rr_weights() function to compute the round_robin_weights. It relies on your configuration: the training_intensity configuration parameter is a knob that lets you adjust the number of training iterations relative to the number of sampling iterations. But feel free to hard-code the round_robin_weights in your own implementations with Ray.
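To give an idea of how such weights could be derived from a training intensity, here is a rough sketch. It is not the code of calculate_rr_weights(): sketch_rr_weights and its parameters are hypothetical, and I assume that training intensity means "steps replayed per step sampled".

```python
def sketch_rr_weights(training_intensity, train_batch_size,
                      steps_sampled_per_iteration):
    """Return [sample_weight, train_weight] for round-robin scheduling."""
    if not training_intensity:
        return [1, 1]  # no intensity requested: plain alternation
    # Ratio of replayed to sampled steps when both ops run once per round.
    native_ratio = train_batch_size / steps_sampled_per_iteration
    # How far the requested intensity is from that native ratio.
    ratio = training_intensity / native_ratio
    if ratio < 1:
        return [round(1 / ratio), 1]  # sample more often than train
    return [1, round(ratio)]          # train more often than sample

default_weights = sketch_rr_weights(None, 32, 4)     # [1, 1]
train_heavy_weights = sketch_rr_weights(32, 32, 4)   # [1, 4]
sample_heavy_weights = sketch_rr_weights(2, 32, 4)   # [4, 1]
```

With a train batch of 32 and 4 sampled steps per iteration, one train step per sample step already replays 8 steps per sampled step, so an intensity of 32 needs four train steps per sample step.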

Dear @Lars_Simon_Zehnder, thank you. I am trying to debug this and make sure all the ops are called in the right order and with equal frequency.