Blas GEMM launch failed when running Ray Tune 2.1 with Horovod + GPU

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hi team,

Recently I tried to upgrade our Ray Tune version from 1.2 to 2.1. However, I found that when I run Horovod + GPU on a Ray cluster, training fails with an error like

tensorflow.python.framework.errors_impl.InternalError: Blas GEMM launch failed : a.shape=(64, 784), b.shape=(784, 64), m=64, n=64, k=784

The same code works with Ray 2.0. Is there any change between 2.0 and 2.1 that could cause this issue?

Here is some environment info:

Ray: 2.1.0
CUDA: 11
TensorFlow: 2.4.0.39
Horovod: 0.23.0.7
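
Since a "Blas GEMM launch failed" error usually means the process could not actually use the GPU it thinks it has (visibility or memory allocation), here is a small diagnostic I plan to drop into train_func, right after the GPU-pinning block, to log what each Ray worker sees. It only uses standard TensorFlow and CUDA environment checks; the helper name log_gpu_state is just something I made up for this:

import os
import tensorflow as tf

def log_gpu_state(tag=""):
    # Which CUDA devices did Ray/Horovod expose to this worker process?
    print(tag, "CUDA_VISIBLE_DEVICES =", os.environ.get("CUDA_VISIBLE_DEVICES"))
    # Which GPUs does TensorFlow itself enumerate? (Listing physical devices does not
    # initialize the GPU context, so it is safe to call before set_memory_growth.)
    print(tag, "physical GPUs =", tf.config.experimental.list_physical_devices("GPU"))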

This is my code:

import argparse
import numpy as np
import os
import socket


import horovod.tensorflow.keras as hvd
import ray
import tensorflow as tf

from ray import air, tune
from ray.air.callbacks.keras import Callback as TrainCheckpointReportCallback
from ray.air.config import ScalingConfig
from ray.train.horovod import HorovodTrainer
from ray.tune.tune_config import TuneConfig


def build_and_compile_cnn_model(optimizer):
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=optimizer,
        metrics=['accuracy'])
    return model


def train_func(config):
    os.environ["TF_GPU_CUPTI_USE_ACTIVITY_API"] = "false"
    hostname = socket.gethostname().replace(".", "_")

    per_worker_batch_size = config["batch_size"]

    # Horovod: initialize Horovod.
    hvd.init()

    global_batch_size = per_worker_batch_size * hvd.size()
    log_dir = os.path.join(config["working_dir"], hostname, f"worker_{hvd.rank()}")
    learning_rate = config.get("lr", 0.001) * hvd.size()
    steps_per_epoch = config["steps"] // config["epochs"] // hvd.size()
    ckpt_full_path = os.path.join(config["working_dir"], hostname, 'model.ckpt-{epoch:04d}')
    opt = tf.keras.optimizers.SGD(learning_rate=learning_rate)
    opt = hvd.DistributedOptimizer(opt)
    callbacks = [
        # Horovod: broadcast initial variable states from rank 0 to all other processes.
        # This is necessary to ensure consistent initialization of all workers when
        # training is started with random weights or restored from a checkpoint.
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),

        # Horovod: average metrics among workers at the end of every epoch.
        #
        # Note: This callback must be in the list before the ReduceLROnPlateau,
        # TensorBoard or other metrics-based callbacks.
        hvd.callbacks.MetricAverageCallback(),

        # Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
        # accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
        # the first three epochs. See https://arxiv.org/abs/1706.02677 for details.
        hvd.callbacks.LearningRateWarmupCallback(initial_lr=learning_rate, warmup_epochs=3, verbose=1),

        tf.keras.callbacks.TensorBoard(log_dir=log_dir, profile_batch=config["profile_batch"], histogram_freq=2),
        TrainCheckpointReportCallback(),
    ]

    # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
    if hvd.rank() == 0:
        callbacks.append(tf.keras.callbacks.ModelCheckpoint(ckpt_full_path, save_weights_only=True, verbose=1, save_freq=50))

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    gpus = tf.config.experimental.list_physical_devices('GPU')
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

    model = build_and_compile_cnn_model(opt)

    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_datasets = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()
        .batch(global_batch_size)
    )

    # By default the sharding is done using files, however mnist dataset
    # only has 1 file so auto sharding fails for multiworker strategy.
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
    train_datasets_no_auto_shard = train_datasets.with_options(options)

    model.fit(
        x=train_datasets_no_auto_shard,
        epochs=config["epochs"],
        steps_per_epoch=steps_per_epoch,
        callbacks=callbacks,
        verbose=1
    )


def tune_tensorflow_mnist(config):
    num_workers = 1
    num_cpus_per_worker = 4
    num_gpus_per_worker = 1 if config["use_gpu"] else 0
    resources_per_worker = {"CPU": num_cpus_per_worker, "GPU": num_gpus_per_worker}
    hp = {"lr": tune.loguniform(1e-4, 1e-1)}

    trainable = HorovodTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(
            trainer_resources={"CPU": 0}, num_workers=num_workers,
            use_gpu=config["use_gpu"], resources_per_worker=resources_per_worker))
    hp = {"train_loop_config": hp}

    tuner = tune.Tuner(
        trainable,
        run_config=air.RunConfig(
            failure_config=air.FailureConfig(max_failures=3)
        ),
        tune_config=TuneConfig(mode="max", metric="accuracy", num_samples=config["num_samples"], max_concurrent_trials=3),
        param_space=hp,
    )

    results = tuner.fit()
    print("Best hyperparameters found were: ", results.get_best_result().config)
    
def main(kwargs):
    # Note: do not reference argparse FLAGS inside train_func; otherwise ray.cloudpickle.dumps
    # raises a TypeError when serializing the training function.
    config = {
        "epochs": kwargs.get("epochs"),
        "data_dir": kwargs.get("data_dir"),
        "download_data": kwargs.get("download_data"),
        "working_dir": kwargs.get("working_dir"),
        "steps": kwargs.get("steps"),
        "batch_size": kwargs.get("batch_size"),
        "profile_batch": kwargs.get("profile_batch"),
        "num_samples": kwargs.get("num_samples"),
        "use_gpu": kwargs.get("use_gpu"),
    }

    if kwargs.get("is_local_run"):
        # when running locally, use ray.init()
        ray.init()
    else:
        ray.init("ray://127.0.0.1:10001")

    tune_tensorflow_mnist(config)
    ray.shutdown()


if __name__ == "__main__":
    # Training settings
    parser = argparse.ArgumentParser(
        description="Tensorflow MNIST Example",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    # To run this locally:
    #   $ python cloudflow-dsl-example/src/linkedin/tensorflow/mnist/mnist_horovod_raytune.py --epochs 1 --steps 200 --data_dir=~/ \
    #     --working_dir=~/keras_raytune_test --download_data=True --use_gpu=False --profile_batch=0,0 --is_local_run=True
    parser.add_argument(
        "--batch_size",
        type=int,
        default=64,
        metavar="N",
        help="input batch size for training (default: 64)",
    )
    parser.add_argument(
        "--epochs",
        type=int,
        default=5,
        metavar="N",
        help="number of epochs to train (default: 10)",
    )

    parser.add_argument(
        "--download_data", action="store_true", default=False, help="download data"
    )

    parser.add_argument(
        "--use_gpu", action="store_true", default=False, help="enables CUDA training"
    )

    parser.add_argument(
        "--is_local_run", action="store_true", default=False, help="is local run"
    )

    parser.add_argument(
        "--profile_batch",
        default="0,0",
        help="profile batch",
    )

    parser.add_argument(
        "--data_dir",
        help="location of the training dataset in the local filesystem ("
        "will be downloaded if needed)",
    )

    args = parser.parse_args()

    kwargs = {
        "data_dir": args.data_dir,
        "batch_size": args.batch_size,
        "epochs": args.epochs,
        "download_data": args.download_data,
        "is_local_run": args.is_local_run,
        "profile_batch": args.profile_batch,
        "use_gpu": args.use_gpu,
        "num_samples": 3,
        "working_dir": "~/keras_raytune_test",
        "steps": 200,
    }

    main(kwargs=kwargs)
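
To narrow down whether the problem comes from Tune or from the Horovod trainer itself, I can also run the same train_func through HorovodTrainer directly, without wrapping it in a Tuner. This is only a sketch of that isolation step (same train_func, a fixed lr instead of the search space, and the helper name run_without_tune is mine):

from ray.air.config import ScalingConfig
from ray.train.horovod import HorovodTrainer

def run_without_tune(config):
    # Same training loop, but launched by the trainer alone (no Ray Tune involved).
    trainer = HorovodTrainer(
        train_loop_per_worker=train_func,
        train_loop_config={**config, "lr": 0.001},  # fixed lr instead of tune.loguniform
        scaling_config=ScalingConfig(
            trainer_resources={"CPU": 0},
            num_workers=1,
            use_gpu=config["use_gpu"],
            resources_per_worker={"CPU": 4, "GPU": 1 if config["use_gpu"] else 0},
        ),
    )
    result = trainer.fit()
    print(result.metrics)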

This is the log:

(TunerInternal pid=4780) 2023-01-18 20:42:24,186        ERROR trial_runner.py:993 -- Trial HorovodTrainer_750ba_00002: Error processing event.
(TunerInternal pid=4780) ray.exceptions.RayTaskError(InternalError): ray::_Inner.train() (pid=588, ip=100.96.197.49, repr=HorovodTrainer)
(TunerInternal pid=4780)   File "/home/jobuser/.shiv/cloudflow-dsl-example.pyz_5671c8d0e260c4ebf25144901e349542aa0e77068788380bed00dbf2496d6542/site-packages/ray/tune/trainable/trainable.py", line 355, in train
(TunerInternal pid=4780)     raise skipped from exception_cause(skipped)
(TunerInternal pid=4780)   File "/home/jobuser/.shiv/cloudflow-dsl-example.pyz_5671c8d0e260c4ebf25144901e349542aa0e77068788380bed00dbf2496d6542/site-packages/ray/train/_internal/utils.py", line 54, in check_for_failure
(TunerInternal pid=4780)     ray.get(object_ref)
(TunerInternal pid=4780) ray.exceptions.RayTaskError(InternalError): ray::RayTrainWorker._RayTrainWorker__execute() (pid=462, ip=100.96.205.88, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7f5520779f10>)
(TunerInternal pid=4780)   File "/home/jobuser/.shiv/cloudflow-dsl-example.pyz_5671c8d0e260c4ebf25144901e349542aa0e77068788380bed00dbf2496d6542/site-packages/ray/train/_internal/worker_group.py", line 31, in __execute
(TunerInternal pid=4780)     raise skipped from exception_cause(skipped)
(TunerInternal pid=4780)   File "/home/jobuser/.shiv/cloudflow-dsl-example.pyz_5671c8d0e260c4ebf25144901e349542aa0e77068788380bed00dbf2496d6542/site-packages/ray/train/_internal/utils.py", line 129, in discard_return_wrapper
(TunerInternal pid=4780)     train_func(*args, **kwargs)
(TunerInternal pid=4780)   File "/home/wyen/workspace/kingkong-starter-kit_trunk/cloudflow-dsl-example/src/linkedin/tensorflow/mnist/mnist_horovod_raytune.py", line 104, in train_func
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py", line 1100, in fit
(TunerInternal pid=4780)     tmp_logs = self.train_function(iterator)
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 828, in __call__
(TunerInternal pid=4780)     result = self._call(*args, **kwds)
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/def_function.py", line 888, in _call
(TunerInternal pid=4780)     return self._stateless_fn(*args, **kwds)
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 2943, in __call__
(TunerInternal pid=4780)     filtered_flat_args, captured_inputs=graph_function.captured_inputs)  # pylint: disable=protected-access
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 1919, in _call_flat
(TunerInternal pid=4780)     ctx, args, cancellation_manager=cancellation_manager))
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/function.py", line 560, in call
(TunerInternal pid=4780)     ctx=ctx)
(TunerInternal pid=4780)   File "/home/jobuser/.local/lib/python3.7/site-packages/tensorflow/python/eager/execute.py", line 60, in quick_execute
(TunerInternal pid=4780)     inputs, attrs, num_outputs)
(TunerInternal pid=4780) tensorflow.python.framework.errors_impl.InternalError:  Blas GEMM launch failed : a.shape=(64, 784), b.shape=(784, 64), m=64, n=64, k=784
(TunerInternal pid=4780)         [[node sequential/dense/MatMul (defined at home/wyen/workspace/kingkong-starter-kit_trunk/cloudflow-dsl-example/src/linkedin/tensorflow/mnist/mnist_horovod_raytune.py:104) ]] [Op:__inference_train_function_515]
(TunerInternal pid=4780) 
(TunerInternal pid=4780) Function call stack:

See Blas GEMM launch failed when running ray tune 2.1 with horovod + gpu · Issue #31754 · ray-project/ray · GitHub.