How does multi-CPU work within Ray?

Hello,

I am training a PPO agent on a 16 CPUs cluster on Azure with a custom environment.
If I understand well, there is a balance to be find between the number of CPUs assigned to the workers for the sampling task (num_worker*num_cpu_per_worker) and the number of CPU assigned to the learning task (num_cpu_for_driver).

However, with num_cpu_for_driver=1 I get a mean time per iteration of 5.5s and increasing the number of CPUs to 13 only passes the mean time per iteration to approximately 2s.

So, how does multi-CPU work in Ray?
Are the minibatches back-propagated within only one CPU or are they dispatched accross the num_cpu_for_drivers CPUs?
Is it weird that the learning time doesn’t improve that much when increasing the number of CPU by 13?

Thanks,
Abderrahim

cc @kai Can you address the question?

Actually let me cc @sven1977 for questions around RLLib algorithms.

Generally I think the driver can leverage multiple threads only to a certain degree, and it might also be limited by the incoming number of worker batches. But I’m not familiar enough with the PPO implementation to know if this might be the case or if we always should have enough data to train on.

It is not shocking that at certain point, there is not enough work to split (and there is a fixed cost of managing multiple workers). Thus, it is not because you multiply the number of CPUs by 13 that your running time will divide by 13. However, I’m quite curious to understand about how parallelization works in a multi-CPU environment.

The settings @Abderraim is citing looks like

tune.run(
    config={
        #...
        # Parallel Training CPU
        'num_cpus_for_driver': 13,
         'tf_session_args': {
            'intra_op_parallelism_threads': 0,
            'inter_op_parallelism_threads': 0,
                'device_count': {
                 'CPU': 13,
             }
          },
         'local_tf_session_args': {
            'intra_op_parallelism_threads': 0,
            'inter_op_parallelism_threads': 0,
        },
        #...
}

Thus, as we specify the number of cpus available in the tensorflow session, I understand that the parallelization happens at tensorflow level. Can anyone confirm that?

But I do not understand is how sgd is performed in this case. The minibatches seems entirely managed by RLlib


# sgd.py

def do_minibatch_sgd(samples, policies, local_worker, num_sgd_iter,
                     sgd_minibatch_size, standardize_fields):
    """Execute minibatch SGD.
    """
    #...
    for policy_id in policies.keys():
       #...
        for i in range(num_sgd_iter):
            iter_extra_fetches = defaultdict(list)
            for minibatch in minibatches(batch, sgd_minibatch_size):
                batch_fetches = (local_worker.learn_on_batch(
                    MultiAgentBatch({
                        policy_id: minibatch
                    }, minibatch.count)))[policy_id]
                # …
    return fetches

Thus, I understand that whatever parallelization is being done, it is done at minibatch level. Is that right?

Lastly, as Abderrahim asks, it puzzles me how the forward/backward propagation are actually performed. I don’t see anywhere a declaration of a tensorflow distribution strategy. Can we pilot it in RLlib?