How to debug a ray.rpc.InternalKVPutRequest error?

Observed

I am getting the following error, which I cannot reproduce with CIFAR10 data.

My pipeline is basically identical to the tutorial CIFAR10 example, which runs fine.

I simply refactored that example (which does local training) for my own model, with my own training, testing, and distributed data-loading functions. Running it on my own data produces the error below.

Can someone point me toward parameters (e.g. on the Tuner) that would alleviate this protobuf bottleneck?

14:51:32.005 [error] Disposing session as kernel process died ExitCode: undefined, Reason: WARNING: All log messages before absl::InitializeLog() is called are written to STDERR
E0000 00:00:1708786288.030497  613135 message_lite.cc:445] ray.rpc.InternalKVPutRequest exceeded maximum protobuf size of 2GB: 12353883359
E0224 14:51:28.031024445  613135 call_op_set.h:319]                    ASSERTION FAILED: serializer_(msg_).ok()
*** SIGABRT received at time=1708786288 on cpu 28 ***
PC: @     0x7f9ce8d249fc  (unknown)  pthread_kill
    @     0x7f9ce8cd0520  (unknown)  (unknown)
[2024-02-24 14:51:28,039 E 613135 613135] logging.cc:361: *** SIGABRT received at time=1708786288 on cpu 28 ***
[2024-02-24 14:51:28,039 E 613135 613135] logging.cc:361: PC: @     0x7f9ce8d249fc  (unknown)  pthread_kill
[2024-02-24 14:51:28,039 E 613135 613135] logging.cc:361:     @     0x7f9ce8cd0520  (unknown)  (unknown)
Fatal Python error: Aborted

Stack (most recent call first):
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/experimental/internal_kv.py", line 94 in _internal_kv_put
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103 in wrapper
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/tune/registry.py", line 277 in flush_values
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/tune/registry.py", line 239 in register
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/tune/registry.py", line 112 in register_trainable
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/tune/experiment/experiment.py", line 345 in register_if_needed
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/tune/experiment/experiment.py", line 149 in __init__
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/tune/tune.py", line 772 in run
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/tune/impl/tuner_internal.py", line 628 in _fit_internal
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/tune/impl/tuner_internal.py", line 509 in fit
  File "~/miniconda3/envs/myenv/lib/python3.10/site-packages/ray/tune/tuner.py", line 381 in fit
  File "/tmp/ipykernel_613135/2356309687.py", line 24 in main
  File "/tmp/ipykernel_613135/2356309687.py", line 38 in <module>
  File "~/.local/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3550 in run_code
  File "~/.local/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3490 in run_ast_nodes
  File "~/.local/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3308 in run_cell_async
  File "~/.local/lib/python3.10/site-packages/IPython/core/async_helpers.py", line 129 in _pseudo_sync_runner
  File "~/.local/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3103 in _run_cell
  File "~/.local/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3048 in run_cell
  File "~/.local/lib/python3.10/site-packages/ipykernel/zmqshell.py", line 549 in run_cell
  File "~/.local/lib/python3.10/site-packages/ipykernel/ipkernel.py", line 429 in do_execute
  File "~/.local/lib/python3.10/site-packages/ipykernel/kernelbase.py", line 766 in execute_request
  File "~/.local/lib/python3.10/site-packages/ipykernel/kernelbase.py", line 424 in dispatch_shell
  File "~/.local/lib/python3.10/site-packages/ipykernel/kernelbase.py", line 518 in process_one
  File "~/.local/lib/python3.10/site-packages/ipykernel/kernelbase.py", line 529 in dispatch_queue
  File "~/miniconda3/envs/myenv/lib/python3.10/asyncio/events.py", line 80 in _run
  File "~/miniconda3/envs/myenv/lib/python3.10/asyncio/base_events.py", line 1909 in _run_once
  File "~/miniconda3/envs/myenv/lib/python3.10/asyncio/base_events.py", line 603 in run_forever
  File "~/.local/lib/python3.10/site-packages/tornado/platform/asyncio.py", line 205 in start
  File "~/.local/lib/python3.10/site-packages/ipykernel/kernelapp.py", line 739 in start
  File "~/.local/lib/python3.10/site-packages/traitlets/config/application.py", line 1077 in launch_instance
  File "~/.local/lib/python3.10/site-packages/ipykernel_launcher.py", line 17 in <module>
  File "~/miniconda3/envs/myenv/lib/python3.10/runpy.py", line 86 in _run_code
  File "~/miniconda3/envs/myenv/lib/python3.10/runpy.py", line 196 in _run_module_as_main

Extension modules: zmq.backend.cython.context, zmq.backend.cython.message, zmq.backend.cython.socket, [...], google.protobuf.pyext._message, ray._raylet, [...], grpc._cython.cygrpc (total: 354)

14:51:32.006 [info] Dispose Kernel process 613135.
14:51:32.093 [info] End cell 30 execution after -1708786259.061s, completed @ undefined, started @ 1708786259061

Expected

Something like:

(train_cifar pid=623607) [1,  2000] loss: 1.905
(train_cifar pid=623607) Checkpoint successfully created at: Checkpoint(filesystem=local, path=/home/vscode/ray_results/train_cifar_2024-02-24_15-27-32/train_cifar_deaf7838_1_batch_size=16,l1=fn_ph_b4e22907,l2=fn_ph_cdfc0d33,lr=0.0056,smoke_test=False_2024-02-24_15-27-32/checkpoint_000000)
(train_cifar pid=623684) [1,  4000] loss: 1.170 [repeated 2x across cluster]
(train_cifar pid=623607) Checkpoint successfully created at: Checkpoint(filesystem=local, path=/home/vscode/ray_results/train_cifar_2024-02-24_15-27-32/train_cifar_deaf7838_1_batch_size=16,l1=fn_ph_b4e22907,l2=fn_ph_cdfc0d33,lr=0.0056,smoke_test=False_2024-02-24_15-27-32/checkpoint_000001)
(train_cifar pid=623684) [1,  8000] loss: 0.585 [repeated 3x across cluster]
(train_cifar pid=623684) [1, 12000] loss: 0.391 [repeated 2x across cluster]
(train_cifar pid=623684) [1, 16000] loss: 0.293 [repeated 2x across cluster]
(train_cifar pid=623684) [1, 20000] loss: 0.234 [repeated 2x across cluster]
(train_cifar pid=623684) Checkpoint successfully created at: Checkpoint(filesystem=local, path=/home/vscode/ray_results/train_cifar_2024-02-24_15-27-32/train_cifar_0e2f003d_2_batch_size=2,l1=fn_ph_b4e22907,l2=fn_ph_cdfc0d33,lr=0.0345,smoke_test=False_2024-02-24_15-27-35/checkpoint_000000)
2024-02-24 15:28:30,431	INFO tune.py:1042 -- Total run time: 57.82 seconds (57.79 seconds for the tuning loop).
Best trial config: {'l1': 16, 'l2': 8, 'lr': 0.005595481308272002, 'batch_size': 16, 'smoke_test': False}
Best trial final validation loss: 1.4832090266227722
Best trial final validation accuracy: 0.4614
Files already downloaded and verified
Files already downloaded and verified
Best trial test set accuracy: 0.4699

My main func:

from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch

SMOKE_TEST = False

def main(config, num_samples=10, max_num_epochs=10, gpus_per_trial=1, smoke_test=False):
    scheduler = ASHAScheduler(
        max_t=max_num_epochs,
        grace_period=1,
        reduction_factor=2)
    algo = OptunaSearch()
    algo = OptunaSearch()

    tuner = tune.Tuner(
        tune.with_resources(
            tune.with_parameters(train_scBPnet),
            resources={"cpu": 2, "gpu": gpus_per_trial}
        ),
        tune_config=tune.TuneConfig(
            metric="loss",
            mode="min",
            scheduler=scheduler,
            num_samples=num_samples,
            search_alg=algo
        ),
        param_space=config,
    )
    results = tuner.fit()
    
    best_result = results.get_best_result("loss", "min")

    print("Best trial config: {}".format(best_result.config))
    print("Best trial final validation mnll loss: {}".format(
        best_result.metrics["mnll_loss"]))
    print("Best trial final validation count corr: {}".format(
        best_result.metrics["count_corr"]))
    print("Best trial final validation profile corr: {}".format(
        best_result.metrics["prof_corr"]))

    test_best_model(best_result, smoke_test=smoke_test)

main(config, num_samples=2, max_num_epochs=2, gpus_per_trial=0, smoke_test=SMOKE_TEST)

CIFAR10 main func:

import numpy as np

from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.search.optuna import OptunaSearch

# Set this to True for a smoke test that runs with a small synthetic dataset.
SMOKE_TEST = False

def main(num_samples=10, max_num_epochs=10, gpus_per_trial=2, smoke_test=False):
    config = {
        "l1": tune.sample_from(lambda _: 2 ** np.random.randint(2, 9)),
        "l2": tune.sample_from(lambda _: 2 ** np.random.randint(2, 9)),
        "lr": tune.loguniform(1e-4, 1e-1),
        "batch_size": tune.choice([2, 4, 8, 16]),
        "smoke_test": smoke_test,
    }
    scheduler = ASHAScheduler(
        max_t=max_num_epochs,
        grace_period=1,
        reduction_factor=2)
    algo = OptunaSearch()

    tuner = tune.Tuner(
        tune.with_resources(
            tune.with_parameters(train_cifar),
            resources={"cpu": 2, "gpu": gpus_per_trial}
        ),
        tune_config=tune.TuneConfig(
            metric="loss",
            mode="min",
            scheduler=scheduler,
            num_samples=num_samples,
            search_alg=algo
        ),
        param_space=config,
    )
    results = tuner.fit()
    
    best_result = results.get_best_result("loss", "min")

    print("Best trial config: {}".format(best_result.config))
    print("Best trial final validation loss: {}".format(
        best_result.metrics["loss"]))
    print("Best trial final validation accuracy: {}".format(
        best_result.metrics["accuracy"]))

    test_best_model(best_result, smoke_test=smoke_test)

main(num_samples=2, max_num_epochs=2, gpus_per_trial=0, smoke_test=SMOKE_TEST)

@mkarikom This is most likely because your training function train_scBPnet is capturing some large object in its scope. Could you check if that’s the case or paste the function code so I can take a look?

To pass large objects in, you should attach them via tune.with_parameters instead.
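
For illustration, a minimal sketch of the difference (train_fn_bad, train_fn, and big_tensor are hypothetical names, not code from this thread):

import numpy as np
from ray import tune

big_tensor = np.zeros((100_000, 1_000))  # stand-in for a large (>2 GB) object

# Anti-pattern: the trainable's closure captures big_tensor, so registering
# the trainable serializes the tensor into an InternalKVPutRequest, which is
# capped at 2 GB.
def train_fn_bad(config):
    loss = float(big_tensor.mean())

# Preferred: attach the object via tune.with_parameters; Tune stores it once
# in the Ray object store and hands it to each trial by reference.
def train_fn(config, big_tensor=None):
    loss = float(big_tensor.mean())

trainable = tune.with_parameters(train_fn, big_tensor=big_tensor)
tuner = tune.Tuner(trainable, param_space={"lr": tune.loguniform(1e-4, 1e-1)})
# results = tuner.fit()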

Hi, @justinvyu.

What happened was that my_torch_dataset.__getitem__() indexes my_torch_dataset.my_giant_lexicon_tensor, and the lexicon (which is >2 GB) was being serialized into Ray's internal KV store along with the trainable, instead of living on the worker that instantiated my_torch_dataset (I had been using a distributed data-loading strategy).
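
For context, the dataset looked roughly like this (a simplified sketch, not my actual class):

import torch
from torch.utils.data import Dataset

class MyTorchDataset(Dataset):
    def __init__(self, lexicon: torch.Tensor):
        # The >2 GB lexicon lives on the dataset object, so anything that
        # pickles the dataset drags the whole tensor along with it.
        self.my_giant_lexicon_tensor = lexicon

    def __len__(self):
        return self.my_giant_lexicon_tensor.shape[0]

    def __getitem__(self, idx):
        # Each item is a slice of the giant tensor
        return self.my_giant_lexicon_tensor[idx]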

It turned out that the solution was simply to enumerate the entire torch dataset, store it as config['my_ray_dataset'], and pass that to the scheduler:

import ray

# Enumerate the torch dataset, materializing every (X, y, c, r) sample
items = [
    {'X': X, 'y': y, 'c': c, 'r': r}
    for X, y, c, r in my_torch_dataset
]

# Create a Ray dataset; it lives in the object store rather than being
# pickled into the trainable's registration payload
config['my_ray_dataset'] = ray.data.from_items(items)
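
Inside the training function, each trial can then read batches straight from that dataset; a sketch, assuming Ray Data's iter_torch_batches and omitting the actual model code:

def train_scBPnet(config):
    ds = config['my_ray_dataset']
    # iter_torch_batches yields dicts of torch tensors keyed by column name
    for batch in ds.iter_torch_batches(batch_size=16):
        X, y = batch['X'], batch['y']
        # forward/backward pass on (X, y) goes here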

The issue I’m facing now is that I need to run a parameter search over the augmentation space of this torch dataset. So instead of enumerating a single augmentation scheme, I need to refactor the underlying index-dependent generator as a Ray Data transformation 🙃
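
Something like the following might work: a hypothetical sketch in which the augmentation is parameterized per trial and applied with Ray Data's map_batches (make_augmenter and the 'aug_sigma' config key are invented names):

import numpy as np

def make_augmenter(sigma):
    # Returns a batch transform parameterized by this trial's config
    def augment_batch(batch):
        # map_batches hands over dicts of numpy arrays by default
        batch['X'] = batch['X'] + np.random.normal(0.0, sigma, batch['X'].shape)
        return batch
    return augment_batch

def train_scBPnet(config):
    # Apply the trial-specific augmentation lazily as a Ray Data transform
    ds = config['my_ray_dataset'].map_batches(make_augmenter(config['aug_sigma']))
    for batch in ds.iter_torch_batches(batch_size=16):
        X, y = batch['X'], batch['y']
        # training step on the augmented batch goes here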