My RLlib implementation seems to compute random actions

I have been trying to use RLlib and came up with a system that “works”, but when computing actions it gives me results that seem totally random.
I’ll try to describe the problem I am implementing in RLlib:
3 servers are each receiving X requests per step. Two parameters exist per server:
the max requests the server can satisfy in that step, and the number of requests incoming in that step (those are my obs).
Each agent answers with an action:
the action is the number of requests the server tries to forward to the other two.
A short example:
agent 1 = 30 incoming req, 20 max requests
agent 2 = 10 incoming req, 30 max requests
agent 3 = 100 incoming req, 50 max requests

If agent 1 chooses action=10, it is communicating that it wants to forward 10 requests, which will be split between the other two agents:
agent 2 and agent 3 get 5 requests each.
Note that all non-satisfied requests in a step are lost, so if agent 1 tries to forward more requests than the others have capacity for, those remain non-satisfied.
My reward equals the number of requests an agent cannot satisfy:
let’s suppose agent 1 tries to forward 20 requests but the others cannot satisfy them; it gets a negative reward of -20. If agent 2 does not try to forward anything, it gets zero reward.
My goal is that, at each step, the number of non-satisfied requests is minimized (so the reward is maximized).
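To make the example above concrete, this is roughly the arithmetic I have in mind (plain Python; the helper names are just mine, for illustration):

def spare_capacity(incoming, max_reqs):
    # Requests a server could still handle on top of its own incoming traffic.
    return max(0, max_reqs - incoming)

def lost_forwarded(forwarded, receivers):
    # receivers: list of (incoming, max_reqs) of the other two servers.
    per_receiver = forwarded // 2
    lost = 0
    for incoming, max_reqs in receivers:
        lost += max(0, per_receiver - spare_capacity(incoming, max_reqs))
    return lost

# Agent 1 (30 incoming, 20 max) forwards 20 requests: agent 2 can absorb its 10,
# agent 3 is already overloaded, so 10 of the forwarded requests are lost.
print(-lost_forwarded(20, [(10, 30), (100, 50)]))  # -> -10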

So, my questions are:
Does the reward system make sense? It’s my first RL problem, so I am not sure if this is the best way to design a reward.

Main doubt:
What is the “structure” of my system? I am talking about managing the horizon, episodes, and “done”.
My env should be continuous, I guess (I will take obs from a CSV, and I do not want to reset the obs at every step).
Considering that my goal is to maximize the reward, how should I handle the episode setting? Just a single long episode? Or should each action<->obs interaction be its own episode?

So, I am attaching a solution that I made, but the computed actions seem to make no sense:
I have “fixed” obs for testing my env (I am using fixed/repeated observations to check if my system is learning base cases; is that a good idea?), but still, even if the obs coming from the env are the same, the computed actions are always different and not relevant. Is that normal? Shouldn’t I expect the same action for the same obs?

I am providing my code below (I am sorry, it’s a mess :sweat_smile:, but it’s WIP).
I hope someone can help me! (And sorry for the wall of text.)
Thanks in advance!


import argparse
import numpy as np
from gym.spaces import Discrete, Tuple, Box, Dict

import pandas as pd

import ray
from ray import tune
from ray.rllib.agents.maml.maml_torch_policy import KLCoeffMixin as \
    TorchKLCoeffMixin
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy, KLCoeffMixin, \
    ppo_surrogate_loss as tf_loss
from ray.rllib.evaluation.postprocessing import compute_advantages, \
    Postprocessing
from ray.rllib.models import ModelCatalog
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.policy.tf_policy import LearningRateSchedule, \
    EntropyCoeffSchedule
from ray.rllib.policy.torch_policy import LearningRateSchedule as TorchLR, \
    EntropyCoeffSchedule as TorchEntropyCoeffSchedule
from ray.rllib.utils.annotations import override
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.rllib.utils.tf_ops import explained_variance, make_tf_callable
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.fcnet import FullyConnectedNetwork
from ray.rllib.models.modelv2 import ModelV2

tf1, tf, tfv = try_import_tf()
torch, nn = try_import_torch()

OPPONENT_OBS = "opponent_obs"
OPPONENT_ACTION = "opponent_action"

AGENT_OBS_SPACE = Dict({
    "action_mask": Box(0, 1.0, (100,)),
    "observation": Tuple([Discrete(100),Discrete(100)])
})

parser = argparse.ArgumentParser()
parser.add_argument(
    "--framework",
    choices=["tf", "tf2", "tfe", "torch"],
    default="tf",
    help="The DL framework specifier.")
parser.add_argument(
    "--as-test",
    action="store_true",
    help="Whether this script should be run as a test: --stop-reward must "
    "be achieved within --stop-timesteps AND --stop-iters.")
parser.add_argument(
    "--stop-iters",
    type=int,
    default=200,
    help="Number of iterations to train.")
parser.add_argument(
    "--stop-timesteps",
    type=int,
    default=100000,
    help="Number of timesteps to train.")
parser.add_argument(
    "--stop-reward",
    type=float,
    default=7.99,
    help="Reward at which we stop training.")

from typing import Dict as typingDict
from ray.rllib.utils.typing import TensorType as typingTensorType

from ray.rllib.utils.typing import TensorType, List, ModelConfigDict
from ray.rllib.models.modelv2 import restore_original_dimensions



class modFullyConnectedNetwork(FullyConnectedNetwork):
    def forward(self, input_dict: typingDict[str, TensorType],
                state: List[TensorType],
                seq_lens: TensorType) -> (TensorType, List[TensorType]):

        # Unpack the flattened Dict obs back into its original structure.
        # This model is only used with the TF stack, hence "tf" here.
        original_obs = restore_original_dimensions(input_dict["obs"], self.obs_space, "tf")
        # NOTE: only the first element of the observation Tuple is fed to the base model.
        model_out, self._value_out = self.base_model(original_obs["observation"][0])
        # Mask invalid actions by pushing their logits towards -inf.
        inf_mask = tf.maximum(tf.math.log(original_obs["action_mask"]), tf.float32.min)
        masked_logits = model_out + inf_mask

        return masked_logits, state

class DFaaSCriticModel(TFModelV2):
    """Multi-agent model that implements a centralized value function."""

    def __init__(self, obs_space, action_space, num_outputs, model_config, name):
        super(DFaaSCriticModel, self).__init__(obs_space, action_space, num_outputs, model_config, name)



        # Base of the model
        self.model = modFullyConnectedNetwork(Box(-1.0, 1.0, (100,)), action_space, num_outputs, model_config, name)
        
        # Central VF maps (obs, opp_obs, opp_act) -> vf_pred.
        # The flattened Dict obs is 100 (action_mask) + 100 + 100 (one-hot Discrete obs) = 300.
        obs = tf.keras.layers.Input(shape=(300, ), name="obs")
        opp_obs1 = tf.keras.layers.Input(shape=(300, ), name="opp_obs1")
        opp_act1 = tf.keras.layers.Input(shape=(300, ), name="opp_act1")
        opp_obs2 = tf.keras.layers.Input(shape=(300, ), name="opp_obs2")
        opp_act2 = tf.keras.layers.Input(shape=(300, ), name="opp_act2")
        concat_obs = tf.keras.layers.Concatenate(axis=1)(
            [obs, opp_obs1, opp_act1, opp_obs2, opp_act2])
        central_vf_dense = tf.keras.layers.Dense(
            1200, activation=tf.nn.tanh, name="c_vf_dense")(concat_obs)
        central_vf_out = tf.keras.layers.Dense(
            1, activation=None, name="c_vf_out")(central_vf_dense)
        self.central_vf = tf.keras.Model(
            inputs=[obs, opp_obs1, opp_act1, opp_obs2, opp_act2], outputs=central_vf_out)

    @override(ModelV2)
    def forward(self, input_dict, state, seq_lens):
        return self.model.forward(input_dict, state, seq_lens)


    def central_value_function(self, obs, opponent_obs1, opponent_actions1, opponent_obs2, opponent_actions2):
        return tf.reshape(
            self.central_vf([
                obs, opponent_obs1,
                tf.one_hot(tf.cast(opponent_actions1, tf.int32), 300),opponent_obs2,
                tf.one_hot(tf.cast(opponent_actions2, tf.int32), 300)
            ]), [-1])

    @override(ModelV2)
    def value_function(self):
        return self.model.value_function()  # not used




class CentralizedValueMixin:
    """Add method to evaluate the central value function from the model."""

    def __init__(self):
        if self.config["framework"] != "torch":
            self.compute_central_vf = make_tf_callable(self.get_session())(
                self.model.central_value_function)
        else:
            self.compute_central_vf = self.model.central_value_function


OPPONENT_OBS1 = "opponent_obs1"
OPPONENT_OBS2 = "opponent_obs2"
OPPONENT_ACTION1 = "opponent_action1"
OPPONENT_ACTION2 = "opponent_action2"


# Grabs the opponent obs/act and includes it in the experience train_batch,
# and computes GAE using the central vf predictions.
def centralized_critic_postprocessing(policy,
                                      sample_batch,
                                      other_agent_batches=None,
                                      episode=None):

    if policy.loss_initialized():
        assert other_agent_batches is not None

        opponent_batches = []

        for opponent_n_batch in other_agent_batches.values():
            opponent_batches.append(opponent_n_batch)


        # Record the opponent obs and actions in the trajectory.
        # Each entry of other_agent_batches is a (policy, SampleBatch) tuple, hence the [1].
        sample_batch[OPPONENT_OBS1] = opponent_batches[0][1][SampleBatch.CUR_OBS]
        sample_batch[OPPONENT_ACTION1] = opponent_batches[0][1][SampleBatch.ACTIONS]
        sample_batch[OPPONENT_OBS2] = opponent_batches[1][1][SampleBatch.CUR_OBS]
        sample_batch[OPPONENT_ACTION2] = opponent_batches[1][1][SampleBatch.ACTIONS]

        sample_batch[SampleBatch.VF_PREDS] = policy.compute_central_vf(
            sample_batch[SampleBatch.CUR_OBS], sample_batch[OPPONENT_OBS1],
            sample_batch[OPPONENT_ACTION1], sample_batch[OPPONENT_OBS2],
            sample_batch[OPPONENT_ACTION2])

 
    else:
        # Policy hasn't been initialized yet, use zeros.
        sample_batch[OPPONENT_OBS1] = np.zeros_like(
            sample_batch[SampleBatch.CUR_OBS])
        sample_batch[OPPONENT_ACTION1] = np.zeros_like(
            sample_batch[SampleBatch.ACTIONS])
        sample_batch[OPPONENT_OBS2] = np.zeros_like(
            sample_batch[SampleBatch.CUR_OBS])
        sample_batch[OPPONENT_ACTION2] = np.zeros_like(
            sample_batch[SampleBatch.ACTIONS])    
        sample_batch[SampleBatch.VF_PREDS] = np.zeros_like(
            sample_batch[SampleBatch.REWARDS], dtype=np.float32)

    completed = sample_batch["dones"][-1]
    if completed:
        last_r = 0.0
    else:
        last_r = sample_batch[SampleBatch.VF_PREDS][-1]

    train_batch = compute_advantages(
        sample_batch,
        last_r,
        policy.config["gamma"],
        policy.config["lambda"],
        use_gae=policy.config["use_gae"])



    return train_batch


def loss_with_central_critic(policy, model, dist_class, train_batch):
    CentralizedValueMixin.__init__(policy)
    func = tf_loss

    vf_saved = model.value_function
    model.value_function = lambda: policy.model.central_value_function(
        train_batch[SampleBatch.CUR_OBS], train_batch[OPPONENT_OBS1],
        train_batch[OPPONENT_ACTION1], train_batch[OPPONENT_OBS2],
        train_batch[OPPONENT_ACTION2])

    policy._central_value_out = model.value_function()
    loss = func(policy, model, dist_class, train_batch)

    model.value_function = vf_saved

    return loss



def setup_tf_mixins(policy, obs_space, action_space, config):
    # Copied from PPOTFPolicy (w/o ValueNetworkMixin).
    KLCoeffMixin.__init__(policy, config)
    EntropyCoeffSchedule.__init__(policy, config["entropy_coeff"],
                                  config["entropy_coeff_schedule"])
    LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"])


def setup_torch_mixins(policy, obs_space, action_space, config):
    # Copied from PPOTorchPolicy  (w/o ValueNetworkMixin).
    TorchKLCoeffMixin.__init__(policy, config)
    TorchEntropyCoeffSchedule.__init__(policy, config["entropy_coeff"],
                                       config["entropy_coeff_schedule"])
    TorchLR.__init__(policy, config["lr"], config["lr_schedule"])


def central_vf_stats(policy, train_batch, grads):
    # Report the explained variance of the central value function.
    return {
        "vf_explained_var": explained_variance(
            train_batch[Postprocessing.VALUE_TARGETS],
            policy._central_value_out)
    }


CCPPOTFPolicy = PPOTFPolicy.with_updates(
    name="CCPPOTFPolicy",
    postprocess_fn=centralized_critic_postprocessing,
    loss_fn=loss_with_central_critic,
    before_loss_init=setup_tf_mixins,
    grad_stats_fn=central_vf_stats,
    mixins=[
        LearningRateSchedule, EntropyCoeffSchedule, KLCoeffMixin,
        CentralizedValueMixin
    ])



def get_policy_class(config):
    if config["framework"] == "tf":
        return CCPPOTFPolicy

CCTrainer = PPOTrainer.with_updates(
    name="CCPPOTrainer",
    default_policy=CCPPOTFPolicy,
    get_policy_class=get_policy_class,
)



from gym.spaces import MultiDiscrete, Dict, Discrete
import numpy as np

from ray.rllib.env.multi_agent_env import MultiAgentEnv, ENV_STATE


def mask(x):
    # Build a 100-dim action mask where only the first x actions (0..x-1) are valid.
    y = np.zeros(100)
    y[:x] = 1
    return y


class MyTwoStepGame(MultiAgentEnv):
    action_space = Discrete(100)
    observation_space = AGENT_OBS_SPACE

    def __init__(self, env_config):

        self.state = None
        self.agent_1 = 0
        self.agent_2 = 1
        self.agent_3 = 2

        self.count = 0

        # Mock lists; to be replaced with data from a .csv in the future.
        self.invoc_1 = [20] * 100000
        self.invoc_2 = [50] * 100000
        self.invoc_3 = [60] * 100000
        self.max_1 = [30] * 100000
        self.max_2 = [10] * 100000
        self.max_3 = [40] * 100000

        self.max = [self.max_1, self.max_2, self.max_3]
        
        # MADDPG emits action logits instead of actual discrete actions
        self.actions_are_logits = env_config.get("actions_are_logits", False)
        self.one_hot_state_encoding = env_config.get("one_hot_state_encoding",
                                                     False)
        self.with_state = env_config.get("separate_state_space", False)
        self.observation_space = AGENT_OBS_SPACE

        # State is not used for now; force it off.
        self.with_state = False



    def reset(self):
        self.count=0
        self.state = np.array([1, 0, 0])
        return self._obs()



    def step(self, action_dict):

        indexes = [0, 1, 2]
        # capacita = spare capacity of each server after serving its own (non-forwarded) requests.
        capacita = [max(0, int(self.max_1[self.count] - (self.invoc_1[self.count] - action_dict[0]))),
                    max(0, int(self.max_2[self.count] - (self.invoc_2[self.count] - action_dict[1]))),
                    max(0, int(self.max_3[self.count] - (self.invoc_3[self.count] - action_dict[2])))]
        persi = [0, 0, 0]    # "lost": per-agent count of non-satisfied requests
        gestiti = [0, 0, 0]  # "handled": forwarded requests that got absorbed, per agent

        index = 0

        for action in action_dict.values():

            # Offer half of the forwarded requests to each server in `indexes` in turn.
            split_action = int(action / 2)
            indexes_copy = indexes.copy()
            restanti = 0          # "leftover": requests the current receiver could not absorb
            primo_giro = True     # "first round" flag

            for indice in indexes_copy:

                gestiti[index] += min(split_action, capacita[indice])
                restanti = max(0, split_action - capacita[indice])
                capacita[indice] -= gestiti[index]
                if primo_giro:
                    # On the first round, roll the leftover onto the next receiver.
                    split_action += restanti
                    primo_giro = False
                else:
                    persi[index] -= restanti - gestiti[index]

            index += 1

        # Final reward terms: forwarded requests that got handled minus the agent's own incoming requests.
        persi[0] = gestiti[0] - self.invoc_1[self.count]
        persi[1] = gestiti[1] - self.invoc_2[self.count]
        persi[2] = gestiti[2] - self.invoc_3[self.count]


        # The episode ends once self.count reaches 1000.
        if self.count < 1000:
            done = False
        else:
            done = True

        self.count = self.count + 1


        rewards = {
            self.agent_1: persi[0],
            self.agent_2: persi[1],
            self.agent_3: persi[2]

        }
        
        def capacita_limiter(num):
            # Clamp a value into the Discrete(100) range [0, 99].
            if num >= 100: return 99
            if num <= 0: return 0
            else: return num


        # NOTE: index [0] (not self.count) is used on purpose: the obs are kept fixed for testing.
        obs = {
                self.agent_1: {
                "action_mask": mask(self.invoc_1[0]-1),
                "observation": [int(capacita_limiter(self.invoc_1[0])),capacita_limiter(int(self.max_1[0]))]
                },
                self.agent_2: {
                "action_mask": mask(self.invoc_2[0]-1),
                "observation":[int(capacita_limiter(self.invoc_2[0])),capacita_limiter(int(self.max_2[0]))]
                }, 
                self.agent_3: {
                "action_mask": mask(self.invoc_3[0]-1),
                "observation": [int(capacita_limiter(self.invoc_3[0])),capacita_limiter(int(self.max_3[0]))]
                } 
            }
        dones = {"__all__": done}
        infos = {}

        return obs, rewards, dones, infos




    def _obs(self):
        # Dummy initial observation returned at reset().
        return {
            self.agent_1: {
            "action_mask": mask(1),
            "observation": [0,0]
                },
            self.agent_2:{
                "action_mask": mask(1),
                "observation": [0,0]
            },
            self.agent_3: {
                "action_mask": mask(1),
                "observation": [0,0]
            }
        }



if __name__ == "__main__":
    ray.init(num_cpus=12, num_gpus=0)
    args = parser.parse_args()

    ModelCatalog.register_custom_model(
        "cc_model", DFaaSCriticModel)

    config = {
        "env": MyTwoStepGame,
        "batch_mode": "complete_episodes",
        # Use GPUs iff `RLLIB_NUM_GPUS` env var set to > 0.
        "num_gpus": 0,
        "num_workers": 0,
        "soft_horizon": True,
        "no_done_at_end": False,
        "multiagent": {
            "policies": {
                "shared_policy": (None, MyTwoStepGame.observation_space, Discrete(100), {}),
            },
            "policy_mapping_fn": lambda aid, **kwargs: "shared_policy",
        },
        "model": {
            "custom_model": "cc_model",
        },
        "framework": args.framework,
    }

    stop = {
        "training_iteration": args.stop_iters,
        "timesteps_total": args.stop_timesteps,
        "episode_reward_mean": args.stop_reward,
    }

    results = tune.run(CCTrainer, config=config, stop=stop, verbose=1, checkpoint_at_end=True)

    if args.as_test:
        check_learning_achieved(results, args.stop_reward)

This is a simple load balancing problem.

I don’t think you need RL or RLlib for it.
Algorithms like “random 2” (power of two random choices) are extremely fast and pretty much optimal:
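Roughly: for every incoming request, sample two servers at random and route the request to the less loaded one. A minimal sketch of the idea (mine, not a reference implementation):

import random

def pick_server(loads):
    # Sample two servers at random and pick the less loaded of the two.
    a, b = random.sample(range(len(loads)), 2)
    return a if loads[a] <= loads[b] else b

loads = [30, 10, 100]            # outstanding requests per server
loads[pick_server(loads)] += 1   # route one more request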

What I wrote is a truncated version of my problem; I am starting from there to make it more complex later. I am aware that, as it stands, reinforcement learning is overkill. But still, shouldn’t I be able to solve this with RL/RLlib?

Understood. At a high level, I feel like the problem you described is not an episodic problem. I.e., you can decide the optimal action based solely on the information available at each time step. That’s why it may feel awkward to construct an episode for this problem. There is not really an accumulated reward to be optimized.

A negative reward is fine, but RL in general does not handle unbounded observation and action spaces well. For example, it will have a hard time understanding and differentiating arbitrarily large input values, and making sensible decisions over equally large action values.
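For example, if the raw request counts could grow without bound, you would usually clip/normalize them into a fixed range before feeding them to the policy. A toy sketch (MAX_REQUESTS is an assumed bound, not something from your code):

import numpy as np

MAX_REQUESTS = 100.0  # assumed upper bound on request counts

def normalize_obs(incoming, max_reqs):
    # Clip raw counts into [0, MAX_REQUESTS] and rescale to [0, 1].
    obs = np.array([incoming, max_reqs], dtype=np.float32)
    return np.clip(obs, 0.0, MAX_REQUESTS) / MAX_REQUESTS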

In summary, RL may be able to solve this problem, but I don’t know if it is the right tool.

One thing you can try as a first step: generate a bunch of optimal actions using an existing load balancing algorithm, and try to train a policy on them with supervised learning.
I wouldn’t consider RL if you can’t even replicate an existing load balancing policy using supervised learning methods.
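For example, something along these lines (a made-up toy heuristic plus scikit-learn, purely to illustrate the idea; nothing here is specific to your setup):

import numpy as np
from sklearn.tree import DecisionTreeClassifier

rng = np.random.default_rng(0)

def heuristic_action(incoming, max_reqs):
    # Toy rule: forward whatever exceeds the server's own capacity (capped at 99).
    return int(np.clip(incoming - max_reqs, 0, 99))

# Labeled dataset of (obs, action) pairs produced by the heuristic.
X = rng.integers(0, 100, size=(10000, 2))             # columns: incoming, max_reqs
y = np.array([heuristic_action(i, m) for i, m in X])

clf = DecisionTreeClassifier().fit(X, y)
print((clf.predict(X) == y).mean())  # if this is far from 1.0, RL is unlikely to do better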

My obs space is actually bounded; I am using a Discrete(100).
The strange thing is that, even if RL might not be the best tool for the job, I am trying with very simple examples (for example, one agent overloaded and two agents empty). I expected that my agents would train at least on those exceptional/borderline cases, but I am getting nothing.
That’s why I think that my setup has the wrong config for horizon, soft horizon, done at end, iterations, etc.
How would you manage those configs in my case?
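For reference, these are the horizon-related keys I am currently playing with (the values just mirror my config above plus my guesses; I am not sure they are right for a continuing task):

config_fragment = {
    "batch_mode": "complete_episodes",
    "soft_horizon": True,     # do not reset the env when the episode/horizon end is hit
    "no_done_at_end": False,  # whether to suppress the done flag at episode end
    # "horizon": 1000,        # alternative to the done-after-1000-steps logic inside the env?
}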