[RLlib] PyTorch multiple optimizers support

  • High: It blocks me from completing my task.

Hi, I’d like to know whether having multiple optimizers in a TorchPolicyV2 is currently supported or not.

Looking at the code at link, multiple optimizers seem to be supported when gradients == _directStepOptimizerSingleton:

    @override(Policy)
    @DeveloperAPI
    def apply_gradients(self, gradients: ModelGradients) -> None:
        if gradients == _directStepOptimizerSingleton:
            for i, opt in enumerate(self._optimizers):
                opt.step()
        else:
            # TODO(sven): Not supported for multiple optimizers yet.
            assert len(self._optimizers) == 1
            for g, p in zip(gradients, self.model.parameters()):
                if g is not None:
                    if torch.is_tensor(g):
                        p.grad = g.to(self.device)
                    else:
                        p.grad = torch.from_numpy(g).to(self.device)

            self._optimizers[0].step()

On the other hand, each optimizer is meant to update the model’s parameters according to the loss defined for it, as shown in the example and as enforced by assert len(loss_out) == len(self._optimizers) in the following code.

TorchPolicyV2._multi_gpu_parallel_grad_calc._worker

        def _worker(shard_idx, model, sample_batch, device):
            torch.set_grad_enabled(grad_enabled)
            try:
                with NullContextManager() if device.type == "cpu" else torch.cuda.device(  # noqa: E501
                    device
                ):
                    loss_out = force_list(
                        self.loss(model, self.dist_class, sample_batch)
                    )

                    # Call Model's custom-loss with Policy loss outputs and
                    # train_batch.
                    loss_out = model.custom_loss(loss_out, sample_batch)

                    assert len(loss_out) == len(self._optimizers)

                    # Loop through all optimizers.
                    grad_info = {"allreduce_latency": 0.0}

                    parameters = list(model.parameters())
                    all_grads = [None for _ in range(len(parameters))]
                    for opt_idx, opt in enumerate(self._optimizers):
                        # Erase gradients in all vars of the tower that this
                        # optimizer would affect.
                        param_indices = self.multi_gpu_param_groups[opt_idx]
                        for param_idx, param in enumerate(parameters):
                            if param_idx in param_indices and param.grad is not None:
                                param.grad.data.zero_()
                        # Recompute gradients of loss over all variables.
                        loss_out[opt_idx].backward(retain_graph=True)
                        grad_info.update(
                            self.extra_grad_process(opt, loss_out[opt_idx])
                        )

                        grads = []
                        # Note that return values are just references;
                        # Calling zero_grad would modify the values.
                        for param_idx, param in enumerate(parameters):
                            if param_idx in param_indices:
                                if param.grad is not None:
                                    grads.append(param.grad)
                                all_grads[param_idx] = param.grad

                        if self.distributed_world_size:
                            start = time.time()
                            if torch.cuda.is_available():
                                # Sadly, allreduce_coalesced does not work with
                                # CUDA yet.
                                for g in grads:
                                    torch.distributed.all_reduce(
                                        g, op=torch.distributed.ReduceOp.SUM
                                    )
                            else:
                                torch.distributed.all_reduce_coalesced(
                                    grads, op=torch.distributed.ReduceOp.SUM
                                )

                            for param_group in opt.param_groups:
                                for p in param_group["params"]:
                                    if p.grad is not None:
                                        p.grad /= self.distributed_world_size

                            grad_info["allreduce_latency"] += time.time() - start

                with lock:
                    results[shard_idx] = (all_grads, grad_info)
            except Exception as e:
                import traceback

                with lock:
                    results[shard_idx] = (
                        ValueError(
                            e.args[0]
                            + "\n traceback"
                            + traceback.format_exc()
                            + "\n"
                            + "In tower {} on device {}".format(shard_idx, device)
                        ),
                        e,
                    )

However, doing

for i, opt in enumerate(self._optimizers):
    opt.step()

amounts to running every optimizer on the same .grad values stored in the model parameters. Is this the expected behavior?

Moreover, the _worker definition above shows that each optimizer first zeroes the gradients of the parameters in its own group, erasing whatever a previous optimizer's loss wrote to any parameter they share, and then backpropagates its own loss.

                    for opt_idx, opt in enumerate(self._optimizers):
                        # Erase gradients in all vars of the tower that this
                        # optimizer would affect.
                        param_indices = self.multi_gpu_param_groups[opt_idx]
                        for param_idx, param in enumerate(parameters):
                            if param_idx in param_indices and param.grad is not None:
                                param.grad.data.zero_()
                        # Recompute gradients of loss over all variables.
                        loss_out[opt_idx].backward(retain_graph=True)
                        grad_info.update(
                            self.extra_grad_process(opt, loss_out[opt_idx])
                        )

As a result, every opt.step() would update shared parameters using only the gradients from the last optimizer's loss, which I suppose is not the correct behavior.
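To make the concern concrete, here is a minimal torch-free toy of the zero-then-backward loop followed by stepping every optimizer. Param, ToySGD, backward and run are hypothetical stand-ins that I wrote for illustration, not RLlib or PyTorch APIs. I'm assuming that .grad accumulates across backward calls as it does in PyTorch, and that gradients are zeroed only for the current optimizer's parameter indices, as in the excerpt above.

```python
# Toy stand-ins for torch parameters and optimizers (hypothetical names).
class Param:
    def __init__(self, value):
        self.value = value
        self.grad = None


class ToySGD:
    def __init__(self, params, lr):
        self.params = params
        self.lr = lr

    def step(self):
        # Plain SGD update using whatever is currently stored in .grad.
        for p in self.params:
            if p.grad is not None:
                p.value -= self.lr * p.grad


def backward(loss_grads):
    """Accumulate gradients into .grad, mimicking loss.backward()."""
    for p, g in loss_grads:
        p.grad = g if p.grad is None else p.grad + g


def run(params, param_groups, losses, lr):
    """Mimic the _worker loop, then step every optimizer.

    param_groups[i]: indices of the parameters owned by optimizer i.
    losses[i]: (param_index, gradient) pairs produced by loss i.
    Returns the .grad values seen by the optimizers when they step.
    """
    optimizers = [ToySGD([params[j] for j in group], lr) for group in param_groups]
    for opt_idx in range(len(optimizers)):
        # Zero only the gradients of this optimizer's parameters.
        for j in param_groups[opt_idx]:
            if params[j].grad is not None:
                params[j].grad = 0.0
        backward([(params[j], g) for j, g in losses[opt_idx]])
    grads_at_step = [p.grad for p in params]
    for opt in optimizers:
        opt.step()
    return grads_at_step


# Case 1: both optimizers own the same parameter.
shared = [Param(0.0)]
shared_grads = run(shared, [[0], [0]], [[(0, 4.0)], [(0, 1.0)]], lr=0.5)

# Case 2: each optimizer owns its own parameter, and each loss only
# touches the parameter of its optimizer.
a, b = Param(0.0), Param(0.0)
disjoint_grads = run([a, b], [[0], [1]], [[(0, 4.0)], [(1, 1.0)]], lr=0.5)
```

In the shared case, the gradient of the first loss (4.0) is wiped before the second backward, so both optimizers step on 1.0 and the parameter ends at -1.0. In the disjoint case, each parameter keeps the gradient of its own loss (4.0 and 1.0). If this toy matches what RLlib does, the problem would only show up when optimizers share parameters or when a loss reaches parameters outside its optimizer's group.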

Overall, I’d say the current implementation is buggy when multiple optimizers are used. Can you confirm this issue? Thanks in advance.