I have been trying to use RLlib and ended up with a system that “works”, but when I compute actions the results seem totally random.
I’ll try to describe the problem I am implementing in RLlib:
3 servers are receiving X requests per step, and each server has two parameters:
the max requests that server can satisfy in that step, and the number of requests incoming in that step (those are my obs).
Each agent answers with an action:
the action is the number of requests the server tries to forward to the other two.
A short example:
agent 1 = 30 incoming req, 20 max requests
agent 2 = 10 incoming req, 30 max requests
agent 3 = 100 incoming req, 50 max requests
If agent 1 chooses action=10, it is communicating that it wants to forward 10 requests, which will be split between the other two agents:
agent 2 and agent 3 get 5 requests each.
Note that all requests not satisfied within a step are lost, so if agent 1 tries to forward more requests than the others have capacity for, those remain unsatisfied.
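To make this concrete, here is a minimal sketch of the mechanic I have in mind, in plain Python (forward_requests is just an illustrative name, this is not my actual env code):

def forward_requests(action, spare_capacity_others):
    # the forwarded requests are split evenly between the other two servers
    per_server = action // 2
    satisfied = sum(min(per_server, cap) for cap in spare_capacity_others)
    lost = action - satisfied  # whatever the others cannot take is lost for this step
    return satisfied, lost

# Example above: agent 1 forwards 10; agent 2 has 30-10=20 spare capacity, agent 3 has 0.
forward_requests(10, [20, 0])  # -> (5, 5): agent 2 takes 5, the other 5 are lost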
My reward equals the number of requests an agent cannot satisfy:
suppose agent 1 tries to forward 20 requests but the others cannot satisfy them; it then gets a negative reward of -20. If agent 2 does not try to forward anything, it gets zero reward.
My goal is that at each step the number of unsatisfied requests is minimized (so the reward is maximized).
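In formula form, what I mean is reward = -(requests of the agent that nobody satisfied in that step); a tiny sketch with hypothetical names:

def step_reward(incoming, handled_locally, handled_by_others):
    # negative number of this agent's requests that end up unsatisfied in the step
    unsatisfied = incoming - handled_locally - handled_by_others
    return -max(0, unsatisfied)

step_reward(incoming=30, handled_locally=10, handled_by_others=0)  # agent 1 forwards 20, nobody takes them -> -20
step_reward(incoming=10, handled_locally=10, handled_by_others=0)  # agent 2 forwards nothing and satisfies everything -> 0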
So, my questions are:
Does the reward system make sense? This is my first RL problem, so I am not sure whether this is the best way to design a reward.
Main doubt:
What is the “structure” of my system? I am talking about managing the horizon, episodes, and “done”.
My env should be continuing, I guess (I will take obs from a CSV, and I do not want to reset the obs at every step).
Considering that my goal is to maximize the reward, how should I set up episodes? Just a single long episode, or should each action↔obs interaction be its own episode?
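By “a single long episode” I mean something like the following (a sketch based on the config keys I found in the RLlib version I am using, so please correct me if this is the wrong way to express a continuing env):

config = {
    # ...
    "horizon": 1000,         # cut the never-ending interaction into chunks of N steps
    "soft_horizon": True,    # ...but do not call env.reset() at that cut
    "no_done_at_end": True,  # ...and do not mark the cut as a terminal state
}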
So, I am attaching the solution I made, but the computed actions seem like nonsense:
I have “fixed” obs for testing my env (I am using fixed/repeated observations to check whether my system learns base cases; is that a good idea?), but even though the obs coming from the env are always the same, the computed actions are always different and irrelevant. Is that normal? Shouldn’t I expect the same action for the same obs?
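To be explicit about what I am testing, I mean something like this (a sketch, using the classes and helpers from the code below and the old Trainer.compute_action API):

trainer = CCTrainer(config=config)
obs = {"action_mask": mask(20), "observation": (20, 30)}
for _ in range(5):
    print(trainer.compute_action(obs, policy_id="shared_policy"))  # different every time, even though obs never changes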
I am providing my code below (I am sorry, it’s a mess, but it’s WIP).
I hope someone can help me (and sorry for the wall of text).
Thanks in advance!
import argparse
import numpy as np
from gym.spaces import Discrete, Tuple, Box, Dict
import pandas as pd
import ray
from ray import tune
from ray.rllib.agents.maml.maml_torch_policy import KLCoeffMixin as \
TorchKLCoeffMixin
from ray.rllib.agents.ppo.ppo import PPOTrainer
from ray.rllib.agents.ppo.ppo_tf_policy import PPOTFPolicy, KLCoeffMixin, \
ppo_surrogate_loss as tf_loss
from ray.rllib.evaluation.postprocessing import compute_advantages, \
Postprocessing
from ray.rllib.models import ModelCatalog
from ray.rllib.policy.sample_batch import SampleBatch
from ray.rllib.policy.tf_policy import LearningRateSchedule, \
EntropyCoeffSchedule
from ray.rllib.policy.torch_policy import LearningRateSchedule as TorchLR, \
EntropyCoeffSchedule as TorchEntropyCoeffSchedule
from ray.rllib.utils.annotations import override
from ray.rllib.utils.framework import try_import_tf, try_import_torch
from ray.rllib.utils.test_utils import check_learning_achieved
from ray.rllib.utils.tf_ops import explained_variance, make_tf_callable
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.models.tf.fcnet import FullyConnectedNetwork
from ray.rllib.models.modelv2 import ModelV2
tf1, tf, tfv = try_import_tf()
torch, nn = try_import_torch()
OPPONENT_OBS = "opponent_obs"
OPPONENT_ACTION = "opponent_action"
AGENT_OBS_SPACE = Dict({
"action_mask": Box(0, 1.0, (100,)),
"observation": Tuple([Discrete(100),Discrete(100)])
})
parser = argparse.ArgumentParser()
parser.add_argument(
"--framework",
choices=["tf", "tf2", "tfe", "torch"],
default="tf",
help="The DL framework specifier.")
parser.add_argument(
"--as-test",
action="store_true",
help="Whether this script should be run as a test: --stop-reward must "
"be achieved within --stop-timesteps AND --stop-iters.")
parser.add_argument(
"--stop-iters",
type=int,
default=200,
help="Number of iterations to train.")
parser.add_argument(
"--stop-timesteps",
type=int,
default=100000,
help="Number of timesteps to train.")
parser.add_argument(
"--stop-reward",
type=float,
default=7.99,
help="Reward at which we stop training.")
from typing import Dict as typingDict
from ray.rllib.utils.typing import TensorType, List, ModelConfigDict
from ray.rllib.models.modelv2 import restore_original_dimensions
class modFullyConnectedNetwork(FullyConnectedNetwork):
    def forward(self, input_dict: typingDict[str, TensorType], state: List[TensorType], seq_lens: TensorType) -> (TensorType, List[TensorType]):
        # Unpack the flattened obs back into the original Dict space (this is a TF model).
        original_obs = restore_original_dimensions(input_dict["obs"], self.obs_space, "tf")
        # NOTE: only the first component of the Tuple observation is fed to the base model.
        model_out, self._value_out = self.base_model(original_obs["observation"][0])
        # Mask out invalid actions by pushing their logits towards -inf.
        inf_mask = tf.maximum(tf.math.log(original_obs["action_mask"]), tf.float32.min)
        masked_logits = model_out + inf_mask
        return masked_logits, state
class DFaaSCriticModel(TFModelV2):
"""Multi-agent model that implements a centralized value function."""
def __init__(self, obs_space, action_space, num_outputs, model_config, name):
super(DFaaSCriticModel, self).__init__(obs_space, action_space, num_outputs, model_config, name)
# Base of the model
self.model = modFullyConnectedNetwork(Box(-1.0, 1.0, (100,)), action_space, num_outputs, model_config, name)
# Central VF maps (obs, opp_obs, opp_act) -> vf_pred
obs = tf.keras.layers.Input(shape=(300, ), name="obs")
opp_obs1 = tf.keras.layers.Input(shape=(300, ), name="opp_obs1")
opp_act1 = tf.keras.layers.Input(shape=(300, ), name="opp_act1")
opp_obs2 = tf.keras.layers.Input(shape=(300, ), name="opp_obs2")
opp_act2 = tf.keras.layers.Input(shape=(300, ), name="opp_act2")
concat_obs = tf.keras.layers.Concatenate(axis=1)(
[obs, opp_obs1, opp_act1, opp_obs2, opp_act2])
central_vf_dense = tf.keras.layers.Dense(
1200, activation=tf.nn.tanh, name="c_vf_dense")(concat_obs)
central_vf_out = tf.keras.layers.Dense(
1, activation=None, name="c_vf_out")(central_vf_dense)
self.central_vf = tf.keras.Model(
inputs=[obs, opp_obs1, opp_act1, opp_obs2, opp_act2], outputs=central_vf_out)
@override(ModelV2)
def forward(self, input_dict, state, seq_lens):
return self.model.forward(input_dict, state, seq_lens)
def central_value_function(self, obs, opponent_obs1, opponent_actions1, opponent_obs2, opponent_actions2):
return tf.reshape(
self.central_vf([
obs, opponent_obs1,
tf.one_hot(tf.cast(opponent_actions1, tf.int32), 300),opponent_obs2,
tf.one_hot(tf.cast(opponent_actions2, tf.int32), 300)
]), [-1])
@override(ModelV2)
def value_function(self):
return self.model.value_function() # not used
class CentralizedValueMixin:
"""Add method to evaluate the central value function from the model."""
def __init__(self):
if self.config["framework"] != "torch":
self.compute_central_vf = make_tf_callable(self.get_session())(
self.model.central_value_function)
else:
self.compute_central_vf = self.model.central_value_function
OPPONENT_OBS1 = "opponent_obs1"
OPPONENT_OBS2 = "opponent_obs2"
OPPONENT_ACTION1 = "opponent_action1"
OPPONENT_ACTION2 = "opponent_action2"
# Grabs the opponent obs/act and includes it in the experience train_batch,
# and computes GAE using the central vf predictions.
def centralized_critic_postprocessing(policy,
sample_batch,
other_agent_batches=None,
episode=None):
if policy.loss_initialized():
assert other_agent_batches is not None
        # Each value in other_agent_batches is a (policy, SampleBatch) tuple.
        opponent_batches = list(other_agent_batches.values())
#record the opponent obs and actions in the trajectory
sample_batch[OPPONENT_OBS1] = opponent_batches[0][1][SampleBatch.CUR_OBS]
sample_batch[OPPONENT_ACTION1] = opponent_batches[0][1][SampleBatch.ACTIONS]
sample_batch[OPPONENT_OBS2] = opponent_batches[1][1][SampleBatch.CUR_OBS]
sample_batch[OPPONENT_ACTION2] = opponent_batches[1][1][SampleBatch.ACTIONS]
sample_batch[SampleBatch.VF_PREDS] = policy.compute_central_vf(
sample_batch[SampleBatch.CUR_OBS], sample_batch[OPPONENT_OBS1],
sample_batch[OPPONENT_ACTION1], sample_batch[OPPONENT_OBS2],
sample_batch[OPPONENT_ACTION2])
else:
# Policy hasn't been initialized yet, use zeros.
sample_batch[OPPONENT_OBS1] = np.zeros_like(
sample_batch[SampleBatch.CUR_OBS])
sample_batch[OPPONENT_ACTION1] = np.zeros_like(
sample_batch[SampleBatch.ACTIONS])
sample_batch[OPPONENT_OBS2] = np.zeros_like(
sample_batch[SampleBatch.CUR_OBS])
sample_batch[OPPONENT_ACTION2] = np.zeros_like(
sample_batch[SampleBatch.ACTIONS])
sample_batch[SampleBatch.VF_PREDS] = np.zeros_like(
sample_batch[SampleBatch.REWARDS], dtype=np.float32)
completed = sample_batch["dones"][-1]
if completed:
last_r = 0.0
else:
last_r = sample_batch[SampleBatch.VF_PREDS][-1]
train_batch = compute_advantages(
sample_batch,
last_r,
policy.config["gamma"],
policy.config["lambda"],
use_gae=policy.config["use_gae"])
return train_batch
def loss_with_central_critic(policy, model, dist_class, train_batch):
CentralizedValueMixin.__init__(policy)
func = tf_loss
vf_saved = model.value_function
model.value_function = lambda: policy.model.central_value_function(
train_batch[SampleBatch.CUR_OBS], train_batch[OPPONENT_OBS1],
train_batch[OPPONENT_ACTION1], train_batch[OPPONENT_OBS2],
train_batch[OPPONENT_ACTION2])
policy._central_value_out = model.value_function()
loss = func(policy, model, dist_class, train_batch)
model.value_function = vf_saved
return loss
def setup_tf_mixins(policy, obs_space, action_space, config):
# Copied from PPOTFPolicy (w/o ValueNetworkMixin).
KLCoeffMixin.__init__(policy, config)
EntropyCoeffSchedule.__init__(policy, config["entropy_coeff"],
config["entropy_coeff_schedule"])
LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"])
def setup_torch_mixins(policy, obs_space, action_space, config):
# Copied from PPOTorchPolicy (w/o ValueNetworkMixin).
TorchKLCoeffMixin.__init__(policy, config)
TorchEntropyCoeffSchedule.__init__(policy, config["entropy_coeff"],
config["entropy_coeff_schedule"])
TorchLR.__init__(policy, config["lr"], config["lr_schedule"])
def central_vf_stats(policy, train_batch, grads):
# Report the explained variance of the central value function.
return {
"vf_explained_var": explained_variance(
train_batch[Postprocessing.VALUE_TARGETS],
policy._central_value_out)
}
CCPPOTFPolicy = PPOTFPolicy.with_updates(
name="CCPPOTFPolicy",
postprocess_fn=centralized_critic_postprocessing,
loss_fn=loss_with_central_critic,
before_loss_init=setup_tf_mixins,
grad_stats_fn=central_vf_stats,
mixins=[
LearningRateSchedule, EntropyCoeffSchedule, KLCoeffMixin,
CentralizedValueMixin
])
def get_policy_class(config):
    # Only the TF policy is implemented here; other frameworks fall back to the default PPO policy.
    if config["framework"] == "tf":
        return CCPPOTFPolicy
CCTrainer = PPOTrainer.with_updates(
name="CCPPOTrainer",
default_policy=CCPPOTFPolicy,
get_policy_class=get_policy_class,
)
def mask(x):
    # Action mask over Discrete(100): the first x actions are allowed.
    y = np.zeros(100)
    y[:x] = 1
    return y
class MyTwoStepGame(MultiAgentEnv):
    action_space = Discrete(100)
    observation_space = AGENT_OBS_SPACE

    def __init__(self, env_config):
        self.state = None
        self.agent_1 = 0
        self.agent_2 = 1
        self.agent_3 = 2
        self.count = 0
        # Mock lists with constant values (fixed obs for testing); will be replaced by a .csv in the future.
        self.invoc_1 = [20] * 100000  # incoming requests per step
        self.invoc_2 = [50] * 100000
        self.invoc_3 = [60] * 100000
        self.max_1 = [30] * 100000  # max requests satisfiable per step
        self.max_2 = [10] * 100000
        self.max_3 = [40] * 100000
        self.max = [self.max_1, self.max_2, self.max_3]
        # MADDPG emits action logits instead of actual discrete actions
        self.actions_are_logits = env_config.get("actions_are_logits", False)
        self.one_hot_state_encoding = env_config.get("one_hot_state_encoding", False)
        self.with_state = env_config.get("separate_state_space", False)
        self.observation_space = AGENT_OBS_SPACE
        self.with_state = False
def reset(self):
self.count=0
self.state = np.array([1, 0, 0])
return self._obs()
    def step(self, action_dict):
        indexes = [0, 1, 2]
        # capacita = spare capacity left on each server after it keeps its non-forwarded load
        capacita = [max(0, int(self.max_1[self.count] - (self.invoc_1[self.count] - action_dict[0]))),
                    max(0, int(self.max_2[self.count] - (self.invoc_2[self.count] - action_dict[1]))),
                    max(0, int(self.max_3[self.count] - (self.invoc_3[self.count] - action_dict[2])))]
        persi = [0, 0, 0]    # "lost": unsatisfied requests per agent
        gestiti = [0, 0, 0]  # "handled": forwarded requests that found capacity, per agent
        index = 0
        for action in action_dict.values():
            # Try to place half of the forwarded requests on each server in turn,
            # carrying over what the first server could not take.
            split_action = int(action / 2)
            indexes_copy = indexes.copy()
            restanti = 0        # "remaining": requests not yet placed
            primo_giro = True   # "first pass" flag
            for indice in indexes_copy:
                gestiti[index] += min(split_action, capacita[indice])
                restanti = max(0, split_action - capacita[indice])
                capacita[indice] -= gestiti[index]
                if primo_giro:
                    split_action += restanti
                    primo_giro = False
                else:
                    persi[index] -= restanti - gestiti[index]
            index += 1
        # Reward per agent: forwarded requests that were handled minus the agent's incoming requests.
        persi[0] = gestiti[0] - self.invoc_1[self.count]
        persi[1] = gestiti[1] - self.invoc_2[self.count]
        persi[2] = gestiti[2] - self.invoc_3[self.count]
        done = self.count >= 1000
        self.count = self.count + 1
rewards = {
self.agent_1: persi[0],
self.agent_2: persi[1],
self.agent_3: persi[2]
}
        def capacita_limiter(num):
            # Clamp a value into [0, 99] so it fits a Discrete(100) observation component.
            if num >= 100:
                return 99
            if num <= 0:
                return 0
            return num
obs = {
self.agent_1: {
"action_mask": mask(self.invoc_1[0]-1),
"observation": [int(capacita_limiter(self.invoc_1[0])),capacita_limiter(int(self.max_1[0]))]
},
self.agent_2: {
"action_mask": mask(self.invoc_2[0]-1),
"observation":[int(capacita_limiter(self.invoc_2[0])),capacita_limiter(int(self.max_2[0]))]
},
self.agent_3: {
"action_mask": mask(self.invoc_3[0]-1),
"observation": [int(capacita_limiter(self.invoc_3[0])),capacita_limiter(int(self.max_3[0]))]
}
}
dones = {"__all__": done}
infos = {}
return obs, rewards, dones, infos
def _obs(self):
return {
self.agent_1: {
"action_mask": mask(1),
"observation": [0,0]
},
self.agent_2:{
"action_mask": mask(1),
"observation": [0,0]
},
self.agent_3: {
"action_mask": mask(1),
"observation": [0,0]
}
}
if __name__ == "__main__":
ray.init(num_cpus=12, num_gpus=0)
args = parser.parse_args()
ModelCatalog.register_custom_model(
"cc_model", DFaaSCriticModel)
config = {
"env": MyTwoStepGame,
"batch_mode": "complete_episodes",
        # No GPUs for this example.
        "num_gpus": 0,
"num_workers": 0,
"soft_horizon": True,
"no_done_at_end": False,
"multiagent": {
"policies": {
"shared_policy": (None, MyTwoStepGame.observation_space, Discrete(100), {}),
},
"policy_mapping_fn": lambda aid, **kwargs: "shared_policy",
},
"model": {
"custom_model": "cc_model",
},
"framework": args.framework,
}
stop = {
"training_iteration": args.stop_iters,
"timesteps_total": args.stop_timesteps,
"episode_reward_mean": args.stop_reward,
}
    results = tune.run(CCTrainer, config=config, stop=stop, verbose=1, checkpoint_at_end=True)
if args.as_test:
check_learning_achieved(results, args.stop_reward)
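In case it is useful, the env can also be stepped by hand to check the reward logic (a quick sketch using the class above):

env = MyTwoStepGame({})
obs = env.reset()
# agent 0 forwards 10 requests, agents 1 and 2 forward nothing
obs, rewards, dones, infos = env.step({0: 10, 1: 0, 2: 0})
print(rewards)  # should reflect the unsatisfied requests per agent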