I am using Ray Tune on a Google Colab TPU large instance. It runs with a small dataset, but with my full-size dataset (300 MB) I get the fatal error pasted below. The data is stored as a Parquet file on Google Drive and is passed into tune.run as a Pandas DataFrame that I bind to my training function using a "partial".
I cannot tell whether there is a setting I can configure to allow a larger “message” in the gRPC module.
Python 3.7
ray[tune] 1.9.0.
Any insight is appreciated.
Thanks,
tf
/usr/local/lib/python3.7/dist-packages/ray/tune/tune.py in run(run_or_experiment, name, metric, mode, stop, time_budget_s, config, resources_per_trial, num_samples, local_dir, search_alg, scheduler, keep_checkpoints_num, checkpoint_score_attr, checkpoint_freq, checkpoint_at_end, verbose, progress_reporter, log_to_file, trial_name_creator, trial_dirname_creator, sync_config, export_formats, max_failures, fail_fast, restore, server_port, resume, reuse_actors, trial_executor, raise_on_failed_trial, callbacks, max_concurrent_trials, queue_trials, loggers, _remote)
442 export_formats=export_formats,
443 max_failures=max_failures,
--> 444 restore=restore)
445 else:
446 logger.debug("Ignoring some parameters passed into tune.run.")
/usr/local/lib/python3.7/dist-packages/ray/tune/experiment.py in __init__(self, name, run, stop, time_budget_s, config, resources_per_trial, num_samples, local_dir, sync_config, trial_name_creator, trial_dirname_creator, log_to_file, checkpoint_freq, checkpoint_at_end, keep_checkpoints_num, checkpoint_score_attr, export_formats, max_failures, restore)
111 "checkpointable function. You can specify checkpoints "
112 "within your trainable function.")
--> 113 self._run_identifier = Experiment.register_if_needed(run)
114 self.name = name or self._run_identifier
115
/usr/local/lib/python3.7/dist-packages/ray/tune/experiment.py in register_if_needed(cls, run_object)
256 "No name detected on trainable. Using {}.".format(name))
257 try:
--> 258 register_trainable(name, run_object)
259 except (TypeError, PicklingError) as e:
260 extra_msg = ("Other options: "
/usr/local/lib/python3.7/dist-packages/ray/tune/registry.py in register_trainable(name, trainable, warn)
74 raise TypeError("Second argument must be convertable to Trainable",
75 trainable)
---> 76 _global_registry.register(TRAINABLE_CLASS, name, trainable)
77
78
/usr/local/lib/python3.7/dist-packages/ray/tune/registry.py in register(self, category, key, value)
150 self._to_flush[(category, key)] = pickle.dumps_debug(value)
151 if _internal_kv_initialized():
--> 152 self.flush_values()
153
154 def contains(self, category, key):
/usr/local/lib/python3.7/dist-packages/ray/tune/registry.py in flush_values(self)
173 for (category, key), value in self._to_flush.items():
174 _internal_kv_put(
--> 175 _make_key(self._prefix, category, key), value, overwrite=True)
176 self._to_flush.clear()
177
/usr/local/lib/python3.7/dist-packages/ray/_private/client_mode_hook.py in wrapper(*args, **kwargs)
103 if func.__name__ != "init" or is_client_mode_enabled_by_default:
104 return getattr(ray, func.__name__)(*args, **kwargs)
--> 105 return func(*args, **kwargs)
106
107 return wrapper
/usr/local/lib/python3.7/dist-packages/ray/experimental/internal_kv.py in _internal_kv_put(key, value, overwrite, namespace)
77 overwrite, bool)
78 return global_gcs_client.internal_kv_put(key, value, overwrite,
---> 79 namespace) == 0
80
81
/usr/local/lib/python3.7/dist-packages/ray/_private/gcs_utils.py in wrapper(self, *args, **kwargs)
128 while True:
129 try:
--> 130 return f(self, *args, **kwargs)
131 except grpc.RpcError as e:
132 if remaining_retry <= 0:
/usr/local/lib/python3.7/dist-packages/ray/_private/gcs_utils.py in internal_kv_put(self, key, value, overwrite, namespace)
247 req = gcs_service_pb2.InternalKVPutRequest(
248 key=key, value=value, overwrite=overwrite)
--> 249 reply = self._kv_stub.InternalKVPut(req)
250 if reply.status.code == GcsCode.OK:
251 return reply.added_num
/usr/local/lib/python3.7/dist-packages/grpc/_channel.py in __call__(self, request, timeout, metadata, credentials, wait_for_ready, compression)
944 state, call, = self._blocking(request, timeout, metadata, credentials,
945 wait_for_ready, compression)
--> 946 return _end_unary_response_blocking(state, call, False, None)
947
948 def with_call(self,
/usr/local/lib/python3.7/dist-packages/grpc/_channel.py in _end_unary_response_blocking(state, call, with_call, deadline)
847 return state.response
848 else:
--> 849 raise _InactiveRpcError(state)
850
851
_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.RESOURCE_EXHAUSTED
details = "Received message larger than max (355752448 vs. 104857600)"
debug_error_string = "{"created":"@1638857019.251890494","description":"Error received from peer ipv4:172.28.0.2:42493","file":"src/core/lib/surface/call.cc","file_line":1063,"grpc_message":"Received message larger than max (355752448 vs. 104857600)","grpc_status":8}"