I have tried creating the UDFs without mapping them, by using a Ray method instead, and I still get the same error message. Together with the error trace below, this makes me think the error might be caused by the DatasetPipeline execution.
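To make the setup concrete, here is a minimal sketch of the failing pattern as I understand it (hypothetical names, not my actual code): a class-based UDF applied to a DatasetPipeline with actor compute, where a large object referenced from the enclosing scope gets serialized into the actor class:

```python
import numpy as np
import ray

# Hypothetical stand-in for the large object (trained model / k-mers data)
# the real UDF needs; at ~200 MiB it is over the 95 MiB threshold.
large_model = np.zeros((1024, 1024, 25))

class Predictor:
    def __call__(self, batch):
        # Merely referencing `large_model` here makes cloudpickle bake it
        # into the serialized actor class, which should trip the
        # check_oversized_function call seen in the trace below.
        _ = large_model
        return batch

pipe = ray.data.range(1000).window(blocks_per_window=10)
# Class-based UDFs run as actors; this is the call path that ends in
# BlockWorker.remote(...) in the trace.
pipe = pipe.map_batches(Predictor, compute="actors")
for batch in pipe.iter_batches():
    pass
```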
Here's the error trace:
Stage 0: 100%|████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 18.17it/s]
Traceback (most recent call last):
File "/usr/local/lib/python3.8/dist-packages/joblib/parallel.py", line 862, in dispatch_one_batch
tasks = self._ready_batches.get(block=False)
File "/usr/lib/python3.8/queue.py", line 167, in get
raise Empty
_queue.Empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/bin/Caribou_classification_train_cv.py", line 83, in <module>
bacteria_classification_train_cv(opt)
File "/usr/local/bin/Caribou_classification_train_cv.py", line 50, in bacteria_classification_train_cv
ClassificationMethods(
File "/usr/local/lib/python3.8/dist-packages/models/classification.py", line 132, in execute_training
self._train_model(taxa)
File "/usr/local/lib/python3.8/dist-packages/models/classification.py", line 140, in _train_model
self._multiclass_training(taxa)
File "/usr/local/lib/python3.8/dist-packages/models/classification.py", line 231, in _multiclass_training
self.models[taxa].train(self._training_datasets, self._database_data, self._cv)
File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 168, in train
self._cross_validation(df, df_test, kmers_ds)
File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 216, in _cross_validation
y_pred = self.predict(df_test.drop_columns([self.taxa]), cv = True)
File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 305, in predict
predictions = self._prob_2_cls(predictions, threshold)
File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 340, in _prob_2_cls
predict = Parallel(n_jobs=-1, prefer='threads', verbose=1)(
File "/usr/local/lib/python3.8/dist-packages/joblib/parallel.py", line 1085, in __call__
if self.dispatch_one_batch(iterator):
File "/usr/local/lib/python3.8/dist-packages/joblib/parallel.py", line 873, in dispatch_one_batch
islice = list(itertools.islice(iterator, big_batch_size))
File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 340, in <genexpr>
predict = Parallel(n_jobs=-1, prefer='threads', verbose=1)(
File "/usr/local/lib/python3.8/dist-packages/ray/data/dataset_pipeline.py", line 203, in iter_batches
blocks_owned_by_consumer = self._peek()._plan.execute()._owned_by_consumer
File "/usr/local/lib/python3.8/dist-packages/ray/data/dataset_pipeline.py", line 1264, in _peek
self._first_dataset = next(self._dataset_iter)
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/pipeline_executor.py", line 131, in __next__
result = self._stages[i].result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
result = self.fn(*self.args, **self.kwargs)
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/pipeline_executor.py", line 139, in <lambda>
lambda r, fn: pipeline_stage(lambda: fn(r)),
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/pipeline_executor.py", line 22, in pipeline_stage
return fn().fully_executed()
File "/usr/local/lib/python3.8/dist-packages/ray/data/dataset.py", line 3778, in fully_executed
self._plan.execute(force_read=True)
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/plan.py", line 314, in execute
blocks, stage_info = stage(
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/plan.py", line 678, in __call__
blocks = compute._apply(
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/compute.py", line 336, in _apply
workers = [
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/compute.py", line 337, in <listcomp>
BlockWorker.remote(*fn_constructor_args, **fn_constructor_kwargs)
File "/usr/local/lib/python3.8/dist-packages/ray/actor.py", line 529, in remote
return self._remote(args=args, kwargs=kwargs, **self._default_options)
File "/usr/local/lib/python3.8/dist-packages/ray/util/tracing/tracing_helper.py", line 387, in _invocation_actor_class_remote_span
return method(self, args, kwargs, *_args, **_kwargs)
File "/usr/local/lib/python3.8/dist-packages/ray/actor.py", line 846, in _remote
worker.function_actor_manager.export_actor_class(
File "/usr/local/lib/python3.8/dist-packages/ray/_private/function_manager.py", line 479, in export_actor_class
check_oversized_function(
File "/usr/local/lib/python3.8/dist-packages/ray/_private/utils.py", line 810, in check_oversized_function
raise ValueError(error)
ValueError: The actor BlockWorker is too large (221 MiB > FUNCTION_SIZE_ERROR_THRESHOLD=95 MiB). Check that its definition is not implicitly capturing a large array or other object in scope. Tip: use ray.put() to put large objects in the Ray object store.
Stage 1:   0%|          | 0/1 [00:04<?, ?it/s]
Stage 0: 100%|████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:04<00:00, 4.57s/it]
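For completeness, this is how I read the tip at the end of the ValueError (again a sketch with the same hypothetical names): put the large object in the object store once with ray.put() and hand the ObjectRef to the UDF through fn_constructor_args, so it is no longer captured in the serialized actor class:

```python
import numpy as np
import ray

large_model = np.zeros((1024, 1024, 25))  # hypothetical ~200 MiB object

class Predictor:
    def __init__(self, model):
        # Ray resolves the ObjectRef passed via fn_constructor_args, so the
        # actor fetches the model from the object store instead of shipping
        # it inside the serialized BlockWorker class.
        self.model = model

    def __call__(self, batch):
        return batch  # placeholder for the real inference

model_ref = ray.put(large_model)

pipe = ray.data.range(1000).window(blocks_per_window=10)
pipe = pipe.map_batches(
    Predictor,
    compute="actors",
    fn_constructor_args=(model_ref,),
)
```

If that reading is wrong and the capture happens inside the DatasetPipeline machinery itself rather than in my UDF, that would explain why moving the UDF definition around has not helped.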