I am using Ray (RLlib) to train an A3C agent in a MACAD-Gym environment. My system is Ubuntu with 1 GPU. This is my code:
import gym
from gym.envs.registration import register
import macad_gym  # noqa F401
import argparse
import logging
import os
from pprint import pprint
import cv2
import ray
import sys
import ray.tune as tune
from gym.spaces import Box, Discrete
from macad_agents.rllib.env_wrappers import wrap_deepmind
from macad_agents.rllib.models import register_mnih15_net
from ray.rllib.agents.a3c.a3c_tf_policy import A3CTFPolicy  # 0.8.5
from ray.rllib.models.catalog import ModelCatalog
from ray.rllib.models.preprocessors import Preprocessor
from ray.tune import register_env
import time
import tensorflow as tf
from tensorboardX import SummaryWriter
from ray.tune.schedulers import PopulationBasedTraining
parser = argparse.ArgumentParser()
parser.add_argument(
    "--env",
    default="PongNoFrameskip-v4",
    help="Name Gym env. Used only in debug mode. Default=PongNoFrameskip-v4")
parser.add_argument(
    "--disable-comet",
    action="store_true",
    help="Disables comet logging. Used for local smoke tests")
parser.add_argument(
    "--num-workers",
    default=2,  # 2 #fix
    type=int,
    help="Num workers (CPU cores) to use")
parser.add_argument(
    "--num-gpus", default=1, type=int, help="Number of gpus to use. Default=2")
parser.add_argument(
    "--sample-bs-per-worker",  # one iteration
    default=1024,
    type=int,
    help="Number of samples in a batch per worker. Default=50")
parser.add_argument(
    "--train-bs",
    default=128,
    type=int,
    help="Train batch size. Use as per available GPU mem. Default=500")
parser.add_argument(
    "--envs-per-worker",
    default=1,
    type=int,
    help="Number of env instances per worker. Default=10")
parser.add_argument(
    "--notes",
    default=None,
    help="Custom experiment description to be added to comet logs")
parser.add_argument(
    "--model-arch",
    default="mnih15",
    help="Model architecture to use. Default=mnih15")
parser.add_argument(
    "--num-steps",
    default=4000000,
    type=int,
    help="Number of steps to train. Default=20M")
parser.add_argument(
    "--num-iters",
    default=50,
    type=int,
    help="Number of training iterations. Default=20")
parser.add_argument(
    "--log-graph",
    action="store_true",
    default=True,
    help="Write TF graph on Tensorboard for debugging")
parser.add_argument(
    "--num-framestack",
    type=int,
    default=4,
    help="Number of obs frames to stack")
parser.add_argument(
    "--debug",
    action="store_true",
    default=False,
    help="Run in debug-friendly mode")
parser.add_argument(
    "--redis-address",
    default=None,
    help="Address of ray head node. Be sure to start ray with "
    "ray start --redis-address <…> --num-gpus<.> before running this script")
parser.add_argument(
    "--checkpoint-path",
    default=None,
    help="Path to a checkpoint to restore from (referenced below)")
parser.add_argument(
    "--use-lstm",
    action="store_true",
    default=True,
    help="Append a LSTM cell to the model")
args = parser.parse_args()
model_name = args.model_arch
if model_name == "mnih15":
    register_mnih15_net()  # Registers mnih15
else:
    print("Unsupported model arch. Using default")
    register_mnih15_net()
    model_name = "mnih15"

env_name = 'A3C_four_way_train-v0'
env = gym.make(env_name)
print(env.spec.max_episode_steps, "-+-+-+-+-+-+-+-+-+")
env.spec.max_episode_steps = 1024
print(env.spec.max_episode_steps, "-+-+-+-+-+-+-+-+-+")
env_actor_configs = env.configs
num_framestack = args.num_framestack
env_actor_configs["env"]["render"] = False
def env_creator(env_config):
    import macad_gym
    env = gym.make("A3C_four_way_train-v0")
    # Apply wrappers to: convert to Grayscale, resize to 84 x 84,
    # stack frames & some more ops
    env = wrap_deepmind(env, dim=84, num_framestack=num_framestack)
    return env

env = wrap_deepmind(env, dim=84, num_framestack=num_framestack)
register_env(env_name, lambda config: env_creator(config))
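# Optional sanity check (a sketch, not required for training): reset the wrapped
# env once and confirm every actor returns an 84x84 observation before handing
# things over to RLlib. Assumes obs comes back as a dict keyed by actor id,
# which is how the multi-agent MACAD-Gym envs report observations.
if args.debug:
    sanity_obs = env.reset()
    for actor_id, frame in sanity_obs.items():
        print(actor_id, getattr(frame, "shape", type(frame)))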
# Placeholder to enable use of a custom pre-processor
class ImagePreproc(Preprocessor):
    def _init_shape(self, obs_space, options):
        self.shape = (84, 84, 3)  # Adjust third dim if stacking frames
        return self.shape

    def transform(self, observation):
        observation = cv2.resize(observation, (self.shape[0], self.shape[1]))
        return observation

ModelCatalog.register_custom_preprocessor("sq_im_84", ImagePreproc)
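# Optional sanity check (a sketch): run the custom preprocessor on a dummy RGB
# frame outside RLlib to confirm it resizes to 84x84. The dummy Box/frame sizes
# are arbitrary, and the exact Preprocessor constructor signature can differ
# between RLlib versions.
if args.debug:
    import numpy as np
    _prep = ImagePreproc(Box(0.0, 255.0, shape=(300, 400, 3)), options=None)
    print(_prep.transform(np.zeros((300, 400, 3), dtype=np.uint8)).shape)  # expect (84, 84, 3)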
if args.redis_address is not None:
    # num_gpus (& num_cpus) must not be provided when connecting to an
    # existing cluster
    ray.init(redis_address=args.redis_address, lru_evict=True, log_to_driver=False)
else:
    ray.init(num_gpus=args.num_gpus, lru_evict=True, log_to_driver=False)
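# Sanity print: confirm Ray actually sees the CPUs/GPU that the trials below
# will request (on this machine I expect something like {'CPU': 16.0, 'GPU': 1.0, ...}).
print(ray.cluster_resources())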
config = {
    # Model and preprocessor options.
    "model": {
        "custom_model": model_name,
        "custom_options": {
            # Custom notes for the experiment
            "notes": {
                "args": vars(args)
            },
        },
        # NOTE: Wrappers are applied by RLlib if custom_preproc is NOT specified
        "custom_preprocessor": "sq_im_84",
        "dim": 84,
        "free_log_std": False,  # if args.discrete_actions else True,
        "grayscale": True,
        # conv_filters to be used with the custom CNN model.
        # "conv_filters": [[16, [4, 4], 2], [32, [3, 3], 2], [16, [3, 3], 2]]
    },
    # preproc_pref is ignored if custom_preproc is specified
    # "preprocessor_pref": "deepmind",
    # env_config to be passed to env_creator
    "env_config": env_actor_configs
}
def default_policy():
    env_actor_configs["env"]["render"] = False
    config = {
        # Model and preprocessor options.
        "model": {
            "custom_model": model_name,
            "custom_options": {
                # Custom notes for the experiment
                "notes": {
                    "args": vars(args)
                },
            },
            # NOTE: Wrappers are applied by RLlib if custom_preproc is NOT specified
            "custom_preprocessor": "sq_im_84",
            "dim": 84,
            "free_log_std": False,  # if args.discrete_actions else True,
            "grayscale": True,
            # conv_filters to be used with the custom CNN model.
            # "conv_filters": [[16, [4, 4], 2], [32, [3, 3], 2], [16, [3, 3], 2]]
        },
        # Should use a critic as a baseline (otherwise don't use value baseline;
        # required for using GAE).
        "use_critic": True,
        # If true, use the Generalized Advantage Estimator (GAE)
        # with a value function, see https://arxiv.org/pdf/1506.02438.pdf.
        "use_gae": True,
        # Size of rollout batch
        "rollout_fragment_length": 10,
        # GAE(gamma) parameter
        "lambda": 1.0,
        # Max global norm for each gradient calculated by worker
        "grad_clip": 40.0,
        "epsilon": 0.1,
        # Learning rate
        "lr": 0.0001,
        # Value Function Loss coefficient
        "vf_loss_coeff": 0.5,
        # Entropy coefficient
        "entropy_coeff": 0.01,
        # Min time per iteration
        "min_iter_time_s": 5,
        # Workers sample async. Note that this increases the effective
        # rollout_fragment_length by up to 5x due to async buffering of batches.
        "sample_async": True,
        # Number of steps after which the episode is forced to terminate. Defaults
        # to `env.spec.max_episode_steps` (if present) for Gym envs.
        "horizon": 1024,
        # Calculate rewards but don't reset the environment when the horizon is
        # hit. This allows value estimation and RNN state to span across logical
        # episodes denoted by horizon. This only has an effect if horizon != inf.
        "soft_horizon": True,
        # Don't set 'done' at the end of the episode. Note that you still need to
        # set this if soft_horizon=True, unless your env is actually running
        # forever without returning done=True.
        "no_done_at_end": True,
        "monitor": True,
        # System params.
        # Should be divisible by num_envs_per_worker
        "sample_batch_size": args.sample_bs_per_worker,
        "train_batch_size": args.train_bs,
        # "rollout_fragment_length": 128,
        "num_workers": args.num_workers,
        # Number of environments to evaluate vectorwise per worker.
        "num_envs_per_worker": args.envs_per_worker,
        "num_cpus_per_worker": 1,
        "num_gpus_per_worker": 1,
        # "eager_tracing": True,
        # # Learning params.
        # "grad_clip": 40.0,
        # "clip_rewards": True,
        # either "adam" or "rmsprop"
        "opt_type": "adam",
        # "lr": 0.003,
        # Learning rate schedule
        "lr_schedule": [
            [0, 0.0006],
            [20000000, 0.000000000001],  # Anneal linearly to 0 from start to end
        ],
        # rmsprop considered
        "decay": 0.5,
        "momentum": 0.0,
        # # balancing the three losses
        # "vf_loss_coeff": 0.5,  # Baseline loss scaling
        # "entropy_coeff": -0.01,
        # preproc_pref is ignored if custom_preproc is specified
        # "preprocessor_pref": "deepmind",
        # Discount factor of the MDP.
        "gamma": 0.99,
        "use_lstm": args.use_lstm,
        # env_config to be passed to env_creator
        "env": {
            "render": False
        },
        # "in_evaluation": True,
        # "evaluation_num_episodes": 1,
        "env_config": env_actor_configs
    }
    # pprint(config)
    return (A3CTFPolicy, Box(0.0, 255.0, shape=(84, 84, 3)), Discrete(9), config)
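# Optional sanity check (a sketch): the tuple returned by default_policy() feeds
# the "multiagent" -> "policies" dict below, keyed by the actor ids from
# env_actor_configs["actors"]; the hard-coded Box/Discrete spaces there are
# assumed to match what the wrapped env actually produces.
if args.debug:
    _cls, _obs_space, _act_space, _cfg = default_policy()
    print(_cls, _obs_space, _act_space)
    print(sorted(env_actor_configs["actors"].keys()))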
pprint(args.checkpoint_path)
pprint(args.checkpoint_path is not None and os.path.isfile(args.checkpoint_path))
if args.debug:
    # For checkpoint loading and retraining (not used in this script)
    experiment_spec = tune.Experiment(
        "multi-carla/" + args.model_arch,
        "A3C",
        # restore=args.checkpoint_path,
        # timesteps_total is init with None (not 0) which causes issue
        # stop={"timesteps_total": args.num_steps},
        stop={"timesteps_since_restore": args.num_steps},
        config=config,
        # checkpoint_freq=1000,  # 1000
        # checkpoint_at_end=True,
        resources_per_trial={
            "cpu": 1,
            "gpu": 1
        })
    experiment_spec = tune.run_experiments({
        "MA-Inde-A3C-SUI1B2C1PCARLA": {
            "run": "A3C",
            "env": env_name,
            "stop": {
                "training_iteration": args.num_iters,
                "timesteps_total": args.num_steps,
                "episodes_total": 1024,
            },
            # "restore": args.checkpoint_path,
            "config": {
                "log_level": "DEBUG",
                # "num_sgd_iter": 10,  # Enables Experience Replay
                "multiagent": {
                    "policies": {
                        id: default_policy()
                        for id in env_actor_configs["actors"].keys()
                    },
                    "policy_mapping_fn":
                        tune.function(lambda agent_id: agent_id),
                    "policies_to_train": ["car2", "car3"],
                },
                "env_config": env_actor_configs,
                "num_workers": args.num_workers,
                "num_envs_per_worker": args.envs_per_worker,
                "sample_batch_size": args.sample_bs_per_worker,
                "train_batch_size": args.train_bs,
                "horizon": 512,
            },
            "checkpoint_freq": 5,
            "checkpoint_at_end": True,
        }
    })
else:
    pbt = PopulationBasedTraining(
        time_attr="training_iteration",  # must be a result key name, not args.num_iters
        metric="episode_reward_mean",
        mode="max",
        # reward_attr='car2PPO/policy_reward_mean',
        perturbation_interval=2,
        resample_probability=0.5,
        quantile_fraction=0.5,  # copy bottom % with top %
        # Specifies the search space for these hyperparams
        hyperparam_mutations={
            # "lambda": [0.9, 1.0],
            # "clip_param": [0.1, 0.5],
            "lr": [1e-3, 1e-5],
        },
        log_config=True)
        # custom_explore_fn=explore)
    analysis = tune.run(
        "A3C",
        name="A3C_Four_way",
        scheduler=pbt,
        verbose=1,
        reuse_actors=True,
        # num_samples=args.num_samples,
        stop={
            # "timesteps_since_restore": args.num_steps,
            "training_iteration": args.num_iters,
            "timesteps_total": args.num_steps,
            "episodes_total": 500,
        },
        config={
            "env": env_name,
            "log_level": "DEBUG",
            # "num_sgd_iter": 4,  # Enables Experience Replay
            "multiagent": {
                "policies": {
                    id: default_policy()
                    for id in env_actor_configs["actors"].keys()
                },
                "policy_mapping_fn":
                    tune.function(lambda agent_id: agent_id),
                "policies_to_train": ["carA3C"],  # car2PPO is Autonomous driving models
            },
            "env_config": env_actor_configs,
            "num_workers": args.num_workers,
            "num_envs_per_worker": args.envs_per_worker,
            "sample_batch_size": args.sample_bs_per_worker,
            "train_batch_size": args.train_bs,
            # "horizon": 512,  # yet to be fixed
        },
        checkpoint_freq=5,
        checkpoint_at_end=True,
    )
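    # Optional (sketch): once tune.run returns, a quick look at the per-trial
    # results that PBT produced.
    print(analysis.dataframe().tail())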
ray.shutdown()
Training starts, but after a while I get errors like this:
2024-07-13 04:25:08.938498: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer.so.6'; dlerror: libnvinfer.so.6: cannot open shared object file: No such file or directory
2024-07-13 04:25:08.938568: W tensorflow/stream_executor/platform/default/dso_loader.cc:55] Could not load dynamic library 'libnvinfer_plugin.so.6'; dlerror: libnvinfer_plugin.so.6: cannot open shared object file: No such file or directory
2024-07-13 04:25:08.938577: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:30] Cannot dlopen some TensorRT libraries. If you would like to use Nvidia GPU with TensorRT, please make sure the missing libraries mentioned above are installed properly.
None -+-+-+-+-+-+-+-+-+
1024 -+-+-+-+-+-+-+-+-+
2024-07-13 04:25:09,896 INFO resource_spec.py:212 -- Starting Ray with 13.48 GiB memory available for workers and up to 6.74 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2024-07-13 04:25:10,257 INFO services.py:1148 -- View the Ray dashboard at localhost:8267
2024-07-13 04:25:10,386 WARNING sample.py:27 -- DeprecationWarning: wrapping <function <lambda> at 0x7f23fb88dd08> with tune.function() is no longer needed
2024-07-13 04:25:10,519 ERROR trial_executor.py:64 -- Trial A3C_A3C_four_way_train-v0_00000: Error checkpointing trial metadata.
Traceback (most recent call last):
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/tune/trial_executor.py", line 61, in try_checkpoint_metadata
self._cached_trial_state[trial.trial_id] = trial.__getstate__()
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/tune/trial.py", line 551, in __getstate__
self.result_logger.flush(sync_down=False)
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/tune/logger.py", line 330, in flush
_logger.flush()
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/tune/logger.py", line 237, in flush
self._file_writer.flush()
AttributeError: 'SummaryWriter' object has no attribute 'flush'
== Status ==
Memory usage on this node: 9.8/31.2 GiB
PopulationBasedTraining: 0 checkpoints, 0 perturbs
Resources requested: 3/16 CPUs, 0/1 GPUs, 0.0/13.48 GiB heap, 0.0/4.64 GiB objects
Result logdir: /home/ray_results/A3C_Four_way
Number of trials: 1 (1 RUNNING)
+---------------------------------+----------+-------+
| Trial name                      | status   | loc   |
|---------------------------------+----------+-------|
| A3C_A3C_four_way_train-v0_00000 | RUNNING  |       |
+---------------------------------+----------+-------+
2024-07-13 04:35:16,915 ERROR trial_runner.py:521 -- Trial A3C_A3C_four_way_train-v0_00000: Error processing event.
Traceback (most recent call last):
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/tune/trial_runner.py", line 467, in _process_trial
result = self.trial_executor.fetch_result(trial)
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/tune/ray_trial_executor.py", line 381, in fetch_result
result = ray.get(trial_future[0], DEFAULT_GET_TIMEOUT)
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/worker.py", line 1513, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(Empty): ray::A3C.train() (pid=28802, ip=192.168.15.93)
File "python/ray/_raylet.pyx", line 452, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 407, in ray._raylet.execute_task.function_executor
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/rllib/agents/trainer.py", line 502, in train
raise e
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/rllib/agents/trainer.py", line 491, in train
result = Trainable.train(self)
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/tune/trainable.py", line 261, in train
result = self._train()
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/rllib/agents/trainer_template.py", line 142, in _train
return self._train_exec_impl()
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/rllib/agents/trainer_template.py", line 174, in _train_exec_impl
res = next(self.train_exec_impl)
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/util/iter.py", line 634, in __next__
return next(self.built_iterator)
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/util/iter.py", line 644, in apply_foreach
for item in it:
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/util/iter.py", line 685, in apply_filter
for item in it:
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/util/iter.py", line 644, in apply_foreach
for item in it:
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/util/iter.py", line 670, in add_wait_hooks
item = next(it)
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/util/iter.py", line 644, in apply_foreach
for item in it:
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/util/iter.py", line 470, in base_iterator
yield ray.get(obj_id)
ray.exceptions.RayTaskError(Empty): ray::RolloutWorker.par_iter_next() (pid=28805, ip=192.168.15.93)
File "python/ray/_raylet.pyx", line 452, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 407, in ray._raylet.execute_task.function_executor
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/util/iter.py", line 961, in par_iter_next
return next(self.local_it)
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/util/iter.py", line 644, in apply_foreach
for item in it:
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/rllib/evaluation/rollout_worker.py", line 251, in gen_rollouts
yield self.sample()
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/rllib/evaluation/rollout_worker.py", line 492, in sample
batches = [self.input_reader.next()]
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 53, in next
batches = [self.get_data()]
File "/home/miniconda3/envs/newbench/lib/python3.6/site-packages/ray/rllib/evaluation/sampler.py", line 199, in get_data
rollout = self.queue.get(timeout=600.0)
File "/home/miniconda3/envs/newbench/lib/python3.6/queue.py", line 172, in get
raise Empty
queue.Empty
== Status ==
Memory usage on this node: 11.6/31.2 GiB
PopulationBasedTraining: 0 checkpoints, 0 perturbs
Resources requested: 0/16 CPUs, 0/1 GPUs, 0.0/13.48 GiB heap, 0.0/4.64 GiB objects
Result logdir: /home/ray_results/A3C_Four_way
Number of trials: 1 (1 ERROR)
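For the first error (the SummaryWriter AttributeError), the traceback ends inside tensorboardX, so one thing I can check is which tensorboardX version is installed and whether its SummaryWriter has a flush() method at all. A quick sketch of that check (run outside the training script; output not pasted here):

import tensorboardX
from tensorboardX import SummaryWriter

print(tensorboardX.__version__)
w = SummaryWriter("/tmp/tbx_flush_check")  # throwaway logdir
print(hasattr(w, "flush"))
w.close()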
How to solve this?