Save model parameters on each checkpoint

I would like to save the model (.pb, .h5) parameters on each checkpoint as we would like
to compare the various stages of training outside of the ray/rllib framework and the
models are relatively small. It is not possible to know ahead of time how many iterations
are needed for training at the moment.

I have confirmed saving at the end of training works:

from ray import tune
from ray.rllib.agents.ppo import PPOTrainer
from ray.tune.trial import ExportFormat

tune.run(PPOTrainer,
         config={"env": "CartPole-v0"},
         export_formats=[ExportFormat.MODEL, ExportFormat.H5, ExportFormat.CHECKPOINT],
         local_dir='cart_outputs3',
         stop={"training_iteration": 1}
         )

PPO_CartPole-v0_91fad_00000_0_2021-07-14_09-14-54
├── checkpoint
│   ├── checkpoint
│   ├── model.data-00000-of-00001
│   ├── model.index
│   └── model.meta
├── events.out.tfevents.1626250494.velocity
├── model
│   ├── events.out.tfevents.1626250523.velocity
│   ├── saved_model.pb
│   └── variables
│       ├── variables.data-00000-of-00002
│       ├── variables.data-00001-of-00002
│       └── variables.index
├── params.json
├── params.pkl
├── progress.csv
└── result.json

(First problem is that the .h5 file is not created despite it being an available export option)

Now we use Tune:

results = tune.run(args.run, config=config, stop=stop, checkpoint_freq=2,
export_formats=[ExportFormat.MODEL, ExportFormat.H5],
num_samples=1, checkpoint_at_end=False)

But in this case nothing else appears in the checkpoints at all.
checkpoint_000002
├── checkpoint-2
└── checkpoint-2.tune_metadata

In a previous version of Ray (I think 0.8.0), setting
"checkpoint_freq": 2,
"checkpoint_at_end": True
in the config and using run_experiments would create the model data under each checkpoint directory:
run_experiments({"EnvName": myconfig})

So how can one save the model parameters (TensorFlow in this case) - to .pb or .h5 at each
checkpoint (the model is small) using ray.tune? Many thanks!

As an additional point, I am using ray==1.4.1 on Mac OS X. Is saving a model in a recoverable format (pb, hdf5) at checkpoints a supported feature? Is there any other information needed? I am kind of stuck at this point. Cheers.

If anyone has information about this I would greatly appreciate it; I have tried several forums and scoured the internet for an answer to this question, which seems so basic and essential. There are quite a few queries about it but so far no response anywhere. It would seem quite silly to have this fantastic framework for training and tuning but be unable to use the trained model outside of a Ray actor, which forces one to use the service/ports, etc. As I mentioned, saving a model on a checkpoint used to work as an option in previous versions. I have tried this on Mac OS X and Linux and get the same result: a checkpoint only has the following files:
checkpoint-1479
checkpoint-1479.tune_metadata
despite specifying H5 and model on the input. No error is produced during training:

results = tune.run(args.run, 
                    config=config, 
                    stop=stop, 
                    checkpoint_freq=1, 
                    export_formats=[ExportFormat.MODEL, 
                           ExportFormat.H5, ExportFormat.CHECKPOINT], 
                    checkpoint_at_end=True
                )

I’ve been using Ray since the initial version and never had this issue - please help.
From a design standpoint I don’t think it would make sense to use the ‘results’ from the tuning: one may not know ahead of time how many iterations are needed and may need to hit Ctrl+C to stop the training, in which case anything that depends on the returned results would be lost. Instead, the usual point of a checkpoint is to save the model so that recovery can start from an arbitrary point. The other objective is inference: we may want to compare inference across different checkpoints, but for that we need to re-create the network model in TensorFlow without the overhead of Ray actors in the way.

Hi,
Sorry for getting back to you late. Could you try the latest 1.7.0 release? Also, could you share a repro script? We are actually revamping the Tune checkpointing logic. Would love to take a look at this issue!

Many thanks - I tried slightly modifying one of the tune examples to snapshot a model on every checkpoint, using ray 1.8.0 - there is still no model saved on each checkpoint:

##########
# Contribution by the Center on Long-Term Risk:
# GitHub - longtermrisk/marltoolbox: A toolbox with the goal of speeding up
# research on bargaining in MARL (cooperation problems in MARL).
##########
import argparse
import os

import ray
from ray import tune
from ray.rllib.agents.ppo import PPOTrainer
from ray.rllib.examples.env.coin_game_non_vectorized_env import \
    CoinGame, AsymCoinGame

parser = argparse.ArgumentParser()
parser.add_argument("--tf", action="store_true")
parser.add_argument("--stop-iters", type=int, default=2000)

def main(debug, stop_iters=2000, tf=False, asymmetric_env=False):
    train_n_replicates = 1 if debug else 1
    seeds = list(range(train_n_replicates))

    ray.init()

    env_config = {
        "players_ids": ["player_red", "player_blue"],
        "max_steps": 20,
        "grid_size": 3,
        "get_additional_info": True,
    }

    rllib_config = {
        "env": AsymCoinGame if asymmetric_env else CoinGame,
        "env_config": env_config,
        "multiagent": {
            "policies": {
                env_config["players_ids"][0]: (
                    None, AsymCoinGame(env_config).OBSERVATION_SPACE,
                    AsymCoinGame.ACTION_SPACE, {}),
                env_config["players_ids"][1]: (
                    None, AsymCoinGame(env_config).OBSERVATION_SPACE,
                    AsymCoinGame.ACTION_SPACE, {}),
            },
            "policy_mapping_fn": lambda agent_id, **kwargs: agent_id,
        },
        # Size of batches collected from each worker.
        "rollout_fragment_length": 20,
        # Number of timesteps collected for each SGD round.
        # This defines the size of each SGD epoch.
        "train_batch_size": 512,
        "model": {
            "dim": env_config["grid_size"],
            "conv_filters": [[16, [3, 3], 1],
                             [32, [3, 3],
                              1]]  # [Channel, [Kernel, Kernel], Stride]]
        },
        "lr": 5e-3,
        "seed": tune.grid_search(seeds),
        "num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0")),
        "framework": "tf" if tf else "torch",
    }

    from ray.tune.trial import ExportFormat

    stop = {
        "training_iteration": 20 if debug else stop_iters,
    }

    tune_analysis = tune.run(
        PPOTrainer,
        config=rllib_config,
        stop=stop,
        checkpoint_freq=1,
        export_formats=[ExportFormat.MODEL, ExportFormat.H5],
        checkpoint_at_end=True,
        name="PPO_AsymCG")

    ray.shutdown()

    return tune_analysis

if __name__ == "__main__":
    args = parser.parse_args()
    debug_mode = True
    use_asymmetric_env = False
    main(debug_mode, args.stop_iters, args.tf, use_asymmetric_env)

PPO_CoinGame_d2693_00000_0_seed=0_2021-11-06_10-39-30
├── checkpoint_000001
│ ├── checkpoint-1
│ └── checkpoint-1.tune_metadata
├── checkpoint_000002
│ ├── checkpoint-2
│ └── checkpoint-2.tune_metadata
├── checkpoint_000003
│ ├── checkpoint-3
│ └── checkpoint-3.tune_metadata
├── checkpoint_000004
│ ├── checkpoint-4
│ └── checkpoint-4.tune_metadata
├── checkpoint_000005
│ ├── checkpoint-5
│ └── checkpoint-5.tune_metadata
├── checkpoint_000006
│ ├── checkpoint-6
│ └── checkpoint-6.tune_metadata
├── checkpoint_000007
│ ├── checkpoint-7
│ └── checkpoint-7.tune_metadata
├── checkpoint_000008
│ ├── checkpoint-8
│ └── checkpoint-8.tune_metadata
├── checkpoint_000009
│ ├── checkpoint-9
│ └── checkpoint-9.tune_metadata
├── checkpoint_000010
│ ├── checkpoint-10
│ └── checkpoint-10.tune_metadata
├── checkpoint_000011
│ ├── checkpoint-11
│ └── checkpoint-11.tune_metadata
├── checkpoint_000012
│ ├── checkpoint-12
│ └── checkpoint-12.tune_metadata
├── checkpoint_000013
│ ├── checkpoint-13
│ └── checkpoint-13.tune_metadata
├── checkpoint_000014
│ ├── checkpoint-14
│ └── checkpoint-14.tune_metadata
├── checkpoint_000015
│ ├── checkpoint-15
│ └── checkpoint-15.tune_metadata
├── checkpoint_000016
│ ├── checkpoint-16
│ └── checkpoint-16.tune_metadata
├── checkpoint_000017
│ ├── checkpoint-17
│ └── checkpoint-17.tune_metadata
├── checkpoint_000018
│ ├── checkpoint-18
│ └── checkpoint-18.tune_metadata
├── checkpoint_000019
│ ├── checkpoint-19
│ └── checkpoint-19.tune_metadata
├── checkpoint_000020
│ ├── checkpoint-20
│ └── checkpoint-20.tune_metadata
├── events.out.tfevents.1636195170.wormwood
├── params.json
├── params.pkl
├── progress.csv
└── result.json

Hello - is this a suitable example? If not it should be possible to create a simpler one - I pulled an existing example and added the necessary parameters. Thank you!
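
For comparing training stages, the checkpoint directories in a listing like the one above can be enumerated programmatically instead of typing each long path by hand. This is a minimal sketch using only the standard library; `list_checkpoints` is a hypothetical helper name (not part of Ray), and it assumes the `checkpoint_<zero-padded iteration>` directory naming shown in the listing.

```python
import re
from pathlib import Path

# Matches the directory names shown in the listing, e.g. "checkpoint_000020".
_CHECKPOINT_DIR = re.compile(r"^checkpoint_(\d+)$")


def list_checkpoints(trial_dir):
    """Return (iteration, directory) pairs for one trial, oldest first.

    Assumption: every checkpoint lives in a sub-directory named
    `checkpoint_<zero-padded iteration>`, as in the listing above.
    """
    found = []
    for child in Path(trial_dir).iterdir():
        match = _CHECKPOINT_DIR.match(child.name)
        if match and child.is_dir():
            found.append((int(match.group(1)), child))
    # Iterations are unique, so sorting orders purely by iteration number.
    return sorted(found)
```

In the listing above the checkpoint file itself sits at `directory / f"checkpoint-{iteration}"`; whether a restore call wants the directory or that file may depend on the Ray version.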

Hi, sorry about our delay in responding to you.

I completely agree that exporting a model with every checkpoint is a really useful thing to have.
Did it really work at some point in history? Do you remember which version it was? Were you using Tune or just RLlib?
In any case, this is something we really want to clean up this quarter. We should make it easy for users to take the model out.

Your repro script also revealed another potential problem for this. Since you are running multi-agent training without a policy under default_policy_id, the export_model() call errors out for us.
We should probably fix RLlib to export everything listed under multiagent.policies_to_train key.

I think it worked all the way back in 0.8.X-1.2.X, and I may be able to verify this with old setups; it was always with ray/tune and a custom model. I generally like the flexibility of tune to configure parameters, but with a custom model (policy), which may be quite complex. The ray project would be well served by documentation/examples of loading/saving models in the native (tensorflow/pytorch) formats, introduced early on. Also, I’d be happy to try any other approach if tune is not the right way to achieve this. Thanks so much!

Update: We are scoping this out to support exporting model for every checkpoint (not just at the end)… @kai

Great thank you - this would be for the next version?

Hi was this implemented?

Hi - it's still a blocker for me (since v1.2x+, more than a year) to use ray: models cannot be saved, which defeats the purpose of training. The simplest test, with exporting turned on, results in no saved .h5 models or otherwise - tested again on ray-1.13.0:

from ray import tune
from ray.rllib.agents.ppo import PPOTrainer
from ray.tune.trial import ExportFormat

tune.run(PPOTrainer,
         config={"env": "CartPole-v0"},
         export_formats=[ExportFormat.MODEL, ExportFormat.H5, ExportFormat.CHECKPOINT],
         local_dir='cart_outputs3',
         stop={'training_iteration': 1}
         )

PPOTrainer_CartPole-v0_2022-07-15_07-45-38c42vyw9w
├── events.out.tfevents.1657885538.wormwood
├── params.json
├── params.pkl
├── progress.csv
└── result.json

If this is not the correct way to save models, I have seen online from 2020 there is a reporter mechanism. How are users saving the models on checkpoints, and broadly how are users saving model parameters at all? Thank you.

Hi @treadzero, I’m very sorry your problem has still not been resolved.

The way I see it there are two ways to resolve this for you quickly (until we address policy exporting in a more general way).

The first way is to extract the models post-training. Here is an example for PPO:

from ray.rllib.agents.ppo.ppo import PPOTrainer

trainer = PPOTrainer(
    config={}, env="CartPole-v0"
)

trainer.restore("/path/to/experiment/trial/checkpoint_000002")
trainer.export_policy_model("/tmp/policy")

This is a bit cumbersome as you’ll have to restore the config and initialize Ray again to export the model.
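
One way to make the "restore the config" step less manual is to read back the config that sits next to the checkpoints. This is a sketch under assumptions: it relies on the `params.pkl` and `params.json` files visible in the trial directory listings earlier in the thread, assumes `params.pkl` is a pickle of the full config, and `load_trial_config` is a hypothetical helper name rather than a Ray function.

```python
import json
import pickle
from pathlib import Path


def load_trial_config(trial_dir):
    """Load the config stored in a trial directory.

    Assumption: `params.pkl` holds the pickled config and `params.json` a
    JSON rendering of it (both names come from the listings in this thread).
    Only unpickle files that you created yourself.
    """
    trial_dir = Path(trial_dir)
    pkl_path = trial_dir / "params.pkl"
    if pkl_path.exists():
        # Prefer the pickle: it can hold objects JSON cannot represent.
        with pkl_path.open("rb") as f:
            return pickle.load(f)
    with (trial_dir / "params.json").open() as f:
        return json.load(f)
```

The returned dict could then be passed as `config=` when constructing the trainer before calling `restore`, instead of `config={}`; whether it restores cleanly for custom models and environments is untested here.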

The other way would be to use a trainer that exports every step. Example for PPO:

import os

from ray import tune
from ray.rllib.agents.ppo.ppo import PPOTrainer


class ExportingPPOTrainer(PPOTrainer):
    def save_checkpoint(self, checkpoint_dir: str) -> str:
        path = super().save_checkpoint(checkpoint_dir)
        self.export_policy_model(os.path.join(checkpoint_dir, "policy"))
        return path


analysis = tune.run(
    ExportingPPOTrainer,
    config={
        "env": "CartPole-v0",
    },
    num_samples=1,
    verbose=3,
    checkpoint_freq=1,
    stop={"training_iteration": 2}
)

This will export the model into every checkpoint directory. Note that this will take up double the amount of storage.

Also, please know that this is a workaround that should hopefully enable your use-case for now, but we don’t endorse this as an official way to do the model export. This will require a more thorough design decision.
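
To confirm after a run that the workaround actually wrote a model into every checkpoint, a small standard-library check can help. This is a sketch only: `checkpoints_missing_export` is a hypothetical helper name, and it assumes the export lands in `<checkpoint dir>/policy/saved_model.pb`, combining the `"policy"` sub-directory used in the workaround with the `saved_model.pb` file name seen in the `model` directory listing from the first post.

```python
from pathlib import Path


def checkpoints_missing_export(trial_dir, export_subdir="policy"):
    """Return names of checkpoint directories that lack an exported model.

    Assumptions: checkpoint directories are named `checkpoint_*`, and a
    successful export leaves `<checkpoint dir>/<export_subdir>/saved_model.pb`.
    """
    missing = []
    for ckpt in sorted(Path(trial_dir).glob("checkpoint_*")):
        if not ckpt.is_dir():
            continue
        if not (ckpt / export_subdir / "saved_model.pb").is_file():
            missing.append(ckpt.name)
    return missing
```

An empty list means every checkpoint directory found contains an export; any names returned point at checkpoints where the export did not happen.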

No worries at all - it's an amazing system and I’m just keen to resume using it.
I think these are fine solutions and will give them a try. Will this work for a custom Policy? Usually I have a custom neural network and custom environment. Please let me know if you need a minimal example. Thanks again!

import os

from ray import tune
from ray.rllib.agents.ppo.ppo import PPOTrainer
import tensorflow as tf


def train():

    class ExportingPPOTrainer(PPOTrainer):
        def save_checkpoint(self, checkpoint_dir: str) -> str:
            path = super().save_checkpoint(checkpoint_dir)
            self.export_policy_model(os.path.join(checkpoint_dir, "policy"))
            return path


    analysis = tune.run(
        ExportingPPOTrainer,
        config={
            "env": "CartPole-v0",
        },
        num_samples=1,
        verbose=3,
        checkpoint_freq=1,
        stop={"training_iteration": 2}
    )

def convert():

    from ray.rllib.agents.ppo.ppo import PPOTrainer

    trainer = PPOTrainer(
        config={}, env="CartPole-v0"
    )
    # how to override the unnecessarily long path - later it will be 
    # accessed programmatically.
    path = '<really-unnecessarily-long-path>/checkpoint_000002'
    trainer.restore(path)
    trainer.export_policy_model("/tmp/policy")


def restore():
    # throws exception
    import tensorflow as tf
    from tensorflow import keras
    test = keras.models.load_model('/tmp/policy/')


restore will produce an exception - the model can't be loaded:

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
/tmp/ipykernel_1204921/2896457936.py in <module>
      2 from tensorflow import keras
      3 
----> 4 test = keras.models.load_model('/tmp/policy/')

~/venv.1/lib/python3.9/site-packages/keras/utils/traceback_utils.py in error_handler(*args, **kwargs)
     65     except Exception as e:  # pylint: disable=broad-except
     66       filtered_tb = _process_traceback_frames(e.__traceback__)
---> 67       raise e.with_traceback(filtered_tb) from None
     68     finally:
     69       del filtered_tb

~/venv.1/lib/python3.9/site-packages/keras/saving/saved_model/load.py in _generate_object_paths(object_graph_def)
    225     current_node = nodes_to_visit.pop()
    226     current_path = paths[current_node]
--> 227     for reference in object_graph_def.nodes[current_node].children:
    228       if reference.node_id in paths:
    229         continue

IndexError: list index (0) out of range

(venv.1) dlf@wormwood:/tmp$ tree policy
policy
├── events.out.tfevents.1658972048.wormwood
├── saved_model.pb
└── variables
    ├── variables.data-00000-of-00001
    └── variables.index

1 directory, 4 files

Just to be sure, you only need one or the other - either you use the first method (exporting PPO trainer) to export the policy, or you can export it after the run (though both should also still work).

I’ll look into the loading in a bit - it’s good that you can successfully export the model after the run (which means the checkpoint can be read and rllib is actually able to load the model :smiley: ) - I’ll circle back to see how we can go about loading it in keras natively.

Hi, I’m still struggling to save/load a model after training:

  • I think we need a self-contained example of using Train/Tune to checkpoint - and within each checkpoint a model is available. Ideally the path is not so long and convoluted one needs to use glob and regex to locate them programmatically. There seem to be many posts on saving/loading models but no exact solution. I’ve seen cases where people go into the model and create various traces - which really starts to put a barrier in the usefulness - there are just so many ways for it to go wrong.
  • I think the style of documentation is a bit unclear sometimes because there are too many ways to do the same things. At least cover the basics: create an env, create a new policy, train/checkpoint, save the model, load the model externally, make an inference. The hello-world of RL needs nothing more. But loading a trained model is like the most basic thing. Can we also fix the HDF saving or save model policies in Tune - these arguments are supported in the API but confusingly do nothing.
  • In the solution above, we have to create a specific PPOTrainer - what if we used another trainer? Why does the loader even need to know how a model was trained? We end up with some weights, the state, action space, etc. are known - ideally the Agent, Env, Policy, and training methods are separable.
  • Lots of warning messages appear; there may be compatibility problems on the saving part - TensorFlow v1 vs v2.
  • The model doesn’t actually load - I think this is the main blocking part at the moment. So there is no way to load a trained model.
""" The most basic model save/load: any recommendations to improve?
      - the checkpoint occurs on each step: normally we'd checkpoint every n-steps.
      - how would we obtain the path where logging occurs to make a model checkpoint dir, instead of local dir.
      - there is no check for early exiting in the training
      - this is a basic example but the model can be loaded - although we are using an older .h5 model file approach.
"""
import argparse
import gym
from gym.spaces import Discrete, Box
import numpy as np
import os
import random

import ray
from ray.rllib.models import ModelCatalog
from ray.rllib.models.tf.misc import normc_initializer
from ray.rllib.agents import ppo
import tensorflow as tf

from ray.rllib.models.tf.tf_modelv2 import TFModelV2
from ray.rllib.env.env_context import EnvContext

# python main.py --agent=PPO --loglevel=3

class SimpleCorridor(gym.Env):
    """Example of a custom env in which you have to walk down a corridor.

    You can configure the length of the corridor via the env config."""

    def __init__(self, config: EnvContext):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = Discrete(2)
        self.observation_space = Box(0.0, self.end_pos, shape=(1,), dtype=np.float32)
        # Set the seed. This is only used for the final (reach goal) reward.
        self.seed(config.worker_index * config.num_workers)

    def reset(self):
        self.cur_pos = 0
        return [self.cur_pos]

    def step(self, action):
        assert action in [0, 1], action
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        elif action == 1:
            self.cur_pos += 1
        done = self.cur_pos >= self.end_pos
        # Produce a random reward when we reach the goal.
        return [self.cur_pos], random.random() * 2 if done else -0.1, done, {}

    def seed(self, seed=None):
        random.seed(seed)

class MyKerasModel(TFModelV2):
    """Custom model for policy gradient algorithms."""

    def __init__(self, obs_space, action_space, num_outputs, model_config, name):
        
        super(MyKerasModel, self).__init__(obs_space, action_space, num_outputs, model_config, name)

        self.inputs = tf.keras.layers.Input(shape=obs_space.shape, name="observations")
        
        layer_1 = tf.keras.layers.Dense(
            16,
            name="layer1",
            activation=tf.nn.relu,
            kernel_initializer=normc_initializer(1.0))(self.inputs)

        layer_out = tf.keras.layers.Dense(
            num_outputs,
            name="out",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(layer_1)

        value_out = tf.keras.layers.Dense(
            1,
            name="value",
            activation=None,
            kernel_initializer=normc_initializer(0.01))(layer_1)

        self.base_model = tf.keras.Model(self.inputs, [layer_out, value_out])

        # self.register_variables(self.base_model.variables)

    def forward(self, input_dict, state, seq_lens):
        model_out, self._value_out = self.base_model(input_dict["obs"])
        return model_out, state

    def value_function(self):
        return tf.reshape(self._value_out, [-1])

    def import_from_h5(self, import_file):
        # Override this to define custom weight loading behavior from h5 files.
        self.base_model.load_weights(import_file)


env_config = {"corridor_length":10}

model_config = {
    "custom_model": "my_model"
    ,"vf_share_layers": False
    ,"fcnet_activation":"relu"
    # Number of hidden layers for fully connected net
    ,"fcnet_hiddens":[256,1024,256]
    }

train_config = {
    "env": SimpleCorridor  # or "corridor" if registered above
    ,"num_gpus": int(os.environ.get("RLLIB_NUM_GPUS", "0"))
    ,"env_config": env_config
    ,"model": model_config
    }

if __name__ == "__main__":

    ray.init(log_to_driver=False)
    ModelCatalog.register_custom_model(
        "my_model", MyKerasModel
    )
    
    agent = ppo.APPOTrainer(train_config)

    """  this is a basic trainer - but checkpoints go into local dir: """
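    def train_agent(agent, num_steps=5):
        # Make sure the target directory exists before writing .h5 files.
        os.makedirs("checkpoints", exist_ok=True)
        for i in range(num_steps):
            result = agent.train()
            print(f"training step{i}", result["episode_reward_mean"] )
            with agent.get_policy()._sess.graph.as_default():
                model_checkpoint_path = "checkpoints/{}.h5".format(i)
                with agent.get_policy()._sess.as_default():
                    # Note: both calls write to the same path, so the full
                    # model saved by save() replaces the weights-only file.
                    agent.get_policy().model.base_model.save_weights(model_checkpoint_path)
                    agent.get_policy().model.base_model.save(model_checkpoint_path)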

    train_agent(agent, num_steps=2)

    # do a final model write:
    with agent.get_policy()._sess.graph.as_default():
        with agent.get_policy()._sess.as_default():
            agent.get_policy().model.base_model.save_weights("weights.h5")
            agent.get_policy().model.base_model.save('model.h5')

    ray.shutdown()

    # ok, now load the model and create a random inference!
    obs_space = Box(0.0, env_config['corridor_length'], shape=(1,), dtype=np.float32)
    act_space = Discrete(2)

    model = MyKerasModel(obs_space=obs_space, action_space=act_space, num_outputs=1, model_config=model_config, name="test")
    model.import_from_h5("checkpoints/1.h5")

    input_dict = {"obs":obs_space.sample()}
    x = model.forward(input_dict=input_dict, state=None, seq_lens=1)

    print(x)

Unfortunately, however, switching the framework to 'tf2' breaks this:

    with agent.get_policy()._sess.graph.as_default():
AttributeError: 'APPOTF2Policy' object has no attribute '_sess'
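
A possible way around this error is to enter the graph and session only when the policy actually has one. The sketch below is an assumption-laden illustration, not a confirmed RLlib recipe: it assumes static-graph ("tf") policies expose the private `_sess` attribute used in the script above, that eager ("tf2") policies do not, and that under "tf2" the Keras `base_model` can save its weights without any session. `policy_session_scope` and `save_base_model_weights` are hypothetical helper names.

```python
from contextlib import ExitStack, contextmanager


@contextmanager
def policy_session_scope(policy):
    """Enter the policy's TF1 graph and session if it has one.

    Assumption: "tf" policies carry a private `_sess`; "tf2" policies do
    not, in which case this context manager does nothing.
    """
    sess = getattr(policy, "_sess", None)
    with ExitStack() as stack:
        if sess is not None:
            stack.enter_context(sess.graph.as_default())
            stack.enter_context(sess.as_default())
        yield sess


def save_base_model_weights(policy, path):
    """Save `policy.model.base_model` weights under either framework."""
    with policy_session_scope(policy):
        policy.model.base_model.save_weights(path)
```

With this, the save calls in the script could become `save_base_model_weights(agent.get_policy(), "weights.h5")` for both framework settings. Because `_sess` is a private attribute, this may break between Ray versions.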