This is not a contribution.
Versions:
python: 3.9
ray: 2.2.0
tensorflow: 2.8.0
OS: Ubuntu18.04
I was using Ray Tune to rewrite a simple TensorFlow project and encountered an `_InactiveRpcError` (status `RESOURCE_EXHAUSTED`) reporting that a sent gRPC message was larger than the maximum allowed size.
Here is the error traceback (produced by the script provided below):
> _InactiveRpcError Traceback (most recent call last)
/tmp/ipykernel_473/2500321790.py in <cell line: 9>()
7
8 ray.shutdown() # Restart Ray defensively in case the ray connection is lost.
----> 9 ray.init(ignore_reinit_error=True)
10 # We clean out the logs before running for a clean visualization later.
11 get_ipython().system(' rm -rf ~/ray_results/train_on_original')
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
106
107 return wrapper
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/_private/worker.py in init(address, num_cpus, num_gpus, resources, object_store_memory, local_mode, ignore_reinit_error, include_dashboard, dashboard_host, dashboard_port, job_config, configure_logging, logging_level, logging_format, log_to_driver, namespace, runtime_env, storage, **kwargs)
1565
1566 for hook in _post_init_hooks:
-> 1567 hook()
1568
1569 node_id = global_worker.core_worker.get_current_node_id()
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/tune/registry.py in flush_values(self)
209 def flush_values(self):
210 for (category, key), value in self._to_flush.items():
--> 211 _internal_kv_put(
212 _make_key(self._prefix, category, key), value, overwrite=True
213 )
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
106
107 return wrapper
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/experimental/internal_kv.py in _internal_kv_put(key, value, overwrite, namespace)
92 and isinstance(overwrite, bool)
93 )
---> 94 return global_gcs_client.internal_kv_put(key, value, overwrite, namespace) == 0
95
96
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/_private/gcs_utils.py in wrapper(self, *args, **kwargs)
177 while True:
178 try:
--> 179 return f(self, *args, **kwargs)
180 except grpc.RpcError as e:
181 if remaining_retry <= 0:
~/miniconda3/envs/myconda/lib/python3.9/site-packages/ray/_private/gcs_utils.py in internal_kv_put(self, key, value, overwrite, namespace, timeout)
296 overwrite=overwrite,
297 )
--> 298 reply = self._kv_stub.InternalKVPut(req, timeout=timeout)
299 if reply.status.code == GcsCode.OK:
300 return reply.added_num
~/miniconda3/envs/myconda/lib/python3.9/site-packages/grpc/_channel.py in __call__(self, request, timeout, metadata, credentials, wait_for_ready, compression)
944 state, call, = self._blocking(request, timeout, metadata, credentials,
945 wait_for_ready, compression)
--> 946 return _end_unary_response_blocking(state, call, False, None)
947
948 def with_call(self,
~/miniconda3/envs/myconda/lib/python3.9/site-packages/grpc/_channel.py in _end_unary_response_blocking(state, call, with_call, deadline)
847 return state.response
848 else:
--> 849 raise _InactiveRpcError(state)
850
851
_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.RESOURCE_EXHAUSTED
details = "Sent message larger than max (798785647 vs. 536870912)"
debug_error_string = "{"created":"@1671861206.706027819","description":"Error received from peer 172.17.0.2:6379","file":"src/core/lib/surface/call.cc","file_line":952,"grpc_message":"Sent message larger than max (798785647 vs. 536870912)","grpc_status":8}">
Here is the script that produces the error:
> import numpy as np
> np.random.seed(0)
>
> import tensorflow as tf
> try:
> tf.get_logger().setLevel('INFO')
> except Exception as exc:
> print(exc)
> import warnings
> warnings.simplefilter("ignore")
>
> from tensorflow.keras.models import Sequential
> from tensorflow.keras.layers import Dense
>
> from tensorflow.keras.optimizers import SGD, Adam
> from tensorflow.keras.callbacks import ModelCheckpoint
>
> import ray
> from ray import tune
> from ray.tune.examples.utils import get_iris_data
>
> import inspect
> import pandas as pd
> import matplotlib.pyplot as plt
> plt.style.use('ggplot')
> %matplotlib inline
> cifar10 = tf.keras.datasets.cifar10
>
> (trainx, trainy), (testx, testy) = cifar10.load_data()
>
> x_train, x_test = tf.cast(trainx, tf.float32) / 255.0, tf.cast(testx, tf.float32) / 255.0
>
> y_train, y_test = tf.cast(trainy, tf.int16), tf.cast(testy, tf.int16)
>
> x_train.shape[1:]
> def create_model(learning_rate):
> model=tf.keras.Sequential()
>
> model.add(tf.keras.layers.Conv2D(16, kernel_size=(3, 3), padding='same', activation=tf.nn.relu,
> input_shape=x_train.shape[1:]))
> model.add(tf.keras.layers.Conv2D(16, kernel_size=(3, 3), padding='same', activation=tf.nn.relu))
> model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
> model.add(tf.keras.layers.Dropout(0.2))
>
> model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation=tf.nn.relu,
> input_shape=x_train.shape[1:]))
> model.add(tf.keras.layers.Conv2D(32, kernel_size=(3, 3), padding='same', activation=tf.nn.relu))
> model.add(tf.keras.layers.MaxPool2D(pool_size=(2, 2)))
> model.add(tf.keras.layers.Dropout(0.2))
>
> model.add(tf.keras.layers.Flatten())
> model.add(tf.keras.layers.Dropout(0.2))
> model.add(Dense(128, activation='relu'))
> model.add(tf.keras.layers.Dropout(0.2))
> model.add(Dense(10, activation='softmax'))
>
> model.summary()
>
> optimizer = SGD(lr=learning_rate)
>
> model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['sparse_categorical_accuracy'])
> return model
> import tensorflow.keras as keras
> from ray import tune
>
>
> class TuneReporterCallback(keras.callbacks.Callback):
> """Tune Callback for Keras.
>
> The callback is invoked every epoch.
> """
>
> def __init__(self, logs={}):
> self.iteration = 0
> super(TuneReporterCallback, self).__init__()
>
> def on_epoch_end(self, batch, logs={}):
> self.iteration += 1
> tune.report(keras_info=logs, mean_accuracy=logs.get("accuracy"), mean_loss=logs.get("loss"))
> def tune_on_train(config):
> x_train, x_test = tf.cast(trainx, tf.float32) / 255.0, tf.cast(testx, tf.float32) / 255.0
>
> y_train, y_test = tf.cast(trainy, tf.int16), tf.cast(testy, tf.int16)
>
> model = create_model(learning_rate=config['lr'])
> checkpoint_callback = ModelCheckpoint(
> "model_1.h5", monitor='accuracy', save_best_only=True, save_freq=2)
>
> callbacks = [checkpoint_callback, TuneReporterCallback()]
>
> model.fit(x_train,y_train,
> batch_size=64,epochs=10,
> validation_split=0.2,
> callbacks = [checkpoint_callback])
> return model
> import numpy as np; np.random.seed(5)
> hyperparameter_space = {
> "lr": tune.loguniform(0.001, 0.1),
> } # TODO: Fill me out.
> num_samples = 20 # TODO: Fill me out.
>
> ray.shutdown() # Restart Ray defensively in case the ray connection is lost.
> ray.init(ignore_reinit_error=True)
>
> ! rm -rf ~/ray_results/train_on_original
>
>
>
> analysis = tuner.run(
> tune_on_train,
> verbose=1,
> config=hyperparameter_space,
> num_samples=num_samples)
>
> assert len(analysis.trials) == 20, "Did you set the correct number of samples?"
Then I tried using `tune.with_parameters()`, but I couldn't find the right way to apply it.