How to get the best performance out of Ray's RLlib when running Python scripts (using PPO) via a SLURM file on an HPC?

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Dear Ray-team!

I am currently working with RLlib and PPO. I developed an agent that imitates an ACC controller, i.e. the agent should learn to keep a safe distance to a leading vehicle. Additionally, I implemented a custom Torch model. All my Python code is attached to this post.

I want to run my Python scripts on a high-performance computer. I have access to 10 nodes; each node has 2 CPUs and each CPU has 64 cores. For the configuration of the SLURM file I followed the instructions in the Ray documentation: Deploying on Slurm — Ray 2.6.3. The SLURM file is also attached to this post.

For the PPO configuration I set the number of GPUs to 0 (num_gpus=0) since I use only CPUs for my calculations. The number of workers is set to 1 (num_rollout_workers=1) and the number of environments per worker is set to 30 (num_envs_per_worker=30).
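As a sketch of what I mean by the resource-related options (this is not what I currently run; deriving the worker count from SLURM_CPUS_PER_TASK and reserving one CPU for the driver are only illustrative assumptions):

import os
from ray.rllib.algorithms import ppo

# Illustrative only: derive the rollout worker count from the CPUs SLURM grants.
# SLURM_CPUS_PER_TASK is set by the attached SLURM file; the fallback of 1 and the
# single CPU reserved for the driver/learner are assumptions, not measured values.
cpus_granted = int(os.environ.get("SLURM_CPUS_PER_TASK", "1"))
num_workers = max(1, cpus_granted - 1)

sketch_config = (
    ppo.PPOConfig()
    .framework("torch")
    .resources(num_gpus=0)
    .rollouts(num_rollout_workers=num_workers, num_envs_per_worker=30)
)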

In the step() function of the environment I log the CPU usage via psutil.cpu_percent(4), and I am wondering why the value is just 0.1%. In further experiments I also increased the number of workers, but the CPU usage is still 0.1%. I think that should not be the case, right?
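For reference, besides the per-step psutil logging, this is a minimal sketch of how I could also sample the utilisation from the driver via Ray's own resource accounting (the address="auto" argument and the 10-second sampling loop are only illustrative):

import time
import ray

# Connect to the already running cluster started by `ray start` in the SLURM file.
ray.init(address="auto")

# Compare the CPUs registered with Ray against the CPUs that are currently unclaimed;
# the difference is what Ray has actually handed out to workers and tasks.
for _ in range(5):
    total_cpus = ray.cluster_resources().get("CPU", 0)
    idle_cpus = ray.available_resources().get("CPU", 0)
    print(f"CPUs registered with Ray: {total_cpus}, currently idle: {idle_cpus}")
    time.sleep(10)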

My questions are:

How can I find the optimal PPO configuration, i.e. the number of rollout workers and the number of environments per worker, to get the best performance out of Ray/RLlib in combination with an HPC? Are there other parameters in the PPO configuration I have to consider when I want to run my scripts on an HPC?

The SLURM file itself is working, but I would like to know whether it is configured correctly to get the best performance out of Ray in combination with a high-performance computer.

I use:
Ray 2.6.3
Python 3.8

#############################################################
import os
import math
import tempfile
import gymnasium
import numpy as np
import numpy.random as random
from collections import deque
from datetime import date
import psutil
import ray
from ray.rllib.algorithms import ppo
from ray.rllib.models import ModelCatalog
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.utils.annotations import override
from ray.rllib.models.torch.misc import SlimFC
from ray.rllib.utils.framework import try_import_torch
######################################################################
torch, nn = try_import_torch()
ray.init()
##########################################################

ROAD_LENGTH = 1000  # [meter]
ROAD_LENGTH_LEAD_VEHICLES = 1500  # [meter]
nr_dist_elements = 16
DISTANCES_VEL_PROFILE = np.linspace(0, ROAD_LENGTH_LEAD_VEHICLES, nr_dist_elements)
dist_consecutive_points = DISTANCES_VEL_PROFILE[1] - DISTANCES_VEL_PROFILE[0]
nr_vel_elements = 9
VELOCITIES_VEL_PROFILE = np.linspace(30 / 3.6, 130 / 3.6, nr_vel_elements)

class ADDemonstrator(gymnasium.Env):
    def __init__(self, config):
        self.reward_nr = config['reward_nr']
        self.task_id_ = config['task_id']
        self.action_space_low_ = -5  # -2
        self.action_space_high_ = 5  # 2
        self.delta_t_ = 0.1
        self.max_allowed_speed_ = 130/3.6
        self.ego_vel_desired_ = 0.
        self.time_headway_ = 1  # [sec]
        self.safe_dist_standstill_ = 1  # [meter]
        self.safe_dist_ = 0.
        self.min_dist_idx_ = 0
        self.ego_pos_ = 0.
        self.ego_s_ = 0.
        self.ego_vel_ = 0.
        self.reward_ = 0.
        self.training_steps_ = 0
        self.max_training_steps_per_episode_ = 1300
        self.reset()
        self.action_space = gymnasium.spaces.Box(low=self.action_space_low_, high=self.action_space_high_, dtype=np.float32, shape=(1,))
        self.observation_space = gymnasium.spaces.Box(low=np.array([0, 0, 0, 0, 0, 0, 0]), high=np.array([100, 100, 100, 100, 5000, 5000, 5000]), dtype=np.float32)
        self.reason_for_episode_end_ = 0
        self.filename_1 = "cpu_usage" + self.task_id_ + ".txt"
        self.filename_2 = "ram_memory_used" + self.task_id_ + ".txt"
        self.filename_3 = "ram_memory_GB" + self.task_id_ + ".txt"


    def step(self, action):
        # log system-wide CPU usage [%], RAM usage [%] and used RAM [GB] to per-task files
        with open(self.filename_1, "a") as file1:
            file1.write(str(psutil.cpu_percent(4)) + '\n')
        with open(self.filename_2, "a") as file2:
            file2.write(str(psutil.virtual_memory().percent) + '\n')
        with open(self.filename_3, "a") as file3:
            file3.write(str(psutil.virtual_memory().used / 1e9) + '\n')

        self.training_steps_ += 1

        try:
            assert not math.isnan(float(action)), "action in step() has nan value"
        except AssertionError as msg:
            print(msg)

        acceleration = float(action)

        #######
        # ego
        #######
        previous_ego_pos = self.ego_pos_
        self.ego_s_ = self.ego_vel_*self.delta_t_ + 0.5 * acceleration * self.delta_t_ * self.delta_t_
        if(self.ego_s_ < 0):
            self.ego_s_ = self.ego_vel_ * self.delta_t_

        self.ego_pos_ += self.ego_s_
        #self.elapsed_ego_dist_per_step_ = float(self.ego_pos_ - previous_ego_pos)

        self.ego_vel_ = self.ego_vel_ + acceleration * self.delta_t_
        if (self.ego_vel_ < 0):
            self.ego_vel_ = 0.0
        self.ego_vel_list_.append(self.ego_vel_)

        ##########
        # center
        ##########
        self.center_lead_pos_ += self.center_lead_vel_ * self.delta_t_
        self.distance2center_lead_ = self.center_lead_pos_ - self.ego_pos_
        if (self.distance2center_lead_ < 0.0):
            self.distance2center_lead_ = 0.0
        self.center_lead_vel_ = self.velocityProfile(self.center_velocity_profile_, self.center_lead_pos_)

        if (self.distance2center_lead_ >= 200):
            self.distance2center_lead_ = random.uniform(200, 210)

        ############
        # left
        ############
        self.left_lead_pos_ += self.left_lead_vel_ * self.delta_t_
        self.distance2left_lead_ = self.left_lead_pos_ - self.ego_pos_
        if (self.distance2left_lead_ < 0.0):
            self.distance2left_lead_ = 0.0
        self.left_lead_vel_ = self.velocityProfile(self.left_velocity_profile_, self.left_lead_pos_)
        if (self.distance2left_lead_ >= 200):
            self.distance2left_lead_ = random.uniform(200, 210)


        self.ego_dist_desired_ = self.ego_vel_ * self.time_headway_ + self.safe_dist_standstill_
        self.lead_distances_ = [self.distance2center_lead_, self.distance2left_lead_]
        self.min_dist_idx_ = self.idxOfMinDist(self.lead_distances_)

        self.ego_vel_desired_ = (self.lead_distances_[self.min_dist_idx_] - self.safe_dist_standstill_) / self.time_headway_

        if self.ego_vel_desired_ < 0:
            self.ego_vel_desired_ = 0.0

        self.ego_vel_desired_ = min(self.ego_vel_desired_, self.max_allowed_speed_)

        self.rewardCalculation(acceleration)
        self.next_state_ = (self.ego_vel_, self.ego_vel_desired_, self.center_lead_vel_, self.left_lead_vel_, self.ego_dist_desired_, self.distance2center_lead_, self.distance2left_lead_)

        try:
            assert not math.isnan(float(self.ego_vel_)), "ego velocity has nan value"
        except AssertionError as msg:
            print(msg)


        return np.array([self.ego_vel_, self.ego_vel_desired_, self.center_lead_vel_, self.left_lead_vel_, self.ego_dist_desired_, self.distance2center_lead_, self.distance2left_lead_]), self.reward_, self.done_, self.done_, {}
        #return np.array([self.ego_vel_, self.center_lead_vel_, self.left_lead_vel_, self.distance2center_lead_, self.distance2left_lead_]), self.reward_, self.done_, self.done_, {}


    def reset(self,  *, seed=None, options=None):
        ######
        # ego
        ######
        self.ego_vel_ = random.uniform(30 / 3.6, 130 / 3.6)  # [m/sec]

        self.ego_pos_ = 0.  # [m]
        self.ego_vel_list_ = []
        self.ego_vel_list_.append(self.ego_vel_)

        ########
        # center
        ########
        self.center_lead_pos_ = random.uniform(30, 60) #(20,60)
        self.center_velocity_profile_, self.center_acc_profile_ = self.initVelocityProfile(self.center_lead_pos_, self.ego_vel_)
        self.center_lead_vel_ = self.velocityProfile(self.center_velocity_profile_, self.center_lead_pos_)

        #######
        # left
        #######
        self.left_lead_pos_ = random.uniform(30, 60) #(20,60)
        self.left_velocity_profile_, self.left_acc_profile_ = self.initVelocityProfile(self.left_lead_pos_, self.ego_vel_)
        self.left_lead_vel_ = self.velocityProfile(self.left_velocity_profile_, self.left_lead_pos_)

        ##########
        # distance
        ##########
        self.distance2center_lead_ = self.center_lead_pos_ - self.ego_pos_
        self.distance2left_lead_ = self.left_lead_pos_ - self.ego_pos_

        self.reward_ = 0.
        self.done_ = False
        self.training_steps_ = 0


        self.ego_dist_desired_ = self.ego_vel_ * self.time_headway_ + self.safe_dist_standstill_
        self.lead_distances_ = [self.distance2center_lead_, self.distance2left_lead_]
        self.min_dist_idx_ = self.idxOfMinDist(self.lead_distances_)

        self.ego_vel_desired_ = (self.lead_distances_[self.min_dist_idx_] - self.safe_dist_standstill_) / self.time_headway_
        if self.ego_vel_desired_ < 0:
            self.ego_vel_desired_ = 0.0
        self.ego_vel_desired_ = min(self.ego_vel_desired_, self.max_allowed_speed_)

        try:
            assert not math.isnan(float(self.ego_vel_)), "ego velocity has nan value"
        except AssertionError as msg:
            print(msg)

        return np.array([self.ego_vel_, self.ego_vel_desired_, self.center_lead_vel_, self.left_lead_vel_, self.ego_dist_desired_ , self.distance2center_lead_, self.distance2left_lead_]), {}

    def initVelocityProfile(self, lead_init_pos, ego_init_vel):
        velocities_sampled = np.full(len(DISTANCES_VEL_PROFILE), 0.)
        acc_along_profile = np.full(len(DISTANCES_VEL_PROFILE), 0.)
        acc_along_profile[0] = 0
        velocities_sampled[0] = random.choice(VELOCITIES_VEL_PROFILE)

        for cnt in range(0, len(DISTANCES_VEL_PROFILE) - 1):
            # while (True):
            velocity_tmp = random.choice(VELOCITIES_VEL_PROFILE)
            # acc = (velocity_tmp ** 2) / (2 * dist_consecutive_points)
            acc = (1 / (2 * dist_consecutive_points)) * (velocity_tmp ** 2 - velocities_sampled[cnt] ** 2)
            if (acc >= -2 and acc <= 2):
                velocities_sampled[cnt + 1] = velocity_tmp
                acc_along_profile[cnt + 1] = acc
            else:
                velocities_sampled[cnt + 1] = velocities_sampled[cnt]
                acc_along_profile[cnt + 1] = acc_along_profile[cnt]
        return (velocities_sampled, acc_along_profile)


    def rewardCalculation(self, acceleration):
        self.reward_ = 0.
        self.reason_for_episode_end_ = 0

        if self.ego_pos_ > ROAD_LENGTH:
            self.done_ = True

        if self.training_steps_ >= self.max_training_steps_per_episode_:
            self.done_ = True

        if(self.reward_nr == 'reward_1'):
            print('REWARD FCT 1')
            if self.lead_distances_[self.min_dist_idx_] <= 3:
                self.reward_ = -20000
                self.done_ = True
            else:
                error = abs(self.ego_vel_ - self.ego_vel_desired_)
                self.reward_ = error * (-1)

        if (self.reward_nr  == 'reward_2'):
            print('REWARD FCT 2')
            if self.lead_distances_[self.min_dist_idx_] <= 3:
                self.reward_ = -500000
                self.done_ = True
            else:
                error = pow(abs(self.ego_vel_ - self.ego_vel_desired_), 2)
                self.reward_ = error * (-1)


    def idxOfMinDist(self, distance_list):
        # this function returns the index of the min value
        if distance_list[0] < distance_list[1]:
            return 0  # 0 == center
        return 1  # 1 == left


    def velocityProfile(self, velocity_profile, lead_pos):
        for cnt in range(0, len(DISTANCES_VEL_PROFILE) - 1):
            if (self.locateCurrentVehiclePosition(lead_pos, cnt)):
                self.slope = (velocity_profile[cnt + 1] - velocity_profile[cnt]) / (
                            DISTANCES_VEL_PROFILE[cnt + 1] - DISTANCES_VEL_PROFILE[cnt])
                self.dist_along_y_axis = velocity_profile[cnt] - self.slope * DISTANCES_VEL_PROFILE[cnt]
                velocity = self.slope * lead_pos + self.dist_along_y_axis
                return velocity

        if (lead_pos > ROAD_LENGTH_LEAD_VEHICLES):
            return 130 / 3.6


    def locateCurrentVehiclePosition(self, lead_pos, cnt):
        if (lead_pos > DISTANCES_VEL_PROFILE[cnt] and lead_pos < DISTANCES_VEL_PROFILE[cnt + 1]):
            return True
        return False




class TorchModel2HiddenLayer(TorchModelV2, nn.Module):
    def __init__(self, obs_space, action_space, num_outputs, model_config, name, hidden_units=128, **kwargs):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs, model_config, name)
        nn.Module.__init__(self)

        self.hidden_units = hidden_units
        num_inputs = obs_space.shape[0]

        self.critic = nn.Sequential(
            nn.Linear(num_inputs, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, 1)
        )

        self.actor = nn.Sequential(
            nn.Linear(num_inputs, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, num_outputs)
        )

    @override(ModelV2)
    def forward(self, input_dict, state, seq_lens):
        x = input_dict["obs"]
        self.v_values = self.critic(x)

        mean_std = self.actor(x)
        mean, logvar = torch.chunk(mean_std, 2, dim=1)
        self.mean_tanh = torch.tanh(mean)
        self.logvar_relu = torch.clamp(logvar, max=5.0)

        # log variables
        self.logvar = logvar
        self.mean = mean

        mean_std_new = torch.cat((self.mean_tanh, self.logvar_relu), 1)

        return mean_std_new, state

    @override(ModelV2)
    def value_function(self):
        return torch.reshape(self.v_values, [-1])

    @override(ModelV2)
    def metrics(self):
        return {"mean_tanh": (self.mean_tanh[0]).detach().numpy().mean(),
                "mean": (self.mean).detach().numpy().mean(),
                "logvar_relu": (self.logvar_relu).detach().numpy().mean(),
                "logvar": (self.logvar).detach().numpy().mean()
                }



ModelCatalog.register_custom_model("nn_model", TorchModel2HiddenLayer)

config = (
    ppo.PPOConfig()
    .environment(env=ADDemonstrator, env_config={"reward_nr": 'reward_1', "task_id": '1'})
    .framework("torch")
    .resources(num_gpus=0)
    .rollouts(num_rollout_workers=1, num_envs_per_worker=30, batch_mode="complete_episodes")
    .training(
        train_batch_size=1000,
        sgd_minibatch_size=32,
        num_sgd_iter=32,
        entropy_coeff=0.001,
        model={"custom_model": "nn_model"}
    ))

algo = config.build()

for n in range(10000):
    results = algo.train()
    print('reward: %f' % (results['episode_reward_mean']))
    checkpoint_dir = algo.save()

SLURM file

#!/bin/bash
#SBATCH --job-name=test
#SBATCH --qos=idle_0512 
#SBATCH --partition=zen3_0512 
#SBATCH --nodes=1 
#SBATCH --cpus-per-task=128
#SBATCH --array 1-10
#SBATCH --time=05:00:00 

module purge 

cd /home/fs72201/xxx/agent_training
source /home/fs72201/xxx/agent_training/py38-venv/bin/activate

# Obtain the head IP address
# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
echo "nodes: $nodes"
nodes_array=($nodes)
echo "nodes_array: $nodes_array"

head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
export head_node_ip

# if we detect a space character in the head node IP, we'll
# convert it to an ipv4 address. This step is optional.
if [[ "$head_node_ip" == *" "* ]]; then
IFS=' ' read -ra ADDR <<<"$head_node_ip"
if [[ ${#ADDR[0]} -gt 16 ]]; then
  head_node_ip=${ADDR[1]}
else
  head_node_ip=${ADDR[0]}
fi
echo "IPV6 address detected. We split the IPV4 address as $head_node_ip"
fi

export port

# STARTING RAY HEAD NODE
port=6379
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"

echo "Starting HEAD at $head_node"
srun --nodes=1 --ntasks=1 -w "$head_node" \
    ray start --head --node-ip-address="$head_node_ip" --port=$port \
    --num-cpus "${SLURM_CPUS_PER_TASK}" --block &

echo "SLURM_CPUS_PER_TASK: ${SLURM_CPUS_PER_TASK}"


# STARTING RAY WORKER NODES
sleep 10

# number of nodes other than the head node
worker_num=$((SLURM_JOB_NUM_NODES - 1))
echo "worker_num: $worker_num"

for ((i = 1; i <= worker_num; i++)); do
    node_i=${nodes_array[$i]}
    echo "Starting WORKER $i at $node_i"
    srun --nodes=1 --ntasks=1 -w "$node_i" \
        ray start --address "$ip_head" \
        --num-cpus "${SLURM_CPUS_PER_TASK}" --block &
done

#wait

python3 -u /home/fs72201/xxx/agent_training/main.py --task_id "$SLURM_ARRAY_TASK_ID" 

I kindly ask for your support and many thanks in advance for your help!
BR,
Marlies MR