My objective:
Train a DDPG agent that performs well in my costume environment.
My implementation:
I try to achieve the objective via the following two steps.
step 1: Train DDPG agents. During training, I run tune to perform grid searches, where each run generates a trained agent.
Here is my code for performing grid searches. For each agent, I save a checkpoint for every 10 iterations.
resource = ray.cluster_resources()
num_trial = 4
num_cpu = resource['CPU'] / num_trial
num_gpu = resource['GPU'] / num_trial
resources_per_trial = {
"cpu": num_cpu,
"gpu": num_gpu,
}
stop = {
"training_iteration": 80,
}
config = {
"env_config": {
"num_node": 10,
"run_ch": 6,
"len_record": 100,
"time_max": 10 ** 6,
},
"agent_config": {
"critic_lr": tune.grid_search([2e-3, 3e-3]),
"gamma": 0.995,
"framework": "tf",
"soft_horizon": True, # It is a continues env instead of an episodic one.
},
"train_config": {
"num_iters": 50,
},
"chkpt_root": chkpt_root,
}
asha_shcedular = tune.schedulers.ASHAScheduler(
metric="episode_reward_mean",
mode="max"
)
results = tune.run(
Trainable,
config = config,
resources_per_trial = resources_per_trial,
stop = stop,
local_dir = chkpt_root,
scheduler = asha_shcedular,
checkpoint_freq = 10,
checkpoint_at_end = True,
)
Here is my Trainable class.
from envs.my_costume_env import MyCostumeEnv
from ray.rllib.agents.ddpg import DDPGTrainer, DEFAULT_CONFIG
from ray import tune
from ray.tune.registry import register_env
class Trainable(tune.Trainable):
def setup(self, config):
self.got_config = {}
self.got_config = config
def step(self):
agent_config = DEFAULT_CONFIG.copy()
for key in self.got_config.get("agent_config"):
agent_config[key] = self.got_config["agent_config"][key]
agent_config["env_config"] = self.got_config["env_config"]
chkpt_root = self.got_config["chkpt_root"]
select_env = "my_env_v2"
register_env(select_env, lambda x: MyCostumeEnv())
agent = DDPGTrainer(config=agent_config, env=MyCostumEnv)
num_iters = self.got_config["train_config"]["num_iters"]
results = 0
for ith in range(num_iters):
results = agent.train()
print("Trainable: ran iter ({}/{}).".format(ith, num_iters))
chkpt_path = agent.save(checkpoint_dir=chkpt_root)
return results
Step 2: Select the best trained agent and then restore it to run in my custome environment.
I select the best trained agent as follows.
from envs.my_costume_env import MyCostumeEnv
from ray.rllib.agents.ddpg import DDPGTrainer, DEFAULT_CONFIG
chkpt = tune.Analysis(experiment_dir=chkpt_root)
chkpt_config = chkpt.get_best_config(metric="episode_reward_mean", mode="max")
chkpt_path = results.get_trial_checkpoints_paths(
trial=results.get_best_trial(metric="episode_reward_mean", mode="max", scope="all"),
metric="episode_reward_mean",
)
print("chkpt_path:", chkpt_path)
agent_config = DEFAULT_CONFIG.copy()
for key in chkpt_config.get("agent_config"):
agent_config[key] = chkpt_config["agent_config"][key]
agent_config["env_config"] = chkpt_config["env_config"]
agent = DDPGTrainer(config=agent_config, env=MyCostumeEnv)
agent.restore(chkpt_path[0][0])
env = MyCostumEnv(config["env_config"])
state = env.reset()
sum_reward = 0
num_step = 10 ** 7
for step in range(num_step):
action = agent.compute_action(state)
state, reward, done, info = env.step(action)
sum_reward += reward
if done:
print("sum_reward =", sum_reward)
train_agent_result.append(sum_reward)
state = env.reset()
sum_reward = 0
print("train_agent_result = {}".format(train_agent_result))
Outputs:
ERROR trial_runner.py:883 -- Trial Trainable_25f46_00001: Error processing result.
Traceback (most recent call last):
File "<my-path>/lib/python3.7/site-packages/ray/tune/trial_runner.py", line 880, in _process_trial_save
results = self.trial_executor.fetch_result(trial)
File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/tune/ray_trial_executor.py", line 686, in fetch_result
result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
return func(*args, **kwargs)
File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/worker.py", line 1481, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(NotImplementedError): ray::Trainable.save() (pid=428, ip=<my-remote-computer-ip>)
File "python/ray/_raylet.pyx", line 505, in ray._raylet.execute_task
File "python/ray/_raylet.pyx", line 449, in ray._raylet.execute_task.function_executor
File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/_private/function_manager.py", line 556, in actor_method_executor
return method(__ray_actor, *args, **kwargs)
File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/tune/trainable.py", line 327, in save
checkpoint = self.save_checkpoint(checkpoint_dir)
File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/tune/trainable.py", line 701, in save_checkpoint
checkpoint = self._save(tmp_checkpoint_dir)
File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/tune/trainable.py", line 715, in _save
raise NotImplementedError
Question:
Where do I do wrong?
System info:
OS: Ubuntu 18.04
Ray: v1.3.0
Python: 3.7