Hi Kai,
The custom_model code is here:
from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
import tensorflow as tf
import numpy as np
class MyKerasModel(TFModelV2):
def import_from_h5(self, h5_file: str) -> None:
print("h5 ", h5_file)
pass
def __init__(self, obs_space, action_space, num_outputs, model_config,
name):
super(MyKerasModel, self).__init__(obs_space, action_space,
num_outputs, model_config, name)
# print(f"""
# Model config:
# obs_space = {obs_space}
# action_space = {action_space}
# num_outputs = {num_outputs}
# model_config = {model_config}
# name = {name}
# """)
self.inputs = tf.keras.layers.Input(
shape=obs_space.shape, name="observations")
if self.num_outputs is None:
self.num_outputs = int(np.product(self.obs_space.shape))
# normalization layer
layer_0 = tf.keras.layers.BatchNormalization(axis=-1,
momentum=0.99,
epsilon=0.001,
center=True,
scale=True,
beta_initializer="zeros",
gamma_initializer="ones",
moving_mean_initializer="zeros",
moving_variance_initializer="ones",
beta_regularizer=None,
gamma_regularizer=None,
beta_constraint=None,
gamma_constraint=None)(self.inputs)
layer_1 = tf.keras.layers.Dense(
128,
name="my_layer1",
activation=tf.nn.relu,
kernel_initializer=normc_initializer(0.0001))(layer_0)
# X_train = np.reshape(obs_space, (obs_space.shape[0], 1, obs_space.shape[1]))
# layer_2 =tf.keras.layers.LSTM(256, activation='tanh', recurrent_activation='sigmoid',
# use_bias=True, kernel_initializer=normc_initializer(0.01),
# recurrent_initializer='orthogonal',
# input_shape=obs_space.shape,
# bias_initializer='zeros', unit_forget_bias=True,
# kernel_regularizer=None, recurrent_regularizer=None, bias_regularizer=None,
# activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
# bias_constraint=None, dropout=0.0, recurrent_dropout=0.0,
# return_sequences=False, return_state=False, go_backwards=False, stateful=False, time_major=False, unroll=False)(self.inputs)
layer_2 = tf.keras.layers.Dense(
128,
name="my_layer2",
activation=tf.nn.relu,
kernel_initializer=normc_initializer(0.01))(layer_1)
layer_3 = tf.keras.layers.Dense(
110,
name="my_layer3",
activation=tf.nn.relu,
kernel_initializer=normc_initializer(0.01))(layer_2)
layer_4 = tf.keras.layers.Dense(
100,
name="my_layer4",
activation=tf.nn.relu,
kernel_initializer=normc_initializer(0.01))(layer_3)
layer_out = tf.keras.layers.Dense(
self.num_outputs,
name="my_out",
activation=None,
kernel_initializer=normc_initializer(0.01))(layer_4)
value_out = tf.keras.layers.Dense(
1,
name="value_out",
activation=None,
kernel_initializer=normc_initializer(0.01))(layer_4)
self.base_model = tf.keras.Model(self.inputs, [layer_out, value_out])
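# RLlib calls forward() with the flattened observation in input_dict["obs_flat"];
# the Keras model returns the action logits plus the value branch, and
# value_function() exposes the cached value output as a 1-D tensor.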
def forward(self, input_dict, state, seq_lens):
model_out, self._value_out = self.base_model(input_dict["obs_flat"])
return model_out, state
def value_function(self):
return tf.reshape(self._value_out, [-1])
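For reference, a custom model like this is normally registered with RLlib's ModelCatalog and then referenced by name in the trainer config. The snippet below is only a rough sketch of that wiring, not my actual training script; the model name "my_keras_model" and the choice of PPOTrainer are placeholders:
# Hypothetical wiring sketch, not the actual training script.
import ray
from ray.rllib.agents.ppo import PPOTrainer
from ray.rllib.models import ModelCatalog

ModelCatalog.register_custom_model("my_keras_model", MyKerasModel)

config = {
    "env": BitsEnv,  # or an env name registered via register_env
    "framework": "tf",
    "model": {
        "custom_model": "my_keras_model",
        # "custom_model_config": {...},  # extra model kwargs, if needed
    },
}

ray.init()
trainer = PPOTrainer(config=config)
print(trainer.train())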
The environment code is here:
from operator import xor, and_
from gym.spaces import Discrete, Box, Dict, MultiDiscrete, MultiBinary, Tuple
from pandas.core.frame import DataFrame
from pandas.core.groupby.generic import DataFrameGroupBy
from ray.rllib.utils.spaces.repeated import Repeated
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from random import randint
import numpy as np
import pandas as pd
from enum import Enum, unique
from network import Link, NetworkNode, LinkState, NodeType
import constants
from aircraftdata import AircraftDataReader, AircraftDataReaderPreload
from collections import OrderedDict, defaultdict
from random import randint, randrange
from timeit import default_timer as timer
import ray
import sknetwork as skn
from scipy import sparse
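# Column indices into each neighbor record returned by the data reader
# (used below as x[NEIGHBOR_ID], x[DISTANCE], x[AZIMUTH], x[ELEVATION], x[LINK_TIME]).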
NEIGHBOR_ID = 0
AZIMUTH = 2
DISTANCE = 1
ELEVATION = 3
LINK_TIME = 4
class BitsEnv(MultiAgentEnv):
metadata = {'render.modes': ['human']}
def get_action_space(self):
return self.action_space
def get_agent_state(self):
return self.agent_state
def get_neighbor_state(self):
return self.neighbor_state
def __init__(self, config):
self.reader = AircraftDataReader(config.get("data_directory", "precompute_distance_30/"))
# self.reader = ray.get_actor(name="data_reader", namespace="bits")
# self._total_episode_reward = None
self._max_steps = config.get("max_steps", 100)
self._offset_start = config.get("offset_start", False)
self._log_steps = config.get("log_steps", False)
self._log_step_time = config.get("log_step_time", False)
self._start_offset_min = config.get("offset_start_offset_min", 0)
self._start_offset_max = config.get("offset_start_offset_max", 0)
if self._offset_start:
self._step_start_offset = randrange(self._start_offset_min, self._start_offset_max, 250)
else:
self._step_start_offset = 3000
self._individual_rewards = config.get("individual_rewards", True)
self.max_neighborhood_size = config.get("max_neighborhood_size", 1)
self.number_of_terminals = config.get("number_of_terminals", 3)
self._last_done = {}
self._last_obs = {}
self._last_conn_quota = 0
self._step_count = 0
self.connected_aircraft_ratio = 0
self._reward_wait_time = 5
# 0: do nothing, 1: terminate link; MultiBinary is not supported by rllib
# self.terminate_action = MultiDiscrete([2 for terminal in range(self.number_of_terminals)])
self.action_space = MultiDiscrete(
[self.max_neighborhood_size for terminal in range(self.number_of_terminals)])
# 0: free, 1: in use; MultiBinary is not supported by rllib
self.terminal_state = MultiDiscrete([2 for terminal in range(self.number_of_terminals)])
self.free_terminals_state = Discrete(self.number_of_terminals + 1)
self.agent_state = Dict(
{"type": Discrete(2),
"hasDownlink": Discrete(2),
"freeTerminals": self.free_terminals_state,
"terminalState": self.terminal_state, "latitude": Box(-180, 180, shape=(1,)),
"longitude": Box(-180, 180, shape=(1,)), "altitude": Box(-20, 150000, shape=(1,)),
"speed": Box(0, 800, shape=(1,)),
"heading": Box(0, 501, shape=(1,))})
self.neighbor_state = Dict(
{"azimuth": Box(0, 360, shape=(1,)),
"distance": Box(0, 400000, shape=(1,)),
"elevation": Box(-90, 90, shape=(1,)),
"freeTerminals": self.free_terminals_state,
"hasDownlink": Discrete(2),
"linkTime": Box(0, 100000, shape=(1,)),
"relative_heading": Box(0, 501, shape=(1,)),
"speed": Box(0, 800, shape=(1,)),
"terminalState": self.terminal_state,
"type": Discrete(2)
})
self.observation_space = Dict({"ownState": self.agent_state, "other": Repeated(self.neighbor_state, max_len=self.max_neighborhood_size)})
# agent: id => latitude, longitude, altitude, heading, speed
# self.agents = {"aircraft_"+str(i): NetworkNode("aircraft_"+str(i), self.agent_state.sample(), NodeType.AIRCRAFT) for i in range(int(self.n_agents/2))}
self.agents = {}
self._agent_ids = set()
# self.link_matrix = pd.DataFrame(np.full((self.n_agents, self.n_agents), 0), index=self.agents.keys(), columns=self.agents.keys())
# self.link_matrix['basestation_2']['aircraft_0'] = 3
# self.link_matrix['aircraft_0']['aircraft_1'] = 3
# @profile
def step(self, action_dict):
if self._log_steps:
print("Step: ", self._step_count)
if self._log_step_time:
start = timer()
# print("Wall time: " + str(self._step_count+self._step_start_offset))
# ref = self.reader.get_data.remote(self._step_count + self._step_start_offset)
# (ac, nb) = ray.get(ref)
# nb: dict
(ac, nb) = self.reader.get_data(self._step_count + self._step_start_offset, cache=False)
# Update agents
to_remove = self.update_agents(ac, self._step_count)
done = { id: id in to_remove for id in self._agent_ids }
agent_ids = set(action_dict.keys())
agent_ids = agent_ids.difference(to_remove)
agent_ids = list(agent_ids)
node_ids = agent_ids.copy()
node_ids.append('sla_conn')
n = len(node_ids)
agent_ids_map = dict(zip(agent_ids, range(n)))
link_matrix = np.zeros((n, n), dtype=bool)
sla_conn_idx = node_ids.index('sla_conn')
base_station_ids = [index for index, value in enumerate(node_ids) if value.startswith('bs_')]
link_matrix[sla_conn_idx, base_station_ids] = True
# Execute actions
for i in agent_ids:
neighborhood = None
if i in nb.keys():
neighborhood = nb[self.agents[i].id][NEIGHBOR_ID]
if isinstance(neighborhood, str):
neighborhood = {neighborhood}
else:
neighborhood = set(neighborhood)
self.agents[i].step(action_dict[i], self._step_count, self.agents, neighborhood, link_matrix, agent_ids_map)
link_matrix = np.logical_or(link_matrix, link_matrix.T)
# cnt_val = np.logical_and(link_matrix_values, link_matrix_values.T).sum()
# cnt_inv = np.logical_xor(link_matrix_values, link_matrix_values.T).sum()
# print(f'{cnt_val//2}/{cnt_inv}')
nodes_connected = self.get_connectivity_from_matrix(link_matrix, sla_conn_idx)
divisible = n - 1 - len(base_station_ids)
if divisible > 0:
conn_quota = (len(nodes_connected) - len(base_station_ids)) / divisible
else:
conn_quota = 0
# print(f'connected: {int(conn_quota * 100)}%')
rew = dict(zip(agent_ids, [(1 + conn_quota - self._last_conn_quota)**2 - 1] * len(agent_ids)))
self._last_conn_quota = conn_quota
# print(nodes_connected)
# Calculate observation per agent
obs = self.get_agent_obs(nb)
np_node_ids = np.array(node_ids)
node_ids_connected = set(np_node_ids[nodes_connected])
for id in self.agents:
if id in node_ids_connected:
self.agents[id]._state["hasDownlink"] = 1
else:
self.agents[id]._state["hasDownlink"] = 0
# # Done when max steps
# done = self._last_done
done_all = self._step_count >= self._max_steps or self._step_count + self._step_start_offset >= 8640
done["__all__"] = done_all
self._last_obs = obs
self._step_count += 1
if self._step_count < self._reward_wait_time:
rew = {}
if self._log_step_time:
end = timer()
print (f'Step time: {end - start}')
return obs, rew, done, {}
def update_link_matrix(self):
self.link_matrix = pd.DataFrame(np.full((len(self.agents), len(self.agents)), 0), index=self.agents.keys(),
columns=self.agents.keys())
for _, agent in self.agents.items():
for terminal in agent.terminals:
link = terminal.link
if link != None:
# print(link)
self.link_matrix.loc[link.end1.id, link.end2.id] = link.state
self.link_matrix.loc[link.end2.id, link.end1.id] = link.state
# print(self.link_matrix)
def calculate_reward_modifier_for_established_link_length(self, steps):
result = steps * constants.REWARD_FOR_LONG_LINKS
if result > constants.MAX_REWARD_FOR_LONG_LINKS:
return constants.MAX_REWARD_FOR_LONG_LINKS
if result < constants.MIN_REWARD_FOR_LONG_LINKS:
return constants.MIN_REWARD_FOR_LONG_LINKS
return result
def calculate_reward_modifier_for_downlink_length(self, steps):
result = steps * constants.REWARD_FOR_LONG_DOWNLINK
if result > constants.MAX_REWARD_FOR_LONG_DOWNLINK:
return constants.MAX_REWARD_FOR_LONG_DOWNLINK
if result < constants.MIN_REWARD_FOR_LONG_DOWNLINK:
return constants.MIN_REWARD_FOR_LONG_DOWNLINK
return result
def get_connectivity_from_matrix(self, link_matrix: np.ndarray, root_node_id: int):
# _rew = {}
if link_matrix.shape[0] > 1:
adj = sparse.csr_matrix(link_matrix)
nodes_connected = skn.path.breadth_first_search(adjacency=adj, source=root_node_id, return_predecessors=False)
return nodes_connected
else:
return np.array([], dtype=int)
def get_agent_rewards(self):
_rew = {}
connected_aircraft = 0
total_aircraft = 0
# Add reward per agent
for i, agent in self.agents.items():
individual_reward = 0
link_rewards = 0
for terminal in agent.terminals:
link = terminal.link
if link is not None:
if link.state == LinkState.ESTABLISHED:
link_rewards += constants.REWARD_FOR_ESTABLISHED_LINK * self.calculate_reward_modifier_for_established_link_length(
self._step_count)
elif link.state == LinkState.AGREED:
link_rewards += constants.REWARD_FOR_AGREED_LINK
elif link.state == LinkState.INITIATED:
link_rewards += constants.REWARD_FOR_INITIATED_LINK
# link_rewards = 0
individual_reward += link_rewards
if agent._type == NodeType.AIRCRAFT:
total_aircraft += 1
if agent._state["hasDownlink"] == 1:
connected_aircraft += 1
individual_reward += constants.REWARD_FOR_DOWNLINK * self.calculate_reward_modifier_for_downlink_length(
self._step_count)
else:
pass
if not self._individual_rewards:
individual_reward = 0
_rew[i] = individual_reward
if total_aircraft == 0:
self.connected_aircraft_ratio = 0
else:
self.connected_aircraft_ratio = connected_aircraft / total_aircraft
for index, value in _rew.items():
_rew[index] = value + self.connected_aircraft_ratio / len(
_rew) * 100 / self._max_steps # *constants.REWARD_WEIGHT_FOR_CONNECTED_AIRCRAFT_RATIO
return _rew
def get_agent_obs(self, nb: dict):
_obs = {}
# Add observation per Agent
for i, agent in self.agents.items():
others = []
obs_mapping = []
if i in nb.keys():
neighborhood = nb[i]
# if type(neighborhood) != pd.DataFrame:
# neighborhood = pd.DataFrame(neighborhood).transpose()
# for _, x in neighborhood.iterrows():
# print(neighborhood)
for x in neighborhood:
# def do_work(x):
neighbor_node_state = self.agents[x[NEIGHBOR_ID]]._state
neighbor_state = {
"azimuth": np.array([x[AZIMUTH]]),
"distance": np.array([x[DISTANCE]]),
"elevation": np.array([x[ELEVATION]]),
"freeTerminals": neighbor_node_state['freeTerminals'],
"hasDownlink": neighbor_node_state['hasDownlink'],
"linkTime": np.array([min(86400, x[LINK_TIME])]),
"relative_heading": neighbor_node_state["heading"],
"speed": neighbor_node_state["speed"],
"terminalState": neighbor_node_state['terminalState'],
"type": neighbor_node_state['type']
}
# neighbor_state = self.neighbor_state.sample()
others.append(neighbor_state)
obs_mapping.append(x[NEIGHBOR_ID])
# neighborhood.apply(do_work, axis=1, raw=True)
_obs[i] = {"other": others, "ownState": agent._state}
agent.last_observation_mapping = obs_mapping
# print(_obs)
return _obs
def update_agents(self, ac, now):
to_remove = []
for id, agent in self.agents.items():
if agent._type == NodeType.AIRCRAFT:
if id not in ac.index.values:
to_remove.append(id)
for terminal in agent.terminals:
if terminal.link != None:
terminal.link.set_state(LinkState.TERMINATED, now)
terminal.link = None
for i in to_remove:
del self.agents[i]
self._agent_ids.remove(i)
# for index, aircraft in ac.iterrows():
def do_updates(aircraft):
node_id = aircraft.name
if node_id not in self.agents:
self._agent_ids.add(node_id)
if node_id.startswith("ac"):
node_type = NodeType.AIRCRAFT
has_downlink = 0
else:
node_type = NodeType.BASESTATION
has_downlink = 1
node_state = {
"altitude": np.array([aircraft["alt"]]), "freeTerminals": self.number_of_terminals,
"hasDownlink": has_downlink,
"heading": np.array([aircraft["heading"]]),
"latitude": np.array([aircraft["lat"]]), "longitude": np.array([aircraft["lon"]]),
"speed": np.array([aircraft["speed"]]),
"terminalState": [0 for terminal in range(self.number_of_terminals)], "type": node_type._value_
}
new_node = NetworkNode(node_id, node_state, node_type)
return (node_id, new_node)
else:
agent = self.agents[node_id]
old_state = agent._state
node_state = {
"altitude": np.array([aircraft["alt"]]), "freeTerminals": old_state['freeTerminals'],
"hasDownlink": old_state['hasDownlink'],
"heading": np.array([aircraft["heading"]]), "latitude": np.array([aircraft["lat"]]),
"longitude": np.array([aircraft["lon"]]),
"speed": np.array([aircraft["speed"]]),
"terminalState": old_state['terminalState'], "type": old_state['type']
}
agent._state = node_state
return (node_id, agent)
tuple_list = ac.apply(do_updates, axis=1).tolist()
self.agents = dict(tuple_list)
# for agent in self.agents.values():
# print(agent.description())
return to_remove
def reset(self):
self._last_done = {}
self._last_obs = {}
self._last_conn_quota = 0
self._step_count = 0
self.connected_aircraft_ratio = 0
self.agents = {}
if self._offset_start:
self._step_start_offset = randint(self._start_offset_min, self._start_offset_max)
else:
self._step_start_offset = 3000
print("Initializing Env with offset: " + str(self._step_start_offset))
(ac, nb) = self.reader.get_data(self._step_count + self._step_start_offset, cache=False)
self.update_agents(ac, self._step_count)
obs = self.get_agent_obs(nb)
self._step_count += 1
# self.link_matrix = pd.DataFrame(np.full((len(self.agents), len(self.agents)), 0), index=self.agents.keys(), columns=self.agents.keys())
# self._total_episode_reward = [0 for _ in range(self.n_agents)]
# self._agent_dones = [False for _ in range(self.n_agents)]
return obs
def propagate_network(self):
aircraft_with_downlink = set()
visited = set()
for _, agent in self.agents.items():
if agent._type == NodeType.BASESTATION:
visited_stations = set()
self.propagate_network_recursive(agent, visited, aircraft_with_downlink, visited_stations, 0)
# print("\taircraft_with_downlink ", aircraft_with_downlink)
for _, agent in self.agents.items():
if agent._type == NodeType.AIRCRAFT:
if agent in aircraft_with_downlink:
agent._state["hasDownlink"] = 1
agent.downlink_since = self._step_count
else:
agent._state["hasDownlink"] = 0
agent.downlink_hops = -1
agent.downlink_since = -1
def propagate_network_recursive(self, current, visited, aircraft_with_downlink, visited_stations, downlink_hops):
visited.add(current)
downlink_hops += 1
# print("downlink_hops: ", downlink_hops)
# print("Current: ", current)
# print("Visited: ", visited)
# print("Index: ", self.link_matrix[current].index)
for terminal in current.terminals:
link = terminal.link
if link != None:
other_end = link.get_other_end(current)
# print("\tOther end id: ", other_end_id)
# print("\tLink to other end: ", link_to_other_end)
if link.state == LinkState.ESTABLISHED and (
not (other_end in visited) or other_end.downlink_hops > downlink_hops):
other_end._state["hasDownlink"] = 1
# print("Setting", other_end_id, " to 1")
if other_end._type == NodeType.AIRCRAFT:
aircraft_with_downlink.add(other_end)
other_end.downlink_hops = downlink_hops
elif other_end._type == NodeType.BASESTATION:
visited_stations.add(other_end)
self.propagate_network_recursive(other_end, visited, aircraft_with_downlink, visited_stations,
downlink_hops)
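For reference, the environment is meant to be plugged into RLlib roughly as sketched below; the registered name "bits_env" is a placeholder and the config values are just the defaults read in __init__, not my exact launch setup:
# Hypothetical wiring sketch for BitsEnv, using the config keys from __init__.
from ray.tune.registry import register_env

register_env("bits_env", lambda env_config: BitsEnv(env_config))

env_config = {
    "data_directory": "precompute_distance_30/",
    "max_steps": 100,
    "max_neighborhood_size": 1,
    "number_of_terminals": 3,
}

# Quick smoke test outside of a trainer:
env = BitsEnv(env_config)
obs = env.reset()
action_dict = {agent_id: env.action_space.sample() for agent_id in obs}
obs, rew, done, info = env.step(action_dict)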
Here is the Dockerfile:
# RLlib Docker container based on the Google TensorFlow 2.5.1 image
# Assumes an environment configured to use a proxy if necessary
ARG GPU=-gpu
FROM tensorflow/tensorflow:2.5.1$GPU
ENV PYTHONPATH=/workspace
WORKDIR /workspace
COPY dev-image/certs/* /usr/local/share/ca-certificates/
RUN update-ca-certificates
RUN apt-key del 7fa2af80 \
    && apt-key adv --fetch-keys http://developer.download.nvidia.com/compute/cuda/repos/ubuntu1604/x86_64/3bf863cc.pub \
    && apt-get update
RUN add-apt-repository "deb https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/ /" \
    && apt-get update \
    && apt-get install -y libcudnn8 \
    && apt-get install -y libcudnn8-dev
RUN apt-get install -y ssh git
RUN python -m pip config --global set http.sslVerify false
RUN python -m pip config --global set global.trusted-host "s3-us-west-2.amazonaws.com pypi.org files.pythonhosted.org pypi.python.org"
RUN python -m pip install --upgrade pip
RUN pip install https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp36-cp36m-manylinux2014_x86_64.whl
COPY requirements.txt .
RUN pip install -r requirements.txt
RUN useradd user
RUN mkdir /home/user
RUN chown -R user:user /home/user
USER user
ARG commit_id=c722ca6c7eed3d7987c0d5c3df5c45f6b15e77d1
RUN curl -sSL --insecure "https://update.code.visualstudio.com/commit:${commit_id}/server-linux-x64/stable" -o /tmp/vscode-server-linux-x64.tar.gz
RUN mkdir -p ~/.vscode-server/bin/${commit_id}
RUN tar zxvf /tmp/vscode-server-linux-x64.tar.gz -C ~/.vscode-server/bin/${commit_id} --strip 1
RUN touch ~/.vscode-server/bin/${commit_id}/0
VOLUME ["/workspace"]
# TensorBoard port
EXPOSE 6006
My apologies if this is all a bit overwhelming. Thank you for your time!
Best,
Anup