Tuning a Keras model - no checkpoints saved

I am tuning a Keras model with ray.tune. It basically looks as follows:

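import ray
import ray.tune.integration.keras  # makes the fully qualified callback below resolvable
import tensorflow as tf
from ray import air, tune
from ray.tune.search.hyperopt import HyperOptSearch
from ray.tune.schedulers import MedianStoppingRule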

# model structure
def build_model(config):
  
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(32, activation="relu", input_shape=(156,)),
        tf.keras.layers.Dense(1, activation="relu")
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=config["learning_rate"]),
        loss='mse',
        metrics=['mse'],
    )

    return model

# trainable
def train_model(config):

    # Build and compile model
    model = build_model(config)

    # Train the model
    model.fit(x_train, y_train,
              validation_data=(x_val, y_val),
              epochs=10,
              callbacks = [ray.tune.integration.keras.TuneReportCallback(metrics={'val_mse':'val_mse'})]
             )

# define search space
space = {'learning_rate': tune.choice([0.02, 0.2])}

# Define experiment configuration
hyperopt = HyperOptSearch()
median_stopping = MedianStoppingRule(grace_period=2)

analysis = tune.Tuner(
    tune.with_resources(
        train_model,
        resources={"cpu": 1, "gpu": 0},
    ),
    param_space=space,
    tune_config=tune.TuneConfig(
        metric="val_mse",
        mode="min",
        search_alg=hyperopt,
        scheduler=median_stopping,
        num_samples=3,
    ),
    run_config=air.RunConfig(
        local_dir='PATH',
        name='output',
    ),
)

# run
ray.init()
result = analysis.fit()

This creates an output folder that contains events.out.tfevents, params.json, params.pkl, progress.csv and results.json. However, no checkpoints are created. If I do: result.get_best_result().best_checkpoints, it returns an empty list.

I need to save the best performing model and work with it outside of the ray architecture. Without checkpoints, I am not sure how to do it.

I found out myself: ray.tune.integration.keras.TuneReportCallback() only reports to ray tune and does not save checkpoints. ray.tune.integration.keras.TuneReportCheckpointCallback() should do the job.

Correct. As also discussed in the thread "Ray Iteration vs Keras Epoch", we recommend using ray.air.integrations.keras.ReportCheckpointCallback (Ray 2.3.0), which works in both Tune training functions and the AIR TensorflowTrainer.

Could you also mark this as resolved? Thanks!

I encountered one issue: ray.air.integrations.keras.ReportCheckpointCallback() does not save the model, i.e. it does not save a saved_model.pb file. On the other hand ray.tune.integration.keras.TuneReportCheckpointCallback() does. I need to load the model later on, so I need the saved_model.pb file. Am I wrong here, or do the two functions differ in that regard?

@F_S

Ah yeah, that is another difference: tune.integration.keras.TuneReportCheckpointCallback() goes through Keras model.save to dump a .pb file and then builds a directory with those checkpoint contents inside. It's then your job to load the model back with the correct filepath later.
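
As a rough illustration of the directory-based route, the sketch below collects every directory under an experiment folder that contains a saved_model.pb file, using only the standard library. The function name find_saved_models and the example path are hypothetical, and it assumes the checkpoint directories written by TuneReportCheckpointCallback sit somewhere below <local_dir>/<name>.

```python
from pathlib import Path


def find_saved_models(experiment_dir):
    """Return every directory under experiment_dir that holds a saved_model.pb."""
    root = Path(experiment_dir)
    # rglob walks all trial and checkpoint subdirectories recursively.
    return sorted(p.parent for p in root.rglob("saved_model.pb"))


# Usage (the path is a placeholder for <local_dir>/<name>):
# for model_dir in find_saved_models("PATH/output"):
#     print(model_dir)
```

Each returned directory could then be passed to tf.keras.models.load_model, assuming it holds a complete Keras SavedModel.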

air.integrations.keras.ReportCheckpointCallback will create a framework-specific TensorflowCheckpoint checkpoint from the model itself.

from ray.train.tensorflow import TensorflowCheckpoint
checkpoint = TensorflowCheckpoint.from_model(model)

Then, when you need to load the model, you can just do:

model = init_model()
# Load weights into the model
model = checkpoint.get_model(model=model)

The framework-specific checkpoint is useful if you want to do batch prediction with Ray later. You can just pass it into the BatchPredictor.from_checkpoint constructor.

Also, I’d appreciate it if you could answer a few questions that would help inform our future APIs:

  • Which method do you prefer? Working with directories or converting the model to an AIR checkpoint?
  • What would be the ideal user experience for checkpointing during training?

Let me know if you have any other questions!


Thanks. However, I do not quite get it.

So when I use air.integrations.keras.ReportCheckpointCallback, it creates a TensorflowCheckpoint. I do my tuning on a rolling-window basis: I estimate and tune a model around 20 times in a for loop over 20 time windows. For each of these time windows, I estimate a model with 500 epochs, which yields 10000 epochs overall. If I test 5 different network configurations, this yields 50000 models in total, and I need a checkpoint for each of them.

After having estimated all of these, I can navigate through these models easily by going through the filepaths. However, I have difficulties understanding how I can navigate through these models using your suggested method. How do I load one of these 50000 models using your suggested method? I should have 50000 AIR checkpoints, right? How do I navigate through these or rather, how can I use them (what are they called, etc.)? I do not quite understand…
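
The count in this estimate can be checked with a short calculation. This is only a sketch: the variable names are made up for illustration, and it assumes one checkpoint per epoch and that no trial is stopped early. Since a MedianStoppingRule scheduler is used, some trials may end before epoch 500, so the total is better read as an upper bound.

```python
num_windows = 20        # rolling time windows
epochs_per_model = 500  # epochs per fitted model (assumed: one checkpoint per epoch)
num_configs = 5         # network configurations tested

epochs_per_config = num_windows * epochs_per_model
max_checkpoints = epochs_per_config * num_configs

print(epochs_per_config)  # 10000
print(max_checkpoints)    # 50000
```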

Regarding your questions: I am a PhD student doing empirical research, so I am not an ML engineer but rather a data scientist. I prefer using directories for now (obviously, since I do not understand how to work with AIR checkpoints at the moment… haha). However, I have only been working with Ray for about a week, so I am still learning.

To give you a concrete example, for a given rolling window I do the following:

import numpy as np
from random import seed
import tensorflow as tf
from tensorflow.keras import regularizers  # used for the L1L2 kernel regularizers
import ray
from ray import air, tune  # air is needed for air.RunConfig
from ray.tune.search.hyperopt import HyperOptSearch
from ray.tune.schedulers import MedianStoppingRule
from ray.air.integrations.keras import ReportCheckpointCallback
from tensorflow_addons.metrics import RSquare

def build_model(config):
    
    np.random.seed(12456)
    seed(12456)
    tf.random.set_seed(12456)
    
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(32, activation=tf.keras.layers.LeakyReLU(alpha=0.1), input_shape=(471,), kernel_regularizer = regularizers.L1L2(l1 = 0.01, l2 = 0)), #manually change input shape
        tf.keras.layers.Dense(16, activation=tf.keras.layers.LeakyReLU(alpha=0.1), kernel_regularizer = regularizers.L1L2(l1 = 0.01, l2 = 0)),
        tf.keras.layers.Dense(8, activation=tf.keras.layers.LeakyReLU(alpha=0.1), kernel_regularizer = regularizers.L1L2(l1 = 0.01, l2 = 0)),
        tf.keras.layers.Dense(4, activation=tf.keras.layers.LeakyReLU(alpha=0.1), kernel_regularizer = regularizers.L1L2(l1 = 0.01, l2 = 0)),
        tf.keras.layers.Dense(1, activation=tf.keras.layers.LeakyReLU(alpha=0.1), kernel_regularizer = regularizers.L1L2(l1 = 0.01, l2 = 0))
    ])

    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=config["learning_rate"]),
        loss='mse',
        metrics=[RSquare(), 'mse'],
    )

    return model

# define search space
space = {'learning_rate': tune.choice([0.02, 0.01])}

# train model function
def train_model(config, data):

    # Build and compile model
    model = build_model(config)

    # Train the model
    model.fit(
        data[0], data[1],
        validation_data=(data[2], data[3]),
        epochs=5,
        verbose=0,
        callbacks = [ReportCheckpointCallback(
            metrics={'val_mse':'val_mse'},
            checkpoint_on = 'epoch_end')]
    )
            
# Define experiment configuration
hyperopt = HyperOptSearch(random_state_seed=1234567)
median_stopping = MedianStoppingRule(grace_period=2)
        
analysis = tune.Tuner( 
    tune.with_parameters(
        train_model,
        data=[X_train, Y_train, X_val, Y_val]
        ),
    param_space=space,
    tune_config=tune.TuneConfig(
        metric="val_mse",
        mode="min",
        search_alg=hyperopt,
        scheduler=median_stopping,
        num_samples=2,
        ),
    run_config=air.RunConfig(
        local_dir = 'PLACEHOLDER',
        name= "testing",
    ),
)

After having done this, how do I access the AIR checkpoint for, e.g., epoch 2 of trial 1 and load the model?

Hi @F_S,

Thank you for providing this feedback - it’s very useful for me!

So, to summarize, the overall workflow you have right now is:

num_windows = 20
for i in range(num_windows):
    tuner = Tuner(...) # configured for 500 epochs of training on window i
    result_grid = tuner.fit()

    # You can access the checkpoints right after training easily
    # Get checkpoints for window i
    for result in result_grid:
        # all 500 checkpoints + metrics
        checkpoints_and_metrics = result.best_checkpoints
        # checkpoint from 500th epoch
        last_checkpoint = result.checkpoint
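
Because the loop above creates one Tuner per window, each run needs its own experiment name if its results are to be reloaded separately later. The sketch below shows one way to derive such names and the matching result paths. The helper window_experiments is hypothetical, and "PLACEHOLDER" and "testing" simply mirror the local_dir and name values from the example script.

```python
from pathlib import Path


def window_experiments(local_dir, base_name, num_windows):
    """Return one (experiment_name, experiment_path) pair per rolling window."""
    pairs = []
    for i in range(num_windows):
        name = f"{base_name}_{i}"      # value to pass as name=... in RunConfig
        path = Path(local_dir) / name  # directory to point a later restore at
        pairs.append((name, path))
    return pairs


for name, path in window_experiments("PLACEHOLDER", "testing", 3):
    print(name, path)
```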

If you want to first run your training script, then do some post-experiment analysis, you can do something like:

total_checkpoints = 0

for i in range(num_windows):
    # Assumes that you set `name="..._{i}"` uniquely for each window
    tuner = tune.Tuner.restore(f"<local_dir>/<name>_{i}")
    result_grid = tuner.get_results()

    for result in result_grid:
        total_checkpoints += len(result.best_checkpoints)
        for (ckpt, metrics) in result.best_checkpoints:
            print(ckpt.uri)
        print()

    epoch_2_ckpt, epoch_2_metrics = result_grid[0].best_checkpoints[1]
    model = init_model()
    model = epoch_2_ckpt.get_model(model=model)

print(f"Found {total_checkpoints} total checkpoints")

Basically, instead of having to traverse through the saved files yourself, you can just use our APIs to retrieve checkpoints.
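
To keep only the best-performing model of a trial, the (checkpoint, metrics) pairs can be ranked by the reported metric. The following is a minimal sketch, not part of the Ray API: pick_best is a hypothetical helper, plain strings stand in for checkpoint objects, and it assumes each metrics dict contains the val_mse key reported by the callback.

```python
def pick_best(checkpoints_and_metrics, metric="val_mse", mode="min"):
    """Return the (checkpoint, metrics) pair with the best value of `metric`."""
    if not checkpoints_and_metrics:
        raise ValueError("no checkpoints were recorded for this trial")
    chooser = min if mode == "min" else max
    return chooser(checkpoints_and_metrics, key=lambda pair: pair[1][metric])


# Stand-in data: strings take the place of checkpoint objects.
pairs = [
    ("ckpt_epoch_1", {"val_mse": 0.90}),
    ("ckpt_epoch_2", {"val_mse": 0.40}),
    ("ckpt_epoch_3", {"val_mse": 0.55}),
]
best_ckpt, best_metrics = pick_best(pairs)
print(best_ckpt, best_metrics["val_mse"])  # ckpt_epoch_2 0.4
```

With real results, result.best_checkpoints would be passed in place of pairs.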

This “result loading” API is something that’s being actively developed, and we would love more of your input on what you think would be an ideal user experience!
