Using Tune with tensorflow encountered InactiveRpcError

This is not a contribution.

Versions:
python: 3.9
ray: 2.2.0
tensorflow: 2.8.0
OS: Ubuntu 18.04

I was using Tune to rewrite a simple TensorFlow project and ran into an InactiveRpcError reporting that a message size exceeded the preset maximum.

Here’s the error I got (produced by the script provided later):

_InactiveRpcError                         Traceback (most recent call last)
/tmp/ipykernel_473/2500321790.py in <cell line: 9>()
      7 
      8 ray.shutdown()  # Restart Ray defensively in case the ray connection is lost.
----> 9 ray.init(ignore_reinit_error=True)
     10 # We clean out the logs before running for a clean visualization later.
     11 get_ipython().system(' rm -rf ~/ray_results/train_on_original')
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
    103             if func.__name__ != "init" or is_client_mode_enabled_by_default:
    104                 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105         return func(*args, **kwargs)
    106 
    107     return wrapper
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/_private/worker.py in init(address, num_cpus, num_gpus, resources, object_store_memory, local_mode, ignore_reinit_error, include_dashboard, dashboard_host, dashboard_port, job_config, configure_logging, logging_level, logging_format, log_to_driver, namespace, runtime_env, storage, **kwargs)
   1565 
   1566     for hook in _post_init_hooks:
-> 1567         hook()
   1568 
   1569     node_id = global_worker.core_worker.get_current_node_id()
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/tune/registry.py in flush_values(self)
    209     def flush_values(self):
    210         for (category, key), value in self._to_flush.items():
--> 211             _internal_kv_put(
    212                 _make_key(self._prefix, category, key), value, overwrite=True
    213             )
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
    103             if func.__name__ != "init" or is_client_mode_enabled_by_default:
    104                 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105         return func(*args, **kwargs)
    106 
    107     return wrapper
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/experimental/internal_kv.py in _internal_kv_put(key, value, overwrite, namespace)
     92         and isinstance(overwrite, bool)
     93     )
---> 94     return global_gcs_client.internal_kv_put(key, value, overwrite, namespace) == 0
     95 
     96 
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/_private/gcs_utils.py in wrapper(self, *args, **kwargs)
    177             while True:
    178                 try:
--> 179                     return f(self, *args, **kwargs)
    180                 except grpc.RpcError as e:
    181                     if remaining_retry <= 0:
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/_private/gcs_utils.py in internal_kv_put(self, key, value, overwrite, namespace, timeout)
    296             overwrite=overwrite,
    297         )
--> 298         reply = self._kv_stub.InternalKVPut(req, timeout=timeout)
    299         if reply.status.code == GcsCode.OK:
    300             return reply.added_num
~/miniconda3/envs/myconda/lib/python3.9/site-packages/grpc/_channel.py in __call__(self, request, timeout, metadata, credentials, wait_for_ready, compression)
    944         state, call, = self._blocking(request, timeout, metadata, credentials,
    945                                       wait_for_ready, compression)
--> 946         return _end_unary_response_blocking(state, call, False, None)
    947 
    948     def with_call(self,
~/miniconda3/envs/myconda/lib/python3.9/site-packages/grpc/_channel.py in _end_unary_response_blocking(state, call, with_call, deadline)
    847             return state.response
    848     else:
--> 849         raise _InactiveRpcError(state)
    850 
    851 
_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.RESOURCE_EXHAUSTED
	details = "Sent message larger than max (798785647 vs. 536870912)"
	debug_error_string = "{"created":"@1671861206.706027819","description":"Error received from peer 172.17.0.2:6379","file":"src/core/lib/surface/call.cc","file_line":952,"grpc_message":"Sent message larger than max (798785647 vs. 536870912)","grpc_status":8}">

The failing RPC tries to send 798785647 bytes (roughly 762 MiB), which exceeds gRPC’s default maximum message size of 536870912 bytes (512 MiB). Here’s the script that produces the error:

> import numpy as np
> np.random.seed(0)
> 
> import tensorflow as tf
> try:
>     tf.get_logger().setLevel('INFO')
> except Exception as exc:
>     print(exc)
> import warnings
> warnings.simplefilter("ignore")
> 
> from tensorflow.keras.models import Sequential
> from tensorflow.keras.layers import Dense
> 
> from tensorflow.keras.optimizers import SGD, Adam
> from tensorflow.keras.callbacks import ModelCheckpoint
> 
> import ray
> from ray import tune
> from ray.tune.examples.utils import get_iris_data
> 
> import inspect
> import pandas as pd
> import matplotlib.pyplot as plt
> plt.style.use('ggplot')
> %matplotlib inline
> cifar10 = tf.keras.datasets.cifar10
> 
> (trainx, trainy), (testx, testy) = cifar10.load_data()
> 
> x_train, x_test = tf.cast(trainx, tf.float32) / 255.0, tf.cast(testx, tf.float32) / 255.0
> 
> y_train, y_test = tf.cast(trainy, tf.int16), tf.cast(testy, tf.int16)
> 
> x_train.shape[1:]
> def create_model(learning_rate):
>     model=tf.keras.Sequential()
>     
>     model.add(tf.keras.layers.Conv2D(16, kernel_size=(3, 3), padding='same', activation=tf.nn.relu,
>                                  input_shape=x_train.shape[1:]))
>     model.add(tf.keras.layers.Conv2D(16, kernel_size=(3, 3), padding='same', activation=tf.nn.relu))
>     model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
>     model.add(tf.keras.layers.Dropout(0.2))
> 
>     model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation=tf.nn.relu,
>                                      input_shape=x_train.shape[1:]))
>     model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation=tf.nn.relu))
>     model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
>     model.add(tf.keras.layers.Dropout(0.2))
> 
>     model.add(tf.keras.layers.Flatten())
>     model.add(tf.keras.layers.Dropout(0.2))
>     model.add(Dense(128, activation='relu'))
>     model.add(tf.keras.layers.Dropout(0.2))
>     model.add(Dense(10, activation='softmax'))
>     
>     model.summary()
>     
>     optimizer = SGD(lr=learning_rate)
>     
>     model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['sparse_categorical_accuracy'])
>     return model
> import tensorflow.keras as keras
> from ray import tune
> 
> 
> class TuneReporterCallback(keras.callbacks.Callback):
>     """Tune Callback for Keras.
>     
>     The callback is invoked every epoch.
>     """
> 
>     def __init__(self, logs={}):
>         self.iteration = 0
>         super(TuneReporterCallback, self).__init__()
> 
>     def on_epoch_end(self, batch, logs={}):
>         self.iteration += 1
>         tune.report(keras_info=logs, mean_accuracy=logs.get("accuracy"), mean_loss=logs.get("loss"))
> def tune_on_train(config):
>     x_train, x_test = tf.cast(trainx, tf.float32) / 255.0, tf.cast(testx, tf.float32) / 255.0
> 
>     y_train, y_test = tf.cast(trainy, tf.int16), tf.cast(testy, tf.int16)
>     
>     model = create_model(learning_rate=config['lr'])
>     checkpoint_callback = ModelCheckpoint(
>         "model_1.h5", monitor='accuracy', save_best_only=True, save_freq=2)
>     
>     callbacks = [checkpoint_callback, TuneReporterCallback()]
>     
>     model.fit(x_train,y_train,
>               batch_size=64,epochs=10,
>               validation_split=0.2,
>               callbacks = [checkpoint_callback])
>     return model
> import numpy as np; np.random.seed(5)  
> hyperparameter_space = {
>     "lr": tune.loguniform(0.001, 0.1),  
> }  # TODO: Fill me out.
> num_samples = 20  # TODO: Fill me out.
> 
> ray.shutdown()  # Restart Ray defensively in case the ray connection is lost. 
> ray.init(ignore_reinit_error=True)
> 
> ! rm -rf ~/ray_results/train_on_original
> 
> analysis = tune.run(
>      tune_on_train, 
>      verbose=1, 
>      config=hyperparameter_space,
>      num_samples=num_samples)
> 
> assert len(analysis.trials) == 20, "Did you set the correct number of samples?"

Then I tried tune.with_parameters(), but couldn’t find the right way to use it.

Hi @PainkillerD,

the problem here seems to be that the whole dataset is captured by the training function and serialized along with it. You are right that tune.with_parameters is a good way to mitigate the issue.
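
For intuition: Ray ships the training function to the workers with cloudpickle, and for functions defined in a notebook, any large array they reference from the global scope is pickled by value into the same gRPC message. A rough way to see this (an illustrative sketch with a stand-in array, not code from this thread):

import numpy as np
from ray import cloudpickle  # Ray serializes trainables with cloudpickle

# Roughly the size of the cast CIFAR-10 training images:
# 50000 * 32 * 32 * 3 * 4 bytes ≈ 586 MiB, already above the 512 MiB gRPC cap.
x_train = np.zeros((50_000, 32, 32, 3), dtype=np.float32)

def trainable(config):
    return float(x_train.mean())  # references the notebook-global array

# Prints a number on the order of x_train's size in bytes:
print(len(cloudpickle.dumps(trainable)))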

Can you paste (parts of) the tune_on_train function, and can you post how you tried to use tune.with_parameters?

It would be amazing if you could also link to the part of the documentation where you looked this up. This will help us to improve the docs so other users don’t run into the same problem in the future.

  • This is the code for the tune_on_train function:

def tune_on_train(config):
    x_train, x_test = tf.cast(trainx, tf.float32) / 255.0, tf.cast(testx, tf.float32) / 255.0

    y_train, y_test = tf.cast(trainy, tf.int16), tf.cast(testy, tf.int16)

    model = create_model(learning_rate=config['lr'])
    checkpoint_callback = ModelCheckpoint(
        "model_1.h5", monitor='accuracy', save_best_only=True, save_freq=2)

    callbacks = [checkpoint_callback, TuneReporterCallback()]

    model.fit(x_train,y_train,
              batch_size=64,epochs=10,
              validation_split=0.2,
              callbacks = [checkpoint_callback])
    return model
  • Because the error mentions "grpc_message":"Sent message larger than max (798785683 vs. 536870912)", I tried to raise that maximum value via with_parameters, but I don’t know how to do it.

There are two ways you can resolve the issue. The simplest one is to just move the data loading into the training function:

def tune_on_train(config):
    cifar10 = tf.keras.datasets.cifar10
    (trainx, trainy), (testx, testy) = cifar10.load_data()

    x_train, x_test = tf.cast(trainx, tf.float32) / 255.0, tf.cast(testx, tf.float32) / 255.0

    y_train, y_test = tf.cast(trainy, tf.int16), tf.cast(testy, tf.int16)

    model = create_model(learning_rate=config['lr'])
    checkpoint_callback = ModelCheckpoint(
        "model_1.h5", monitor='accuracy', save_best_only=True, save_freq=2)

    callbacks = [checkpoint_callback, TuneReporterCallback()]

    model.fit(x_train, y_train,
              batch_size=64, epochs=10,
              validation_split=0.2,
              callbacks=callbacks)  # pass the full list so TuneReporterCallback reports to Tune
    return model

This way the data is only loaded at training time and the training function itself remains small.

The disadvantage is that every trial loads the data separately, so this takes more time if the data lives on, e.g., slower storage. For CIFAR it should be fine, but if you want to do this more efficiently, here is how to do it with tune.with_parameters:

def tune_on_train(config, data):
    (trainx, trainy), (testx, testy) = data

    x_train, x_test = tf.cast(trainx, tf.float32) / 255.0, tf.cast(testx, tf.float32) / 255.0

    y_train, y_test = tf.cast(trainy, tf.int16), tf.cast(testy, tf.int16)

    model = create_model(learning_rate=config['lr'])
    checkpoint_callback = ModelCheckpoint(
        "model_1.h5", monitor='accuracy', save_best_only=True, save_freq=2)

    callbacks = [checkpoint_callback, TuneReporterCallback()]

    model.fit(x_train, y_train,
              batch_size=64, epochs=10,
              validation_split=0.2,
              callbacks=callbacks)  # pass the full list so TuneReporterCallback reports to Tune
    return model

cifar10 = tf.keras.datasets.cifar10
cifar_data = cifar10.load_data()

# ...
analysis = tune.run(
    tune.with_parameters(tune_on_train, data=cifar_data),
    # ...
)

Here you load the cifar data into a variable cifar_data and pass it to tune.with_parameters which will serialize it once into the Ray object store.
In the trainable you then separate it into the train/test X/Y variables at the start of the function.
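
For intuition, tune.with_parameters behaves much like putting the data into the object store yourself and fetching it inside the trainable. A minimal self-contained sketch (with tiny stand-in arrays instead of the real CIFAR data, and a hypothetical trainable):

import numpy as np
import ray
from ray import tune

ray.init(ignore_reinit_error=True)

# Tiny stand-ins for cifar10.load_data(); the real arrays are far larger.
data = ((np.zeros((8, 32, 32, 3), np.uint8), np.zeros((8, 1), np.uint8)),
        (np.zeros((2, 32, 32, 3), np.uint8), np.zeros((2, 1), np.uint8)))

data_ref = ray.put(data)  # serialized once into the Ray object store

def trainable(config):
    # Only the small ObjectRef travels with the function; each trial
    # fetches the shared copy from the object store.
    (trainx, trainy), (testx, testy) = ray.get(data_ref)
    tune.report(num_train=len(trainx))

tune.run(trainable, num_samples=2)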

Thank you for the two solutions you proposed.
Unfortunately, I tried both of them, and both failed.
The modified code for the first method is as follows:

import numpy as np
np.random.seed(0)

import tensorflow as tf
try:
    tf.get_logger().setLevel('INFO')
except Exception as exc:
    print(exc)
import warnings
warnings.simplefilter("ignore")

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.callbacks import ModelCheckpoint

import ray
from ray import tune
from ray.tune.examples.utils import get_iris_data

import inspect
import pandas as pd
def create_model(learning_rate):
    model=tf.keras.Sequential()
    
    model.add(tf.keras.layers.Conv2D(16, kernel_size=(3, 3), padding='same', activation=tf.nn.relu,
                                 input_shape=x_train.shape[1:]))
    model.add(tf.keras.layers.Conv2D(16, kernel_size=(3, 3), padding='same', activation=tf.nn.relu))
    model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
    model.add(tf.keras.layers.Dropout(0.2))

    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation=tf.nn.relu,
                                     input_shape=x_train.shape[1:]))
    model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation=tf.nn.relu))
    model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
    model.add(tf.keras.layers.Dropout(0.2))

    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dropout(0.2))
    model.add(Dense(10, activation='softmax'))
    
    model.summary()
    
    optimizer = SGD(lr=learning_rate)
    
    model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['sparse_categorical_accuracy'])
    return model
import tensorflow.keras as keras
from ray import tune


class TuneReporterCallback(keras.callbacks.Callback):
    """Tune Callback for Keras.
    
    The callback is invoked every epoch.
    """

    def __init__(self, logs={}):
        self.iteration = 0
        super(TuneReporterCallback, self).__init__()

    def on_epoch_end(self, batch, logs={}):
        self.iteration += 1
        tune.report(keras_info=logs, mean_accuracy=logs.get("accuracy"), mean_loss=logs.get("loss"))
def tune_on_train(config):
    
    cifar10 = tf.keras.datasets.cifar10

    (trainx, trainy), (testx, testy) = cifar10.load_data()

    x_train, x_test = tf.cast(trainx, tf.float32) / 255.0, tf.cast(testx, tf.float32) / 255.0

    y_train, y_test = tf.cast(trainy, tf.int16), tf.cast(testy, tf.int16)
    
    model = create_model(learning_rate=config['lr'])
    checkpoint_callback = ModelCheckpoint(
        "model_1.h5", monitor='accuracy', save_best_only=True, save_freq=2)
    
    callbacks = [checkpoint_callback, TuneReporterCallback()]
    
    model.fit(x_train,y_train,
              batch_size=64,epochs=10,
              validation_split=0.2,
              callbacks = [checkpoint_callback])
    return model
# This seeds the hyperparameter sampling.
import numpy as np; np.random.seed(5)  
hyperparameter_space = {
    "lr": tune.loguniform(0.001, 0.1),  
} 
num_samples = 20 

ray.shutdown()  # Restart Ray defensively in case the ray connection is lost. 
ray.init(ignore_reinit_error=True)
# We clean out the logs before running for a clean visualization later.
! rm -rf ~/ray_results/train_on_original

analysis = tune.run(
    tune.with_parameters(tune_on_train, data=cifar_data),
    tune_on_train, 
    verbose=1, 
    config=hyperparameter_space,
    num_samples=num_samples)

assert len(analysis.trials) == 20, "Did you set the correct number of samples?"

The above is the code changed according to the first method, but the following error occurs at runtime:

TuneError                                 Traceback (most recent call last)
/tmp/ipykernel_278/1580312215.py in <cell line: 15>()
     13 # tuner = tune.Tuner(tune.with_parameters(tune_on_train,grpc_message=8800000000))
     14 
---> 15 analysis = tune.run(
     16     tune_on_train,
     17     verbose=1,

~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/tune/tune.py in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, local_dir, search_alg, scheduler, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, verbose, progress_reporter, log_to_file, trial_name_creator, trial_dirname_creator, chdir_to_trial_dir, sync_config, export_formats, max_failures, fail_fast, restore, server_port, resume, reuse_actors, trial_executor, raise_on_failed_trial, callbacks, max_concurrent_trials, _experiment_checkpoint_dir, _remote, _remote_string_queue)
    754     if incomplete_trials:
    755         if raise_on_failed_trial and not state["signal"]:
--> 756             raise TuneError("Trials did not complete", incomplete_trials)
    757         else:
    758             logger.error("Trials did not complete: %s", incomplete_trials)

TuneError: ('Trials did not complete', [tune_on_train_6b119_00000, tune_on_train_6b119_00001, tune_on_train_6b119_00002, tune_on_train_6b119_00003, tune_on_train_6b119_00004, tune_on_train_6b119_00005, tune_on_train_6b119_00006, tune_on_train_6b119_00007, tune_on_train_6b119_00008, tune_on_train_6b119_00009, tune_on_train_6b119_00010, tune_on_train_6b119_00011, tune_on_train_6b119_00012, tune_on_train_6b119_00013, tune_on_train_6b119_00014, tune_on_train_6b119_00015, tune_on_train_6b119_00016, tune_on_train_6b119_00017, tune_on_train_6b119_00018, tune_on_train_6b119_00019])

The modified code for the second method is as follows:

def tune_on_train(data,config):
    
    (trainx, trainy), (testx, testy) = data

    x_train, x_test = tf.cast(trainx, tf.float32) / 255.0, tf.cast(testx, tf.float32) / 255.0

    y_train, y_test = tf.cast(trainy, tf.int16), tf.cast(testy, tf.int16)
    
    model = create_model(learning_rate=config['lr'])
    checkpoint_callback = ModelCheckpoint(
        "model_1.h5", monitor='accuracy', save_best_only=True, save_freq=2)
    
    callbacks = [checkpoint_callback, TuneReporterCallback()]
    
    model.fit(x_train,y_train,
              batch_size=64,epochs=10,
              validation_split=0.2,
              callbacks = [checkpoint_callback])
    return model

cifar10 = tf.keras.datasets.cifar10
cifar_data = cifar10.load_data()

# This seeds the hyperparameter sampling.
import numpy as np; np.random.seed(5)  
hyperparameter_space = {
    "lr": tune.loguniform(0.001, 0.1),  
}  # TODO: Fill me out.
num_samples = 20  # TODO: Fill me out.

ray.shutdown()  # Restart Ray defensively in case the ray connection is lost. 
ray.init(ignore_reinit_error=True)
# We clean out the logs before running for a clean visualization later.
! rm -rf ~/ray_results/train_on_original

analysis = tune.run(
    tune.with_parameters(tune_on_train, data=cifar_data),
    tune_on_train, 
    verbose=1, 
    config=hyperparameter_space,
    num_samples=num_samples)

assert len(analysis.trials) == 20, "Did you set the correct number of samples?"

This error looks like a usage error; it appears that tune.run does not accept the with_parameters wrapper in this form.
The error is as follows:

TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_2036/3919859050.py in <cell line: 15>()
     13 # tuner = tune.Tuner(tune.with_parameters(tune_on_train,grpc_message=8800000000))
     14 
---> 15 analysis = tune.run(
     16     tune.with_parameters(tune_on_train, data=cifar_data),
     17     tune_on_train,

~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/tune/tune.py in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, local_dir, search_alg, scheduler, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, verbose, progress_reporter, log_to_file, trial_name_creator, trial_dirname_creator, chdir_to_trial_dir, sync_config, export_formats, max_failures, fail_fast, restore, server_port, resume, reuse_actors, trial_executor, raise_on_failed_trial, callbacks, max_concurrent_trials, _experiment_checkpoint_dir, _remote, _remote_string_queue)
    631         search_alg=search_alg,
    632         scheduler=scheduler,
--> 633         local_checkpoint_dir=experiments[0].checkpoint_dir,
    634         remote_checkpoint_dir=experiments[0].remote_checkpoint_dir,
    635         sync_config=sync_config,

~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/tune/experiment/experiment.py in checkpoint_dir(self)
    441             return self._experiment_checkpoint_dir
    442         assert self.local_dir
--> 443         return os.path.join(self.local_dir, self.dir_name)
    444 
    445     @property

~/miniconda3/envs/myconda/lib/python3.9/posixpath.py in join(a, *p)
     88                 path += sep + b
     89     except (TypeError, AttributeError, BytesWarning):
---> 90         genericpath._check_arg_types('join', a, *p)
     91         raise
     92     return path

~/miniconda3/envs/myconda/lib/python3.9/genericpath.py in _check_arg_types(funcname, *args)
    150             hasbytes = True
    151         else:
--> 152             raise TypeError(f'{funcname}() argument must be str, bytes, or '
    153                             f'os.PathLike object, not {s.__class__.__name__!r}') from None
    154     if hasstr and hasbytes:

TypeError: join() argument must be str, bytes, or os.PathLike object, not 'function'

Thank you again for your two valuable suggestions.

In your code for the first solution, there are two small errors.

First, the create_model method still refers to x_train.shape[1:], which is not available inside the trial.
The solution is to use def create_model(learning_rate, input_shape), replace x_train.shape[1:] with input_shape inside the method, and call model = create_model(learning_rate=config['lr'], input_shape=x_train.shape[1:]) in the training function, as sketched below.
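
In code, the first fix would look roughly like this (abridged to the changed parts):

import tensorflow as tf

def create_model(learning_rate, input_shape):
    model = tf.keras.Sequential()
    model.add(tf.keras.layers.Conv2D(16, kernel_size=(3, 3), padding='same',
                                     activation=tf.nn.relu, input_shape=input_shape))
    # ... remaining layers unchanged ...
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(10, activation='softmax'))
    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
                  loss='sparse_categorical_crossentropy',
                  metrics=['sparse_categorical_accuracy'])
    return model

# and inside the training function:
#     model = create_model(learning_rate=config['lr'], input_shape=x_train.shape[1:])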

The second mistake is that you are passing tune_on_train to tune.run twice. Instead it should be:

analysis = tune.run(
    tune_on_train,
    verbose=1, 
    config=hyperparameter_space,
    num_samples=num_samples)

for the first solution or

analysis = tune.run(
    tune.with_parameters(tune_on_train, data=cifar_data),
    verbose=1, 
    config=hyperparameter_space,
    num_samples=num_samples)

for the second solution.

In the second solution I made a small mistake: the data argument should come second, so it should be

def tune_on_train(config, data):
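
Putting the three fixes together, the whole pipeline would look roughly like this (a condensed sketch along the lines of the code above, with the checkpoint and reporter callbacks omitted for brevity; not re-run end to end):

import tensorflow as tf
import ray
from ray import tune


def create_model(learning_rate, input_shape):
    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(16, (3, 3), padding='same',
                               activation='relu', input_shape=input_shape),
        tf.keras.layers.MaxPool2D((2, 2)),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax'),
    ])
    model.compile(optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
                  loss='sparse_categorical_crossentropy',
                  metrics=['sparse_categorical_accuracy'])
    return model


def tune_on_train(config, data):  # config first, data second
    (trainx, trainy), (testx, testy) = data
    x_train = tf.cast(trainx, tf.float32) / 255.0
    y_train = tf.cast(trainy, tf.int16)
    model = create_model(learning_rate=config['lr'],
                         input_shape=x_train.shape[1:])
    model.fit(x_train, y_train, batch_size=64, epochs=10, validation_split=0.2)


cifar_data = tf.keras.datasets.cifar10.load_data()

ray.init(ignore_reinit_error=True)
analysis = tune.run(
    tune.with_parameters(tune_on_train, data=cifar_data),  # the only trainable passed
    verbose=1,
    config={"lr": tune.loguniform(0.001, 0.1)},
    num_samples=20,
)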