How severely does this issue affect your experience of using Ray?
- High: I can't do hyperparameter tuning and it hinders my progress.
Dear Ray team!
Currently I am working with RLlib and PPO. I developed an agent that imitates an ACC (adaptive cruise control) controller, i.e. the agent should learn to keep a safe distance to a leading vehicle. Additionally, I implemented a custom torch model. My code is attached below.
Now I want to try out different sets of hyperparameters. To make use of parallelization, I increased num_rollout_workers from 1 to 128. I run my Python scripts on a high-performance cluster: I have access to 10 nodes, each node has 2 CPUs, and each CPU has 64 cores.
Unfortunately, I cannot measure a significant time difference between 1 rollout worker and 128 rollout workers. I expected training to become roughly 10 times (or more) faster as soon as I increase the number of rollout workers.
Thanks in advance for your support!
I use:
- Ray 2.6.0
- Python 3.8
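For reference, right after the ray.init() call in the script below I would also print what the Ray runtime actually detects, to see whether all 10 nodes are visible (a minimal sketch using the standard ray.cluster_resources() and ray.nodes() calls; with 10 nodes x 2 CPUs x 64 cores, the 'CPU' entry should be on the order of 10 x 2 x 64 = 1280):
# Directly after ray.init(): print the total resources and the node count Ray sees.
# For the cluster described above, 'CPU' should be roughly 1280 (more with hyper-threading).
print("cluster resources:", ray.cluster_resources())
print("number of nodes:", len(ray.nodes()))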
#############################################################
import os
import math
import time
import tempfile
import gymnasium
import numpy as np
import numpy.random as random
from collections import deque
from datetime import date
import ray
from ray.rllib.algorithms import ppo
from ray.rllib.models import ModelCatalog
from ray.rllib.models.torch.torch_modelv2 import TorchModelV2
from ray.rllib.models.modelv2 import ModelV2
from ray.rllib.utils.annotations import override
from ray.rllib.models.torch.misc import SlimFC
from ray.rllib.utils.framework import try_import_torch
######################################################################
torch, nn = try_import_torch()
ray.init()
##########################################################
ROAD_LENGTH = 1000 # [meter]
ROAD_LENGTH_LEAD_VEHICLES = 1500 # [meter]
nr_dist_elements = 16
DISTANCES_VEL_PROFILE = np.linspace(0, ROAD_LENGTH_LEAD_VEHICLES, nr_dist_elements)
dist_consecutive_points = DISTANCES_VEL_PROFILE[1] - DISTANCES_VEL_PROFILE[0]
nr_vel_elements = 9
VELOCITIES_VEL_PROFILE = np.linspace(30 / 3.6, 130 / 3.6, nr_vel_elements)
class ADDemonstrator(gymnasium.Env):
    def __init__(self, config):
        self.reward_nr = config['reward_nr']
        self.task_id_ = config['task_id']
        self.action_space_low_ = -5 # -2
        self.action_space_high_ = 5 # 2
        self.delta_t_ = 0.1
        self.max_allowed_speed_ = 130/3.6
        self.ego_vel_desired_ = 0.
        self.time_headway_ = 1 # [sec]
        self.safe_dist_standstill_ = 1 # [meter]
        self.safe_dist_ = 0.
        self.min_dist_idx_ = 0
        self.ego_pos_ = 0.
        self.ego_s_ = 0.
        self.ego_vel_ = 0.
        self.reward_ = 0.
        self.training_steps_ = 0
        self.max_training_steps_per_episode_ = 1300
        self.reset()
        self.action_space = gymnasium.spaces.Box(low=self.action_space_low_, high=self.action_space_high_, dtype=np.float32, shape=(1,))
        self.observation_space = gymnasium.spaces.Box(low=np.array([0, 0, 0, 0, 0, 0, 0]), high=np.array([100, 100, 100, 100, 5000, 5000, 5000]), dtype=np.float32)
        self.reason_for_episode_end_ = 0

    def step(self, action):
        self.training_steps_ += 1
        try:
            assert not math.isnan(float(action)), "action in step() has nan value"
        except AssertionError as msg:
            print(msg)
        acceleration = float(action)
        #######
        # ego
        #######
        previous_ego_pos = self.ego_pos_
        self.ego_s_ = self.ego_vel_ * self.delta_t_ + 0.5 * acceleration * self.delta_t_ * self.delta_t_
        if (self.ego_s_ < 0):
            self.ego_s_ = self.ego_vel_ * self.delta_t_
        self.ego_pos_ += self.ego_s_
        # self.elapsed_ego_dist_per_step_ = float(self.ego_pos_ - previous_ego_pos)
        self.ego_vel_ = self.ego_vel_ + acceleration * self.delta_t_
        if (self.ego_vel_ < 0):
            self.ego_vel_ = 0.0
        self.ego_vel_list_.append(self.ego_vel_)
        ##########
        # center
        ##########
        self.center_lead_pos_ += self.center_lead_vel_ * self.delta_t_
        self.distance2center_lead_ = self.center_lead_pos_ - self.ego_pos_
        if (self.distance2center_lead_ < 0.0):
            self.distance2center_lead_ = 0.0
        self.center_lead_vel_ = self.velocityProfile(self.center_velocity_profile_, self.center_lead_pos_)
        if (self.distance2center_lead_ >= 200):
            self.distance2center_lead_ = random.uniform(200, 210)
        ############
        # left
        ############
        self.left_lead_pos_ += self.left_lead_vel_ * self.delta_t_
        self.distance2left_lead_ = self.left_lead_pos_ - self.ego_pos_
        if (self.distance2left_lead_ < 0.0):
            self.distance2left_lead_ = 0.0
        self.left_lead_vel_ = self.velocityProfile(self.left_velocity_profile_, self.left_lead_pos_)
        if (self.distance2left_lead_ >= 200):
            self.distance2left_lead_ = random.uniform(200, 210)
        self.ego_dist_desired_ = self.ego_vel_ * self.time_headway_ + self.safe_dist_standstill_
        self.lead_distances_ = [self.distance2center_lead_, self.distance2left_lead_]
        self.min_dist_idx_ = self.idxOfMinDist(self.lead_distances_)
        self.ego_vel_desired_ = (self.lead_distances_[self.min_dist_idx_] - self.safe_dist_standstill_) / self.time_headway_
        if self.ego_vel_desired_ < 0:
            self.ego_vel_desired_ = 0.0
        self.ego_vel_desired_ = min(self.ego_vel_desired_, self.max_allowed_speed_)
        self.rewardCalculation(acceleration)
        self.next_state_ = (self.ego_vel_, self.ego_vel_desired_, self.center_lead_vel_, self.left_lead_vel_, self.ego_dist_desired_, self.distance2center_lead_, self.distance2left_lead_)
        try:
            assert not math.isnan(float(self.ego_vel_)), "ego velocity has nan value"
        except AssertionError as msg:
            print(msg)
        return np.array([self.ego_vel_, self.ego_vel_desired_, self.center_lead_vel_, self.left_lead_vel_, self.ego_dist_desired_, self.distance2center_lead_, self.distance2left_lead_]), self.reward_, self.done_, self.done_, {}
        # return np.array([self.ego_vel_, self.center_lead_vel_, self.left_lead_vel_, self.distance2center_lead_, self.distance2left_lead_]), self.reward_, self.done_, self.done_, {}

    def reset(self, *, seed=None, options=None):
        ######
        # ego
        ######
        self.ego_vel_ = random.uniform(30 / 3.6, 130 / 3.6) # [m/sec]
        self.ego_pos_ = 0. # [m]
        self.ego_vel_list_ = []
        self.ego_vel_list_.append(self.ego_vel_)
        ########
        # center
        ########
        self.center_lead_pos_ = random.uniform(30, 60) #(20,60)
        self.center_velocity_profile_, self.center_acc_profile_ = self.initVelocityProfile(self.center_lead_pos_, self.ego_vel_)
        self.center_lead_vel_ = self.velocityProfile(self.center_velocity_profile_, self.center_lead_pos_)
        #######
        # left
        #######
        self.left_lead_pos_ = random.uniform(30, 60) #(20,60)
        self.left_velocity_profile_, self.left_acc_profile_ = self.initVelocityProfile(self.left_lead_pos_, self.ego_vel_)
        self.left_lead_vel_ = self.velocityProfile(self.left_velocity_profile_, self.left_lead_pos_)
        ##########
        # distance
        ##########
        self.distance2center_lead_ = self.center_lead_pos_ - self.ego_pos_
        self.distance2left_lead_ = self.left_lead_pos_ - self.ego_pos_
        self.reward_ = 0.
        self.done_ = False
        self.training_steps_ = 0
        self.ego_dist_desired_ = self.ego_vel_ * self.time_headway_ + self.safe_dist_standstill_
        self.lead_distances_ = [self.distance2center_lead_, self.distance2left_lead_]
        self.min_dist_idx_ = self.idxOfMinDist(self.lead_distances_)
        self.ego_vel_desired_ = (self.lead_distances_[self.min_dist_idx_] - self.safe_dist_standstill_) / self.time_headway_
        if self.ego_vel_desired_ < 0:
            self.ego_vel_desired_ = 0.0
        self.ego_vel_desired_ = min(self.ego_vel_desired_, self.max_allowed_speed_)
        try:
            assert not math.isnan(float(self.ego_vel_)), "ego velocity has nan value"
        except AssertionError as msg:
            print(msg)
        return np.array([self.ego_vel_, self.ego_vel_desired_, self.center_lead_vel_, self.left_lead_vel_, self.ego_dist_desired_, self.distance2center_lead_, self.distance2left_lead_]), {}

    def initVelocityProfile(self, lead_init_pos, ego_init_vel):
        velocities_sampled = np.full(len(DISTANCES_VEL_PROFILE), 0.)
        acc_along_profile = np.full(len(DISTANCES_VEL_PROFILE), 0.)
        acc_along_profile[0] = 0
        velocities_sampled[0] = random.choice(VELOCITIES_VEL_PROFILE, 1)
        for cnt in range(0, len(DISTANCES_VEL_PROFILE) - 1):
            # while (True):
            velocity_tmp = random.choice(VELOCITIES_VEL_PROFILE, 1)
            # acc = (velocity_tmp ** 2) / (2 * dist_consecutive_points)
            acc = (1 / (2 * dist_consecutive_points)) * (velocity_tmp ** 2 - velocities_sampled[cnt] ** 2)
            if (acc >= -2 and acc <= 2):
                velocities_sampled[cnt + 1] = velocity_tmp
                acc_along_profile[cnt + 1] = acc
            else:
                velocities_sampled[cnt + 1] = velocities_sampled[cnt]
                acc_along_profile[cnt + 1] = acc_along_profile[cnt]
        return (velocities_sampled, acc_along_profile)

    def rewardCalculation(self, acceleration):
        self.reward_ = 0.
        self.reason_for_episode_end_ = 0
        if self.ego_pos_ > ROAD_LENGTH:
            self.done_ = True
        if self.training_steps_ >= self.max_training_steps_per_episode_:
            self.done_ = True
        #print('REWARD')
        #print(self.reward_nr)
        #print(type(self.reward_nr))
        if(self.reward_nr == 'reward_1'):
            print('REWARD FCT 1')
            if self.lead_distances_[self.min_dist_idx_] <= 3:
                self.reward_ = -20000
                self.done_ = True
            else:
                error = abs(self.ego_vel_ - self.ego_vel_desired_)
                self.reward_ = error * (-1)
        if (self.reward_nr == 'reward_2'):
            print('REWARD FCT 2')
            if self.lead_distances_[self.min_dist_idx_] <= 3:
                self.reward_ = -500000
                self.done_ = True
            else:
                error = pow(abs(self.ego_vel_ - self.ego_vel_desired_), 2)
                self.reward_ = error * (-1)

    def idxOfMinDist(self, distance_list):
        # this function returns the index of the min value
        if distance_list[0] < distance_list[1]:
            return 0 # 0 == center
        return 1 # 1 == left

    def velocityProfile(self, velocity_profile, lead_pos):
        for cnt in range(0, len(DISTANCES_VEL_PROFILE) - 1):
            if (self.locateCurrentVehiclePosition(lead_pos, cnt)):
                self.slope = (velocity_profile[cnt + 1] - velocity_profile[cnt]) / (
                    DISTANCES_VEL_PROFILE[cnt + 1] - DISTANCES_VEL_PROFILE[cnt])
                self.dist_along_y_axis = velocity_profile[cnt] - self.slope * DISTANCES_VEL_PROFILE[cnt]
                velocity = self.slope * lead_pos + self.dist_along_y_axis
                return velocity
        if (lead_pos > ROAD_LENGTH_LEAD_VEHICLES):
            return 130 / 3.6

    def locateCurrentVehiclePosition(self, lead_pos, cnt):
        if (lead_pos > DISTANCES_VEL_PROFILE[cnt] and lead_pos < DISTANCES_VEL_PROFILE[cnt + 1]):
            return True
        return False

class TorchModel2HiddenLayer(TorchModelV2, nn.Module):
    def __init__(self, obs_space, action_space, num_outputs, model_config, name, **kwarg):
        TorchModelV2.__init__(self, obs_space, action_space, num_outputs, model_config, name)
        nn.Module.__init__(self)
        self.hidden_units = 128
        num_inputs = obs_space.shape[0]
        self.critic = nn.Sequential(
            nn.Linear(num_inputs, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, 1)
        )
        self.actor = nn.Sequential(
            nn.Linear(num_inputs, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, self.hidden_units),
            nn.ReLU(),
            nn.Linear(self.hidden_units, num_outputs)
        )

    @override(ModelV2)
    def forward(self, input_dict, state, seq_lens):
        x = input_dict["obs"]
        self.v_values = self.critic(x)
        mean_std = self.actor(x)
        mean, logvar = torch.chunk(mean_std, 2, dim=1)
        self.mean_tanh = torch.tanh(mean)
        self.logvar_relu = torch.clamp(logvar, -torch.inf, 5)
        # log variables
        self.logvar = logvar
        self.mean = mean
        mean_std_new = torch.cat((self.mean_tanh, self.logvar_relu), 1)
        return mean_std_new, state

    @override(ModelV2)
    def value_function(self):
        return torch.reshape(self.v_values, [-1])

    @override(ModelV2)
    def metrics(self):
        return {"mean_tanh": (self.mean_tanh[0]).detach().numpy().mean(),
                "mean": (self.mean).detach().numpy().mean(),
                "logvar_relu": (self.logvar_relu).detach().numpy().mean(),
                "logvar": (self.logvar).detach().numpy().mean()
                }
ModelCatalog.register_custom_model("nn_model", TorchModel2HiddenLayer)
config = (
    ppo.PPOConfig()
    .environment(env=ADDemonstrator, env_config={"reward_nr": 'reward_1', "task_id": '1'})
    .framework("torch")
    .resources(num_gpus=0)
    .rl_module(_enable_rl_module_api=False)
    .rollouts(num_rollout_workers=128, num_envs_per_worker=1, batch_mode="complete_episodes")
    .training(
        _enable_learner_api=False,
        train_batch_size=1000,
        sgd_minibatch_size=32,
        num_sgd_iter=32,
        entropy_coeff=0.001,
        model={"custom_model": "nn_model"}
    )
)
algo = config.build()
for n in range(10):
    start_time = time.time()
    results = algo.train()
    print('reward: %f' % (results['episode_reward_mean']))
    checkpoint_dir = algo.save()
    end_time = time.time()
    print("duration: ", end_time - start_time)