NotImplementedError is raised when I try to restore the best trained agent

My objective:

Train a DDPG agent that performs well in my custom environment.

My implementation:

I try to achieve the objective via the following two steps.

Step 1: Train DDPG agents. During training, I run Tune to perform a grid search, where each trial produces a trained agent.

Here is my code for performing the grid search. For each agent, I save a checkpoint every 10 iterations.

import ray
from ray import tune

resource = ray.cluster_resources()
num_trial = 4
num_cpu = resource['CPU'] / num_trial
num_gpu = resource['GPU'] / num_trial
resources_per_trial = {
    "cpu": num_cpu,
    "gpu": num_gpu,
}

stop = {
    "training_iteration": 80,
}

config = {
    "env_config": {
    "num_node": 10,
    "run_ch": 6,
    "len_record": 100,
    "time_max": 10 ** 6,
    },
    "agent_config": {
        "critic_lr": tune.grid_search([2e-3, 3e-3]),
        "gamma": 0.995,
        "framework": "tf",
        "soft_horizon": True,  # It is a continues env instead of an episodic one.
    },
    "train_config": {
        "num_iters": 50,
    },
    "chkpt_root": chkpt_root,
}

asha_scheduler = tune.schedulers.ASHAScheduler(
    metric="episode_reward_mean",
    mode="max"
)

results = tune.run(
    Trainable,
    config = config,
    resources_per_trial = resources_per_trial,
    stop = stop,
    local_dir = chkpt_root,
    scheduler = asha_scheduler,
    checkpoint_freq = 10,
    checkpoint_at_end = True,
)

Here is my Trainable class.

from envs.my_costume_env import MyCostumeEnv
from ray.rllib.agents.ddpg import DDPGTrainer, DEFAULT_CONFIG
from ray import tune
from ray.tune.registry import register_env

class Trainable(tune.Trainable):
    def setup(self, config):
        self.got_config = config

    def step(self):
        agent_config = DEFAULT_CONFIG.copy()

        for key in self.got_config.get("agent_config"):
            agent_config[key] = self.got_config["agent_config"][key]

        agent_config["env_config"] = self.got_config["env_config"]
        chkpt_root = self.got_config["chkpt_root"]

        select_env = "my_env_v2"
        register_env(select_env, lambda x: MyCostumeEnv())

        agent = DDPGTrainer(config=agent_config, env=MyCostumeEnv)

        num_iters = self.got_config["train_config"]["num_iters"]
        results = 0
        for ith in range(num_iters):
            results = agent.train()
            print("Trainable: ran iter ({}/{}).".format(ith, num_iters))

        chkpt_path = agent.save(checkpoint_dir=chkpt_root)

        return results

Step 2: Select the best trained agent and then restore it to run in my custom environment.

I select the best trained agent as follows.

from envs.my_costume_env import MyCostumeEnv
from ray import tune
from ray.rllib.agents.ddpg import DDPGTrainer, DEFAULT_CONFIG

chkpt = tune.Analysis(experiment_dir=chkpt_root)
chkpt_config = chkpt.get_best_config(metric="episode_reward_mean", mode="max")
chkpt_path = results.get_trial_checkpoints_paths(
    trial=results.get_best_trial(metric="episode_reward_mean", mode="max", scope="all"), 
    metric="episode_reward_mean",
)
print("chkpt_path:", chkpt_path)

agent_config = DEFAULT_CONFIG.copy()
for key in chkpt_config.get("agent_config"):
    agent_config[key] = chkpt_config["agent_config"][key]

agent_config["env_config"] = chkpt_config["env_config"]
agent = DDPGTrainer(config=agent_config, env=MyCostumeEnv)
agent.restore(chkpt_path[0][0])

env = MyCostumeEnv(chkpt_config["env_config"])
state = env.reset()
sum_reward = 0
train_agent_result = []
num_step = 10 ** 7
for step in range(num_step):
    action = agent.compute_action(state)
    state, reward, done, info = env.step(action)
    sum_reward += reward
    if done:
        print("sum_reward =", sum_reward)
        train_agent_result.append(sum_reward)
        state = env.reset()
        sum_reward = 0

print("train_agent_result = {}".format(train_agent_result))

Outputs:

ERROR trial_runner.py:883 -- Trial Trainable_25f46_00001: Error processing result.
Traceback (most recent call last):
  File "<my-path>/lib/python3.7/site-packages/ray/tune/trial_runner.py", line 880, in _process_trial_save
    results = self.trial_executor.fetch_result(trial)
  File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/tune/ray_trial_executor.py", line 686, in fetch_result
    result = ray.get(trial_future[0], timeout=DEFAULT_GET_TIMEOUT)
  File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return func(*args, **kwargs)
  File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/worker.py", line 1481, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(NotImplementedError): ray::Trainable.save() (pid=428, ip=<my-remote-computer-ip>)
  File "python/ray/_raylet.pyx", line 505, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 449, in ray._raylet.execute_task.function_executor
  File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/_private/function_manager.py", line 556, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/tune/trainable.py", line 327, in save
    checkpoint = self.save_checkpoint(checkpoint_dir)
  File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/tune/trainable.py", line 701, in save_checkpoint
    checkpoint = self._save(tmp_checkpoint_dir)
  File "<my-anaconda-env-path>/lib/python3.7/site-packages/ray/tune/trainable.py", line 715, in _save
    raise NotImplementedError

Question:

Where did I go wrong?

System info:

OS: Ubuntu 18.04
Ray: v1.3.0
Python: 3.7

Hey @Roller44, thanks for the question. This looks more like a Ray Tune problem, but I'm assuming you have to implement the save_checkpoint method for your custom Trainable. In that implementation, you could call save() on your wrapped DDPGTrainer. That being said, I'm not exactly sure why you even have to wrap a DDPGTrainer in a custom Trainable in your case. Couldn't you just use a DDPGTrainer directly in Tune, like tune.run("DDPG", ...)? But I guess you would like to add more customizations, in which case this is totally fine.
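For reference, here is a rough, untested sketch of what that first option could look like. It keeps your "agent_config"/"env_config" layout and the "my_env_v2" registration, assumes MyCostumeEnv accepts an env_config dict (as your restore script suggests), and makes each step() a single agent.train() call so that checkpoint_freq=10 maps directly onto RLlib training iterations:

    from ray import tune
    from ray.rllib.agents.ddpg import DDPGTrainer, DEFAULT_CONFIG
    from ray.tune.registry import register_env

    from envs.my_costume_env import MyCostumeEnv


    class Trainable(tune.Trainable):
        def setup(self, config):
            # Build the wrapped DDPGTrainer once, when the trial starts.
            agent_config = DEFAULT_CONFIG.copy()
            for key, value in config["agent_config"].items():
                agent_config[key] = value
            agent_config["env_config"] = config["env_config"]

            register_env("my_env_v2", lambda env_config: MyCostumeEnv(env_config))
            self.agent = DDPGTrainer(config=agent_config, env="my_env_v2")

        def step(self):
            # One Tune iteration == one RLlib training iteration.
            return self.agent.train()

        def save_checkpoint(self, tmp_checkpoint_dir):
            # Called by Tune whenever checkpoint_freq / checkpoint_at_end triggers;
            # delegate to the wrapped trainer and return the checkpoint path.
            return self.agent.save(checkpoint_dir=tmp_checkpoint_dir)

        def load_checkpoint(self, checkpoint_path):
            self.agent.restore(checkpoint_path)

With save_checkpoint defined, Trainable.save() no longer falls through to the base-class stub that raises NotImplementedError.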

Thanks for your reply.

Yes, you are right, I should not wrap my DDPGTrainer into a custom trainable.

Here is my revised code.

Code for training:

    import ray
    from ray import tune
    from ray.tune.registry import register_env
    from ray.rllib.agents.ddpg import DDPGTrainer, DEFAULT_CONFIG

    from envs.my_costume_env import MyCostumeEnv

    resource = ray.cluster_resources()
    num_trial = 16

    stop = {
        "training_iteration": 80,
    }

    env_config = {
                # Env variables
            }

    select_env = "my_costum_env_v1"
    register_env(select_env, lambda env_config: MyCostumeEnv(env_config))

    config = {
        "env": select_env,
        "env_config": env_config,
        "critic_lr": tune.grid_search([2e-3, 3e-3]),
        "actor_lr": tune.grid_search([3e-3, 4e-3]),
        "gamma": 0.995,
        "framework": "tf",
        "soft_horizon": True,  # It is a continuing env, not an episodic one.
        "num_gpus": resource['GPU'] / num_trial,
        "num_cpus_for_driver": 1,
    }

    asha_scheduler = tune.schedulers.ASHAScheduler(
        metric="episode_reward_mean",
        mode="max"
    )

    results = tune.run(
        "DDPG",
        config = config,
        stop = stop,
        local_dir = chkpt_root,
        scheduler = asha_scheduler,
        checkpoint_freq = 10,
        checkpoint_at_end = True,
    )

Code for restoring a trained agent:

    from envs.my_costume_env import MyCostumeEnv
    from ray.rllib.agents.ddpg import DDPGTrainer, DEFAULT_CONFIG

    chkpt = results.get_trial_checkpoints_paths(
            trial=results.get_best_trial(
                metric="episode_reward_mean",
                mode="max",
                scope="last"
            ),
            metric="episode_reward_mean"
        )
    chkpt_path = chkpt[0][0]

    chkpt_config = results.get_best_config(metric="episode_reward_mean", mode="max")
    agent_config = DEFAULT_CONFIG.copy()
    for key in chkpt_config:
        agent_config[key] = chkpt_config[key]
            
    agent = DDPGTrainer(config=agent_config, env=select_env)
    agent.restore(chkpt_path)

    env = MyCostumeEnv(chkpt_config["env_config"])
    state = env.reset()
    sum_reward = 0
    train_agent_result = []
    num_step = 10 ** 7
    for step in range(num_step):
        action = agent.compute_action(state)
        state, reward, done, info = env.step(action)
        sum_reward += reward
        if done:
            print("sum_reward =", sum_reward)
            train_agent_result.append(sum_reward)
            state = env.reset()
            sum_reward = 0

    print("train_agent_result = {}".format(train_agent_result))

The agent is not trained well yet, but I think that is another issue, unrelated to this problem.
Thanks a lot!
