tune.run() with Docker is not using the GPU

  • High: It blocks me from completing my task.

Hi,

tune.run() does not recognise the GPU for training. I am using TensorFlow with Ray. I have attached the training status below (image 1). When I did some searching, I found that CUDA does not support the usage of dicts or strings (python - Numba kernel is not allowing dictionaries or string functions - Stack Overflow).

Problem: I am working on a multi-agent reinforcement learning problem with a custom environment. Observations are returned as a dict, and each agent has a unique identifier of type string. I am training in a Docker container on Ubuntu under Windows Subsystem for Linux (WSL).

Questions:

  1. Is the use of dicts and strings a problem for training on the GPU? Is there a workaround with other data types? Does Ray work with NumPy ND arrays for observations instead of dicts?
  2. Using resources_per_trial gives an error when I follow this link (A Guide To Parallelism and Resources — Ray 1.12.1). I don’t understand what the problem is.
  3. My TensorFlow recognises the GPU but Ray does not. How do I debug this issue? (Image 2)
  4. Could it be that Docker is not using the GPU?

I am using Ray 1.12.0 and TensorFlow 2.8.0. It would be of great help if you could help me with this.

Thank you!

Hi, I have attached the second image here, as I was allowed to use only one image in the question.

image 2:

Thanks!

Hi @Anup877,

it seems like Ray is not detecting your GPUs at all (the status string says 0/0 GPUs).

This can be due to two reasons: 1) You’ve started your Ray cluster (ray start or ray.init()) without specifying GPUs. 2) The GPUs are not recognized in your Docker container.
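To narrow it down, you can check what Ray itself reports. Here is a minimal sketch (assuming you run it from inside the same container; note the container also needs GPU access, e.g. it must be started with docker run --gpus all when using the NVIDIA Container Toolkit):

import ray

ray.init(num_gpus=1)  # or plain ray.init() to rely on auto-detection
# If 'GPU' is missing (or 0) in this output, Ray never saw a GPU to begin with.
print(ray.cluster_resources())
ray.shutdown()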

Is the second screenshot on your host system or is this from the same Docker container that your Tune script is running in?

How do you connect to Ray? Can you share parts of your training script? Specifically, if you’re calling ray.init() or ray start somewhere, that would be interesting.

Also the tune.run() call would be interesting to help with the other issues.

For the very first question, that depends on your application and I’m not familiar enough with this specific type of training to know the answer. You’ll usually want to tokenize your data for efficient computation.
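As a rough illustration of what I mean by tokenizing (hypothetical IDs, not your actual data): string identifiers are usually mapped to integer tokens before anything is fed into the network.

import numpy as np

# Hypothetical agent IDs; map each string to a stable integer token.
agent_ids = ["ac_42", "bs_0", "bs_1"]
id_to_token = {agent_id: idx for idx, agent_id in enumerate(sorted(agent_ids))}
tokens = np.array([id_to_token[a] for a in agent_ids], dtype=np.int32)
print(tokens)  # [0 1 2]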


Hi Kai,

Thanks for your inputs. Initialising the Ray cluster certainly seems to be working, since it shows the training status, but it does not actually seem to be using the GPUs.

And reiterating my first question: is it obligatory to use dicts for observations and strings for agent identifiers (Models, Preprocessors, and Action Distributions — Ray 1.12.1)? Can we use other, faster data structures?
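For example, would something along these lines be valid? (Just a rough sketch to illustrate the question, not my actual environment; it keeps string agent IDs as dict keys but uses a flat Box/NumPy observation per agent.)

import numpy as np
from gym.spaces import Box
from ray.rllib.env.multi_agent_env import MultiAgentEnv

class FlatObsEnv(MultiAgentEnv):
    def __init__(self, config=None):
        super().__init__()
        self._agent_ids = {"ac_0", "bs_0"}
        # Flat ndarray observations instead of nested Dict spaces.
        self.observation_space = Box(-1.0, 1.0, shape=(8,), dtype=np.float32)
        self.action_space = Box(-1.0, 1.0, shape=(2,), dtype=np.float32)

    def reset(self):
        return {aid: self.observation_space.sample() for aid in self._agent_ids}

    def step(self, action_dict):
        obs = {aid: self.observation_space.sample() for aid in action_dict}
        rew = {aid: 0.0 for aid in action_dict}
        done = {aid: True for aid in action_dict}
        done["__all__"] = True
        return obs, rew, done, {}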

I have attached screenshots of the Ray status (image 1), the Ray dashboard (image 2), and Windows performance (image 3).

(image 1: ray_status)

Thank you for the quick response! I appreciate your help!

For the RLlib-related question (1), cc @avnishn and @sven1977, who have more context here.

For the GPU thing, in the screenshot you’ve just posted it seems like Ray does detect the GPU. Do you see any GPU-related log output?

Again, if you could post parts of your training script, that would be much appreciated. It doesn’t have to contain any private information, but your call to tune.run() and the config argument would be helpful. E.g. is your RLLib-config set to use GPUs?
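For reference, the part I’m after is essentially the GPU-related entries of the config you pass to tune.run(), i.e. something of this shape (a minimal sketch, not your actual config):

from ray import tune

config = {
    "env": "CartPole-v0",   # placeholder env
    "framework": "tf",
    "num_gpus": 1,          # GPUs for the driver/learner
    "num_gpus_per_worker": 0,
}
tune.run("PPO", config=config, stop={"training_iteration": 1})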

Hi Kai,

Yes. The first image says that the resources requested are 12/12 CPUs and 1/1 GPUs, but the Ray dashboard does not seem to show any GPU usage.
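For what it’s worth, here is a minimal check I could run inside the container to see whether TensorFlow actually places ops on the GPU (just a sketch):

import tensorflow as tf

# Log device placement, so the worker logs show /GPU:0 if ops really land on the GPU.
tf.debugging.set_log_device_placement(True)
print(tf.config.list_physical_devices("GPU"))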

I have pasted the code here:

import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.rllib.models import ModelCatalog
from env_p import Env
import matplotlib.pyplot as plt
from datetime import datetime
import numpy as np
from custom_model import MyKerasModel
# env config
max_neighborhood_size = 31
data_dir = '/workspace/precompute_distance_30/'
# Maximum number of steps in each episode
max_steps = 300  # need change
offset_start = True
offset_start_offset_min = 100
offset_start_offset_max = 8500
individual_rewards = True
# The number of times tune.report() has been called
training_iterations = 10  # changed
samples_per_iteration = 1
test_steps = 2
train_and_evaluate = True
run_or_experiment = "PPO"
# for local
gpu_count = 1
num_workers = 10  # need change
# for GOOGLE
# gpu_count = 0  # when running on CPU
# num_workers = 1
# specify the output directory here
out_dir = './custom_results/'
def env_creator(_):
    return Env(_)
def select_policy(agent_id):
    if agent_id.startswith("bs"):
        return "basestations"
    return "aircrafts"
def rl_train(lr):
    ray.init(dashboard_host="0.0.0.0", num_gpus=1)
    env_config = {
        "max_steps": max_steps,
        "max_neighborhood_size": max_neighborhood_size,
        "individual_rewards": individual_rewards,
        "offset_start": offset_start,
        "offset_start_offset_min": offset_start_offset_min,
        "offset_start_offset_max": offset_start_offset_max,
        "data_directory": data_dir,
        "log_steps": False,
        "log_step_time": False
    }
    tune.register_env(
        'Env', lambda config: Env(config=env_config))
    ModelCatalog.register_custom_model("CUSTOM", MyKerasModel)
    # if gpu_count > 0:
    #     num_gpus = 0.0001  # Driver GPU
    #     num_gpus_per_worker = (gpu_count - num_gpus) / num_workers
    # else:
    #     num_gpus_per_worker = 0
    single_env = Env(env_config)
    action_space = single_env.action_space
    observation_space = single_env.observation_space
    config_PPO = {
        # Whether the LSTM is time-major (TxBx..) or batch-major (BxTx..).
        "num_workers": 1,
        "num_envs_per_worker": num_workers,
        # "num_gpus_per_worker": 100,
        "rollout_fragment_length": 1,
        "sgd_minibatch_size": 256,
        "num_sgd_iter": 1,
        # "vtrace": False,
        "batch_mode": "complete_episodes",
        "shuffle_sequences": True,
        "train_batch_size": num_workers * max_steps,
        "preprocessor_pref": 'rllib',  # definitely faster than 'deepmind' from experiments
        "lr": tune.grid_search([0.0001, 5e-5, 5e-6]),
        "log_level": "WARN",
        "framework": "tf",
        "eager_tracing": True,
        "collect_metrics_timeout": 180,
        # Smooth metrics over this many episodes.
        "metrics_smoothing_episodes": 10,
        "num_gpus": 1,  # need to change
        "num_cpus_per_worker": num_workers,
        "num_cpus_for_driver": 2,
        "multiagent": {
            "policies": {
                # the first tuple value is None -> uses default policy
                "aircrafts": (None, observation_space, action_space, {"gamma": 0.99}),
                "basestations": (None, observation_space, action_space, {"gamma": 0.99}),
            },
            "policy_mapping_fn": select_policy
        },
        "no_done_at_end": False,
        "env_config": env_config,
        "env": "Env",
    }
    config_APPO = {
        # Whether the LSTM is time-major (TxBx..) or batch-major (BxTx..).
        "num_workers": num_workers,
        "num_envs_per_worker": 1,
        # "num_gpus_per_worker": 100,
        "rollout_fragment_length": 1,
        # "sgd_minibatch_size": 100,
        "num_sgd_iter": 1,
        # "vtrace": False,
        "batch_mode": "complete_episodes",
        # "shuffle_sequences": True,
        "train_batch_size": num_workers * max_steps,
        "preprocessor_pref": "rllib",
        "lr": lr,
        "log_level": "WARN",
        "framework": "tf",
        "eager_tracing": True,
        "collect_metrics_timeout": 180,
        # Smooth metrics over this many episodes.
        "metrics_smoothing_episodes": 10,
        "num_gpus": gpu_count,
        "num_multi_gpu_tower_stacks": 2,
        "num_cpus_per_worker": 1,
        "num_cpus_for_driver": 2,
        "multiagent": {
            "policies": {
                # the first tuple value is None -> uses default policy
                "aircrafts": (None, observation_space, action_space, {"gamma": 0.99}),
                "basestations": (None, observation_space, action_space, {"gamma": 0.99}),
            },
            "policy_mapping_fn": select_policy
        },
        "no_done_at_end": False,
        "env_config": env_config,
        "env": "Env",
        "model": {
            # # == LSTM  if you need the extra memory in network uncomment lines 176-186==
            # # Whether to wrap the model with an LSTM.
            "use_lstm": True,
            # # Max seq len for training the LSTM, defaults to 20.
            # "max_seq_len": 30,
            # # Size of the LSTM cell.
            # "lstm_cell_size": 128,
            # # Whether to feed a_{t-1} to LSTM (one-hot encoded if discrete).
            "lstm_use_prev_action": True,
            # # # Whether to feed r_{t-1} to LSTM.
            "lstm_use_prev_reward": True,
            # "custom_model": "CUSTOM"
        }
    }
    configs = {
        "PPO": config_PPO,
        "APPO": config_APPO
    }
    analysis = tune.run(
        run_or_experiment,
        num_samples=1,
        # scheduler=ASHAScheduler(metric="time_this_iter_s", mode="min"),
        stop={"training_iteration": training_iterations},
        reuse_actors=False,
        log_to_file=True,
        local_dir=out_dir,
        checkpoint_freq=5,
        checkpoint_at_end=True,
        config=configs[run_or_experiment])

if __name__ == "__main__":
    try:
        rl_train(5e-6)
    except Exception as e:
        print(str(e))
    finally:
        if ray.is_initialized():
            ray.shutdown()

Hi @Anup8777,

your code looks correct. I’ve also run a reduced version of it on a GPU machine and it worked fine (and utilized the GPU).

Thus, the problem is likely in one of your custom modules, e.g. the model or the policy. Can you share the code for those as well?
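In the meantime, a quick way to check whether a Ray task that requests a GPU actually sees one is something like this (a minimal sketch, assuming a single GPU):

import ray
import tensorflow as tf

@ray.remote(num_gpus=1)
def check_gpu():
    # What Ray assigned to this task, and what TF can see inside it.
    return ray.get_gpu_ids(), tf.config.list_physical_devices("GPU")

ray.init(num_gpus=1)
print(ray.get(check_gpu.remote()))
ray.shutdown()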

Here’s the reduced script I ran:

import ray
import gym
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.rllib.models import ModelCatalog

import matplotlib.pyplot as plt
from datetime import datetime
import numpy as np

# env config
max_neighborhood_size = 31
data_dir = '/workspace/precompute_distance_30/'
# Maximum number of steps in each episode
max_steps = 300  # need change
offset_start = True
offset_start_offset_min = 100
offset_start_offset_max = 8500
individual_rewards = True
# The number of times tune.report() has been called
training_iterations = 10  # changed
samples_per_iteration = 1
test_steps = 2
train_and_evaluate = True
run_or_experiment = "PPO"
# for local
gpu_count = 1
num_workers = 10  # need change
# for GOOGLE
# gpu_count = 0  # when running on CPU
# num_workers = 1
# specify the output directory here
out_dir = './custom_results/'

def rl_train(lr):
    ray.init()
    env_config = {
        # "max_steps": max_steps,
        # "max_neighborhood_size": max_neighborhood_size,
        # "individual_rewards": individual_rewards,
        # "offset_start": offset_start,
        # "offset_start_offset_min": offset_start_offset_min,
        # "offset_start_offset_max": offset_start_offset_max,
        # "data_directory": data_dir,
        # "log_steps": False,
        # "log_step_time": False
    }

    # if gpu_count > 0:
    #     num_gpus = 0.0001  # Driver GPU
    #     num_gpus_per_worker = (gpu_count - num_gpus) / num_workers
    # else:
    #     num_gpus_per_worker = 0

    config_PPO = {
        # Whether the LSTM is time-major (TxBx..) or batch-major (BxTx..).
        "num_workers": 1,
        "num_envs_per_worker": num_workers,
        # "num_gpus_per_worker": 100,
        "rollout_fragment_length": 1,
        "sgd_minibatch_size": 256,
        "num_sgd_iter": 1,
        # "vtrace": False,
        "batch_mode": "complete_episodes",
        "shuffle_sequences": True,
        "train_batch_size": num_workers * max_steps,
        "preprocessor_pref": 'rllib',  # definitely faster than 'deepmind' from experiments
        "lr": tune.grid_search([0.0001, 5e-5, 5e-6]),
        "log_level": "WARN",
        "framework": "tf",
        "eager_tracing": True,
        "collect_metrics_timeout": 180,
        # Smooth metrics over this many episodes.
        "metrics_smoothing_episodes": 10,
        "num_gpus": 1,  # need to change
        "num_cpus_per_worker": num_workers,
        "num_cpus_for_driver": 2,
        # "multiagent": {
        #     "policies": {
        #         # the first tuple value is None -> uses default policy
        #         # "aircrafts": (None, observation_space, action_space, {"gamma": 0.99}),
        #         # "basestations": (None, observation_space, action_space, {"gamma": 0.99}),
        #     },
        #     "policy_mapping_fn": select_policy
        # },
        "no_done_at_end": False,
        "env_config": env_config,
        "env": "CartPole-v0",
    }
  
    configs = {
        "PPO": config_PPO,
    }
    analysis = tune.run(
        run_or_experiment,
        num_samples=1,
        # scheduler=ASHAScheduler(metric="time_this_iter_s", mode="min"),
        stop={"training_iteration": training_iterations},
        reuse_actors=False,
        log_to_file=True,
        local_dir=out_dir,
        checkpoint_freq=5,
        checkpoint_at_end=True,
        config=configs[run_or_experiment])

    

try:
    rl_train(5e-6)
except Exception as e:
    print(str(e))
finally:
    if ray.is_initialized():
        ray.shutdown()

Hi Kai,

The custom_model code is here:

from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.models.tf.tf_modelv2 import TFModelV2
import tensorflow as tf
import numpy as np

class MyKerasModel(TFModelV2):
    def import_from_h5(self, h5_file: str) -> None:
        print("h5 ", h5_file)
        pass

    def __init__(self, obs_space, action_space, num_outputs, model_config,
                 name):
        super(MyKerasModel, self).__init__(obs_space, action_space,
                                           num_outputs, model_config, name)
        # print(f"""
        #   Model config:
        #   obs_space = {obs_space}
        #   action_space = {action_space}
        #   num_outputs = {num_outputs}
        #   model_config = {model_config}
        #   name = {name}
        # """)
        self.inputs = tf.keras.layers.Input(
            shape=obs_space.shape, name="observations")
        if self.num_outputs is None:
            self.num_outputs = int(np.product(self.obs_space.shape))
        # normalization layer
        layer_0 = tf.keras.layers.BatchNormalization(axis=-1,
                                                     momentum=0.99,
                                                     epsilon=0.001,
                                                     center=True,
                                                     scale=True,
                                                     beta_initializer="zeros",
                                                     gamma_initializer="ones",
                                                     moving_mean_initializer="zeros",
                                                     moving_variance_initializer="ones",
                                                     beta_regularizer=None,
                                                     gamma_regularizer=None,
                                                     beta_constraint=None,
                                                     gamma_constraint=None)(self.inputs)
        layer_1 = tf.keras.layers.Dense(
            128,
            name="my_layer1",
            activation=tf.nn.relu,
            kernel_initializer=normc_initializer(0.0001))(layer_0)

        # X_train = np.reshape(obs_space, (obs_space.shape[0], 1, obs_space.shape[1]))
        # layer_2 =tf.keras.layers.LSTM(256, activation='tanh', recurrent_activation='sigmoid',
        # use_bias=True, kernel_initializer=normc_initializer(0.01),
        # recurrent_initializer='orthogonal',
        # input_shape=obs_space.shape,
        # bias_initializer='zeros', unit_forget_bias=True,
        # kernel_regularizer=None, recurrent_regularizer=None, bias_regularizer=None,
        # activity_regularizer=None, kernel_constraint=None, recurrent_constraint=None,
        # bias_constraint=None, dropout=0.0, recurrent_dropout=0.0,
        # return_sequences=False, return_state=False, go_backwards=False, stateful=False, time_major=False, unroll=False)(self.inputs)
        layer_2 = tf.keras.layers.Dense(
            128,
            name="my_layer2",
            activation=tf.nn.relu,
            kernel_initializer=normc_initializer(0.01))(layer_1)
        layer_3 = tf.keras.layers.Dense(
            110,
            name="my_layer3",
            activation=tf.nn.relu,
            kernel_initializer=normc_initializer(0.01))(layer_2)
        layer_4 = tf.keras.layers.Dense(
            100,
            name="my_layer4",
            activation=tf.nn.relu,
            kernel_initializer=normc_initializer(0.01))(layer_3)

        layer_out = tf.keras.layers.Dense(
            self.num_outputs,
            name="my_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(layer_4)
        value_out = tf.keras.layers.Dense(
            1,
            name="value_out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(layer_4)
        self.base_model = tf.keras.Model(self.inputs, [layer_out, value_out])

    def forward(self, input_dict, state, seq_lens):
        model_out, self._value_out = self.base_model(input_dict["obs_flat"])
        return model_out, state

    def value_function(self):
        return tf.reshape(self._value_out, [-1])

The environment code is here:

from operator import xor, and_
from gym.spaces import Discrete, Box, Dict, MultiDiscrete, MultiBinary, Tuple
from pandas.core.frame import DataFrame
from pandas.core.groupby.generic import DataFrameGroupBy
from ray.rllib.utils.spaces.repeated import Repeated
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from random import randint
import numpy as np
import pandas as pd
from enum import Enum, unique
from network import Link, NetworkNode, LinkState, NodeType
import constants
from aircraftdata import AircraftDataReader, AircraftDataReaderPreload
from collections import OrderedDict, defaultdict
from random import randint, randrange
from timeit import default_timer as timer
import ray

import sknetwork as skn
from scipy import sparse

NEIGHBOR_ID = 0
AZIMUTH = 2
DISTANCE = 1
ELEVATION = 3
LINK_TIME = 4

class BitsEnv(MultiAgentEnv):
metadata = {'render.modes': ['human']}

def get_action_space(self):
    return self.action_space

def get_agent_state(self):
    return self.agent_state

def get_neighbor_state(self):
    return self.neighbor_state

def __init__(self, config):

    self.reader = AircraftDataReader(config.get("data_directory", "precompute_distance_30/"))
    # self.reader = ray.get_actor(name="data_reader", namespace="bits")

    # self._total_episode_reward = None
    self._max_steps = config.get("max_steps", 100)
    self._offset_start = config.get("offset_start", False)
    self._log_steps = config.get("log_steps", False)
    self._log_step_time = config.get("log_step_time", False)
    self._start_offset_min = config.get("offset_start_offset_min", 0)
    self._start_offset_max = config.get("offset_start_offset_max", 0)
    if self._offset_start:
        self._step_start_offset = randrange(self._start_offset_min, self._start_offset_max, 250)
    else:
        self._step_start_offset = 3000
    self._individual_rewards = config.get("individual_rewards", True)
    self.max_neighborhood_size = config.get("max_neighborhood_size", 1)
    self.number_of_terminals = config.get("number_of_terminals", 3)

    self._last_done = {}
    self._last_obs = {}
    self._last_conn_quota = 0
    self._step_count = 0
    self.connected_aircraft_ratio = 0
    
    self._reward_wait_time = 5
    
    # 0: do nothing, 1: terminate link; MultiBinary is not supported by rllib
    # self.terminate_action = MultiDiscrete([2 for terminal in range(self.number_of_terminals)])
    self.action_space = MultiDiscrete(
        [self.max_neighborhood_size for terminal in range(self.number_of_terminals)])

    # 0: free, 1: in use; MultiBinary is not supported by rllib
    self.terminal_state = MultiDiscrete([2 for terminal in range(self.number_of_terminals)])
    self.free_terminals_state = Discrete(self.number_of_terminals + 1)
    self.agent_state = Dict(
        {"type": Discrete(2),
         "hasDownlink": Discrete(2),
         "freeTerminals": self.free_terminals_state,
         "terminalState": self.terminal_state, "latitude": Box(-180, 180, shape=(1,)),
         "longitude": Box(-180, 180, shape=(1,)), "altitude": Box(-20, 150000, shape=(1,)),
         "speed": Box(0, 800, shape=(1,)),
         "heading": Box(0, 501, shape=(1,))})
    self.neighbor_state = Dict(
        {"azimuth": Box(0, 360, shape=(1,)),
         "distance": Box(0, 400000, shape=(1,)),
         "elevation": Box(-90, 90, shape=(1,)),
         "freeTerminals": self.free_terminals_state,
         "hasDownlink": Discrete(2),
         "linkTime": Box(0, 100000, shape=(1,)),
         "relative_heading": Box(0, 501, shape=(1,)),
         "speed": Box(0, 800, shape=(1,)),
         "terminalState": self.terminal_state,
         "type": Discrete(2)
         })
    self.observation_space = Dict({"ownState": self.agent_state, "other": Repeated(self.neighbor_state, max_len=self.max_neighborhood_size)})

    # agent: id => latitude, longitude, altitude, heading, speed
    # self.agents = {"aircraft_"+str(i): NetworkNode("aircraft_"+str(i), self.agent_state.sample(), NodeType.AIRCRAFT) for i in range(int(self.n_agents/2))}
    self.agents = {}
    self._agent_ids = set()

    # self.link_matrix = pd.DataFrame(np.full((self.n_agents, self.n_agents), 0), index=self.agents.keys(), columns=self.agents.keys())
    # self.link_matrix['basestation_2']['aircraft_0'] = 3
    # self.link_matrix['aircraft_0']['aircraft_1'] = 3

# @profile
def step(self, action_dict):
    
    if self._log_steps:
        print("Step: ", self._step_count)

    if self._log_step_time:
        start = timer()
    # print("Wall time: " + str(self._step_count+self._step_start_offset))

    # ref = self.reader.get_data.remote(self._step_count + self._step_start_offset)
    # (ac, nb) = ray.get(ref)

    # nb: dict
    (ac, nb) = self.reader.get_data(self._step_count + self._step_start_offset, cache=False)

    # Update agents
    to_remove = self.update_agents(ac, self._step_count)

    done = { id: id in to_remove for id in self._agent_ids }

    agent_ids = set(action_dict.keys())
    agent_ids = agent_ids.difference(to_remove)
    agent_ids = list(agent_ids)
    node_ids = agent_ids.copy()
    node_ids.append('sla_conn')
    n = len(node_ids)
    agent_ids_map = dict(zip(agent_ids, range(n)))
    link_matrix = np.zeros((n,n), dtype=np.bool)
    sla_conn_idx = node_ids.index('sla_conn')
    base_station_ids = [index for index, value in enumerate(node_ids) if value.startswith('bs_')]
    link_matrix[sla_conn_idx, base_station_ids] = True

    # Execute actions
    for i in agent_ids:
        neighborhood = None
        if i in nb.keys():
            neighborhood = nb[self.agents[i].id][NEIGHBOR_ID]
            if isinstance(neighborhood, str):
                neighborhood = {neighborhood}
            else:
                neighborhood = set(neighborhood)
        self.agents[i].step(action_dict[i], self._step_count, self.agents, neighborhood, link_matrix, agent_ids_map)

    link_matrix = np.logical_or(link_matrix, link_matrix.T)


    # cnt_val = np.logical_and(link_matrix_values, link_matrix_values.T).sum()
    # cnt_inv = np.logical_xor(link_matrix_values, link_matrix_values.T).sum()
    # print(f'{cnt_val//2}/{cnt_inv}')

    nodes_connected = self.get_connectivity_from_matrix(link_matrix, sla_conn_idx)
    divisiable = n-1 - len(base_station_ids)
    if divisiable > 0: # changed
        conn_quota = (len(nodes_connected) - len(base_station_ids)) / divisiable
    else:
        conn_quota = 0
    # print(f'connected: {int(conn_quota * 100)}%')
    rew = dict(zip(agent_ids, [(1 + conn_quota - self._last_conn_quota)**2 - 1] * len(agent_ids)))
    self._last_conn_quota = conn_quota
    
    # print(nodes_connected)

    # Calculate observation per agent
    obs = self.get_agent_obs(nb)

    np_node_ids = np.array(node_ids)
    node_ids_connected = set(np_node_ids[nodes_connected])
    for id in self.agents:
        if id in node_ids_connected:
            self.agents[id]._state["hasDownlink"] = 1
        else:
            self.agents[id]._state["hasDownlink"] = 0

    # # Done when max steps
    # done = self._last_done

    done_all = self._step_count >= self._max_steps or self._step_count + self._step_start_offset >= 8640
    done["__all__"] = done_all

    self._last_obs = obs

    self._step_count += 1

    if self._step_count < self._reward_wait_time:
        rew = {}

    if self._log_step_time:
        end = timer()
        print (f'Step time: {end - start}')

    return obs, rew, done, {}

def update_link_matrix(self):
    self.link_matrix = pd.DataFrame(np.full((len(self.agents), len(self.agents)), 0), index=self.agents.keys(),
                                    columns=self.agents.keys())
    for _, agent in self.agents.items():
        for terminal in agent.terminals:
            link = terminal.link
            if link != None:
                # print(link)
                self.link_matrix.loc[link.end1.id, link.end2.id] = link.state
                self.link_matrix.loc[link.end2.id, link.end1.id] = link.state
    # print(self.link_matrix)

def calculate_reward_modifier_for_established_link_length(self, steps):
    result = steps * constants.REWARD_FOR_LONG_LINKS
    if result > constants.MAX_REWARD_FOR_LONG_LINKS:
        return constants.MAX_REWARD_FOR_LONG_LINKS
    if result < constants.MIN_REWARD_FOR_LONG_LINKS:
        return constants.MIN_REWARD_FOR_LONG_LINKS
    return result

def calculate_reward_modifier_for_downlink_length(self, steps):
    result = steps * constants.REWARD_FOR_LONG_DOWNLINK
    if result > constants.MAX_REWARD_FOR_LONG_DOWNLINK:
        return constants.MAX_REWARD_FOR_LONG_DOWNLINK
    if result < constants.MIN_REWARD_FOR_LONG_DOWNLINK:
        return constants.MIN_REWARD_FOR_LONG_DOWNLINK
    return result


def get_connectivity_from_matrix(self, link_matrix: np.ndarray, root_node_id: int):
    # _rew = {}
    if link_matrix.shape[0] > 1:
        adj = sparse.csr_matrix(link_matrix)
        nodes_connected = skn.sknetwork.path.breadth_first_search(adjacency=adj, source=root_node_id, return_predecessors=False)
        return nodes_connected
    else:
        return np.array([], dtype=np.int)


def get_agent_rewards(self):
    _rew = {}

    connected_aircraft = 0
    total_aircraft = 0

    # Add reward per agent
    for i, agent in self.agents.items():
        individual_reward = 0
        link_rewards = 0
        for terminal in agent.terminals:
            link = terminal.link
            if link is not None:
                if link.state == LinkState.ESTABLISHED:
                    link_rewards += constants.REWARD_FOR_ESTABLISHED_LINK * self.calculate_reward_modifier_for_established_link_length(
                        self._step_count)
                elif link.state == LinkState.AGREED:
                    link_rewards += constants.REWARD_FOR_AGREED_LINK
                elif link.state == LinkState.INITIATED:
                    link_rewards += constants.REWARD_FOR_INITIATED_LINK
                    # link_rewards = 0
        individual_reward += link_rewards

        if agent._type == NodeType.AIRCRAFT:
            total_aircraft += 1
            if agent._state["hasDownlink"] == 1:
                connected_aircraft += 1
                individual_reward += constants.REWARD_FOR_DOWNLINK * self.calculate_reward_modifier_for_downlink_length(
                    self._step_count)
        else:
            pass
        if not self._individual_rewards:
            individual_reward = 0
        _rew[i] = individual_reward

    if total_aircraft == 0:
        self.connected_aircraft_ratio = 0
    else:
        self.connected_aircraft_ratio = connected_aircraft / total_aircraft

    for index, value in _rew.items():
        _rew[index] = value + self.connected_aircraft_ratio / len(
            _rew) * 100 / self._max_steps  # *constants.REWARD_WEIGHT_FOR_CONNECTED_AIRCRAFT_RATIO
    return _rew

def get_agent_obs(self, nb: dict):
    _obs = {}

    # Add observation per Agent
    for i, agent in self.agents.items():
        others = []
        obs_mapping = []
        if i in nb.keys():
            neighborhood = nb[i]
            # if type(neighborhood) != pd.DataFrame:
            #     neighborhood = pd.DataFrame(neighborhood).transpose()
            # for _, x in neighborhood.iterrows():
            # print(neighborhood)
            for x in neighborhood:
            # def do_work(x):
                neighbor_node_state = self.agents[x[NEIGHBOR_ID]]._state
                neighbor_state = {
                        "azimuth": np.array([x[AZIMUTH]]),
                        "distance": np.array([x[DISTANCE]]),
                        "elevation": np.array([x[ELEVATION]]),
                        "freeTerminals": neighbor_node_state['freeTerminals'],
                        "hasDownlink": neighbor_node_state['hasDownlink'],
                        "linkTime": np.array([min(86400, x[LINK_TIME])]),
                        "relative_heading": neighbor_node_state["heading"],
                        "speed": neighbor_node_state["speed"],
                        "terminalState": neighbor_node_state['terminalState'],
                        "type": neighbor_node_state['type']
                        }
                # neighbor_state = self.neighbor_state.sample()
                others.append(neighbor_state)
                obs_mapping.append(x[NEIGHBOR_ID])
            # neighborhood.apply(do_work, axis=1, raw=True)

        _obs[i] = {"other": others, "ownState": agent._state}
        agent.last_observation_mapping = obs_mapping

    # print(_obs)
    return _obs

def update_agents(self, ac, now):
    to_remove = []
    for id, agent in self.agents.items():
        if agent._type == NodeType.AIRCRAFT:
            if id not in ac.index.values:
                to_remove.append(id)
                for terminal in agent.terminals:
                    if terminal.link != None:
                        terminal.link.set_state(LinkState.TERMINATED, now)
                        terminal.link = None

    for i in to_remove:
        del self.agents[i]
        self._agent_ids.remove(i)

    # for index, aircraft in ac.iterrows():
    def do_updates(aircraft):
        node_id = aircraft.name
        if node_id not in self.agents:
            self._agent_ids.add(node_id)
            if node_id.startswith("ac"):
                node_type = NodeType.AIRCRAFT
                has_downlink = 0
            else:
                node_type = NodeType.BASESTATION
                has_downlink = 1
            node_state = {
                "altitude": np.array([aircraft["alt"]]), "freeTerminals": self.number_of_terminals,
                "hasDownlink": has_downlink,
                "heading": np.array([aircraft["heading"]]),
                "latitude": np.array([aircraft["lat"]]), "longitude": np.array([aircraft["lon"]]),
                "speed": np.array([aircraft["speed"]]),
                "terminalState": [0 for terminal in range(self.number_of_terminals)], "type": node_type._value_
                }
            new_node = NetworkNode(node_id, node_state, node_type)
            return (node_id, new_node)
        else:
            agent = self.agents[node_id]
            old_state = agent._state
            node_state = {
                "altitude": np.array([aircraft["alt"]]), "freeTerminals": old_state['freeTerminals'],
                "hasDownlink": old_state['hasDownlink'],
                "heading": np.array([aircraft["heading"]]), "latitude": np.array([aircraft["lat"]]),
                "longitude": np.array([aircraft["lon"]]),
                "speed": np.array([aircraft["speed"]]),
                "terminalState": old_state['terminalState'], "type": old_state['type']
                }
            agent._state = node_state
            return (node_id, agent)

    tuple_list = ac.apply(do_updates, axis=1).tolist()
    self.agents = dict(tuple_list)

    # for agent in self.agents.values():
    #     print(agent.description())

    return to_remove

def reset(self):
    self._last_done = {}
    self._last_obs = {}
    self._last_conn_quota = 0
    self._step_count = 0
    self.connected_aircraft_ratio = 0
    self.agents = {}

    if self._offset_start:
        self._step_start_offset = randint(self._start_offset_min, self._start_offset_max)
    else:
        self._step_start_offset = 3000
    print("Initializing Env with offset: " + str(self._step_start_offset))

    (ac, nb) = self.reader.get_data(self._step_count + self._step_start_offset, cache=False)
    self.update_agents(ac, self._step_count)
    obs = self.get_agent_obs(nb)
    self._step_count += 1

    # self.link_matrix = pd.DataFrame(np.full((len(self.agents), len(self.agents)), 0), index=self.agents.keys(), columns=self.agents.keys())
    # self._total_episode_reward = [0 for _ in range(self.n_agents)]
    # self._agent_dones = [False for _ in range(self.n_agents)]
    return obs

def propagate_network(self):
    aircraft_with_downlink = set()
    visited = set()

    for _, agent in self.agents.items():
        if agent._type == NodeType.BASESTATION:
            visited_stations = set()
            self.propagate_network_recursive(agent, visited, aircraft_with_downlink, visited_stations, 0)

    # print("\taircraft_with_downlink ", aircraft_with_downlink)

    for _, agent in self.agents.items():
        if agent._type == NodeType.AIRCRAFT:
            if agent in aircraft_with_downlink:
                agent._state["hasDownlink"] = 1
                agent.downlink_since = self._step_count
            else:
                agent._state["hasDownlink"] = 0
                agent.downlink_hops = -1
                agent.downlink_since = -1

def propagate_network_recursive(self, current, visited, aircraft_with_downlink, visited_stations, downlink_hops):
    visited.add(current)
    downlink_hops += 1
    # print("downlink_hops: ", downlink_hops)
    # print("Current: ", current)
    # print("Visited: ", visited)
    # print("Index: ", self.link_matrix[current].index)
    for terminal in current.terminals:
        link = terminal.link
        if link != None:
            other_end = link.get_other_end(current)
            # print("\tOther end id: ", other_end_id)
            # print("\tLink to other end: ", link_to_other_end)
            if link.state == LinkState.ESTABLISHED and (
                    not (other_end in visited) or other_end.downlink_hops > downlink_hops):
                other_end._state["hasDownlink"] = 1
                # print("Setting", other_end_id, " to 1")
                if other_end._type == NodeType.AIRCRAFT:
                    aircraft_with_downlink.add(other_end)
                    other_end.downlink_hops = downlink_hops
                elif other_end._type == NodeType.BASESTATION:
                    visited_stations.add(other_end)
                self.propagate_network_recursive(other_end, visited, aircraft_with_downlink, visited_stations,
                                                 downlink_hops)

Here is the Docker image file:

# RLlib Docker container based on Google TensorFlow 2.5.1
# Assumes an environment configured to use a proxy if necessary
ARG GPU=-gpu
FROM tensorflow/tensorflow:2.5.1$GPU
ENV PYTHONPATH=/workspace
WORKDIR /workspace
COPY dev-image/certs/* /usr/local/share/ca-certificates
RUN update-ca-certificates
RUN apt-key del 7fa2af80 \
    && apt-key adv --fetch-keys http://developer.download.nvidia.com/compute/cuda/repos/ubuntu1604/x86_64/3bf863cc.pub \
    && apt-get update

RUN add-apt-repository "deb https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64/ /" \
    && apt-get update \
    && apt-get install -y libcudnn8 \
    && apt-get install -y libcudnn8-dev
RUN apt-get install -y ssh git
RUN python -m pip config --global set http.sslVerify false
RUN python -m pip config --global set global.trusted-host "s3-us-west-2.amazonaws.com pypi.org files.pythonhosted.org pypi.python.org"
RUN python -m pip install --upgrade pip
RUN pip install https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp36-cp36m-manylinux2014_x86_64.whl
COPY requirements.txt .

RUN pip install -r requirements.txt
RUN useradd user
RUN mkdir /home/user
RUN chown -R user:user /home/user
USER user
ARG commit_id=c722ca6c7eed3d7987c0d5c3df5c45f6b15e77d1
RUN curl -sSL --insecure "https://update.code.visualstudio.com/commit:${commit_id}/server-linux-x64/stable" -o /tmp/vscode-server-linux-x64.tar.gz
RUN mkdir -p ~/.vscode-server/bin/${commit_id}
RUN tar zxvf /tmp/vscode-server-linux-x64.tar.gz -C ~/.vscode-server/bin/${commit_id} --strip 1
RUN touch ~/.vscode-server/bin/${commit_id}/0

VOLUME ["/workspace"]

# TensorBoard port
EXPOSE 6006

My apologies if this is all a bit overwhelming. Thank you for your time!

Best,
Anup