[Predicting] TensorflowPredictor throws warning that parallelisation will be reduced to 1

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Hi,
I have a Tensorflow model that I have trained using a Ray AIR TensorflowTrainer. I want to make predictions using a Ray AIR TensorflowPredictor. It works but I get the following warning message when using BatchPredictor.from_checkpoint([...]).predict():

2023-01-23 11:02:53,625 WARNING compute.py:520 -- `batch_size` is set to 4096, which reduces parallelism from 14 to 1. If the performance is worse than expected, this may indicate that the batch size is too large or the input block size is too small. To reduce batch size, consider decreasing `batch_size` or use the default in `map_batches`. To increase input block size, consider decreasing `parallelism` in read.

I have tried changing batch size and converting the dataset to a DatasetPipeline but I still get the same warning…

Here is my prediction function, where I basically follow the documentation for making predictions using Ray AIR:

from ray.train.tensorflow import TensorflowPredictor
from ray.train.batch_predictor import BatchPredictor

def predict(self, df, threshold=0.8, cv=False):
    df = df.window(blocks_per_window=1)  # Transform into a DatasetPipeline to stream predictions

    df = self._preprocessor.preprocessors[0].transform(df)  # Min-Max scaling of features only
    # Define predictor
    self._predictor = BatchPredictor.from_checkpoint(
        self._model_ckpt,  # Best checkpoint kept after training
        TensorflowPredictor,  # Predictor matching the model backend used for training
        model_definition=lambda: build_model(self.classifier, self._nb_classes, len(self.kmers))  # Model definition function, parameters passed via the lambda
    )
    # Make predictions
    predictions = self._predictor.predict(
        data=df,
    )

    return predictions # Return predictions for further use

Hi @nicdemon, you’re referring to the default batch size here: batch_predictor.py - ray-project/ray - Sourcegraph

Can you try passing the batch size to your predict call explicitly? Also, I think earlier versions of Ray 2.1 had an issue where this batch size was not actually respected, but that should be fixed on nightly; do you mind giving that a try as well?
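Something along these lines, as a rough sketch based on your snippet (the value 256 is just an arbitrary example to illustrate):

# Sketch: pass batch_size explicitly instead of relying on the 4096 default,
# so that blocks can be split across more than one scoring task.
predictions = self._predictor.predict(
    data=df,
    batch_size=256,  # arbitrary example value, tune for your model/data
)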

Hi @Jiao_Dong, thanks for your answer.
I had already tried passing a batch size before posting the question, but it did not change anything.
As for the Ray version, I am using 2.2.0; is the issue you are referring to still present there?

I talked with two Anyscale employees on Monday (I don’t remember their names, sorry) and they told me I could try calling predict_pipelined directly from the BatchPredictor. I will try this today and come back with the results.

I was able to resolve this problem by using the predict_pipelined method of BatchPredictor and passing it a batch_size explicitly.
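For reference, here is roughly what the working call looks like (simplified from my code; the values shown are illustrative):

from ray.train.tensorflow import TensorflowPredictor
from ray.train.batch_predictor import BatchPredictor

# Simplified sketch of the call that resolved the warning for me
self._predictor = BatchPredictor.from_checkpoint(
    self._model_ckpt,
    TensorflowPredictor,
    model_definition=lambda: build_model(self.classifier, self._nb_classes, len(self.kmers)),
)
predictions = self._predictor.predict_pipelined(
    data=df,
    blocks_per_window=1,         # stream one block at a time
    batch_size=self.batch_size,  # explicit batch size instead of the 4096 default
)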

Yep, we’re pushing to make pipelined/streaming ingest the default mode over bulk for better efficiency. I’m afraid Ray 2.2 also had the issue of not using the correct batch size in bulk mode, so using nightly or the pipelined path is the way to go to resolve it.

Hi @Jiao_Dong,
This resolved my problem, but I ran into another one right after.
I get an error (221 MiB > FUNCTION_SIZE_ERROR_THRESHOLD=95 MiB) when extracting data from the DatasetPipeline generated by the BatchPredictor.predict_pipelined() method.

After reading the documentation and looking around on the discussion forum, I think this may be related to pickling of the UDF I use right after to post-process the predictions. I tried the size-measurement method described in this post and it returned 221 MiB, which seems to be more than a coincidence.
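For context, this is roughly how I estimated that size (a sketch using cloudpickle, which I believe is what Ray uses to serialize functions; the exact method from the linked post may differ slightly):

from ray import cloudpickle

# Rough estimate of how large the UDF becomes once serialized by Ray
serialized = cloudpickle.dumps(fn)  # fn = the function passed to BatchMapper
print(f"Serialized UDF size: {len(serialized) / (1024 * 1024):.1f} MiB")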

I have also tried moving my UDFs for the mapping transformation outside of the class and removing calls to pd.DataFrame() as mentioned here, to no avail.

So I am wondering what I am doing wrong, and whether I should use a different strategy than mapping batches to transform my probability results into classes.

Here are the post-processing methods used after calling BatchPredictor.predict_pipelined():

from ray.data.preprocessors import BatchMapper
import pandas as pd
import numpy as np

def prob_2_cls(self, predictions, threshold):
    # I have two possibilities for the number of classes: 2 or ~48,000
    if self._nb_classes == 2:
        fn = lambda x : map_predicted_label_binary(x, threshold)
    else:
        fn = lambda x : map_predicted_label_multiclass(x, threshold)

    # Define a BatchMapper according to the number of classes
    mapper = BatchMapper(
        fn,
        batch_size=self.batch_size,
        batch_format='pandas'
    )
    predict = mapper.transform(predictions)  # Apply the post-processing transform to the predictions

    arr = []
    # Iterate over DatasetPipeline to get predictions and put them in a list
    for batch in predict.iter_batches(batch_size=self.batch_size):
        arr.extend(np.array(batch))

    # Flatten the list of predictions to return it
    return np.ravel(arr)

# UDF to map labels to predictions according to their probability of classification in binary setting
def map_predicted_label_binary(df: pd.DataFrame, threshold: float) -> pd.DataFrame:
    # Define upper/lower thresholds
    lower_threshold = 0.5 - (threshold * 0.5)
    upper_threshold = 0.5 + (threshold * 0.5)
    df['proba'] = df['predictions'] # Define a probability column
    df['predicted_label'] = np.full(len(df), -1) # Map predicted label to -1
    df.loc[df['proba'] >= upper_threshold, 'predicted_label'] = 1 # Map predicted label to 1 if probability is above upper threshold
    df.loc[df['proba'] <= lower_threshold, 'predicted_label'] = 0 # Map predicted label to 0 if probability is below lower threshold
    return df

# UDF to map labels to predictions according to their probability of classification in multiclass setting
def map_predicted_label_multiclass(df: pd.DataFrame, threshold: float) -> pd.DataFrame:
    df['best_proba'] = [df['predictions'][i][np.argmax(df['predictions'][i])] for i in range(len(df))] # Get the highest probability of classification
    df['predicted_label'] = [np.argmax(df['predictions'][i]) for i in range(len(df))] # Get the index of the highest probability of classification
    df.loc[df['best_proba'] < threshold, 'predicted_label'] = -1 # Map predicted label to -1 if probability is below threshold
    return df

I have also tried using the UDFs directly, without mapping them through a Ray method, and I still get the same error message. This, together with the error trace, makes me think the problem might be caused by the DatasetPipeline execution.

Here’s the error trace:

Stage 0: 100%|██████████| 1/1 [00:00<00:00, 18.17it/s]
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/joblib/parallel.py", line 862, in dispatch_one_batch
    tasks = self._ready_batches.get(block=False)
  File "/usr/lib/python3.8/queue.py", line 167, in get
    raise Empty
_queue.Empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/bin/Caribou_classification_train_cv.py", line 83, in <module>
    bacteria_classification_train_cv(opt)
  File "/usr/local/bin/Caribou_classification_train_cv.py", line 50, in bacteria_classification_train_cv
    ClassificationMethods(
  File "/usr/local/lib/python3.8/dist-packages/models/classification.py", line 132, in execute_training
    self._train_model(taxa)
  File "/usr/local/lib/python3.8/dist-packages/models/classification.py", line 140, in _train_model
    self._multiclass_training(taxa)
  File "/usr/local/lib/python3.8/dist-packages/models/classification.py", line 231, in _multiclass_training
    self.models[taxa].train(self._training_datasets, self._database_data, self._cv)
  File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 168, in train
    self._cross_validation(df, df_test, kmers_ds)
  File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 216, in _cross_validation
    y_pred = self.predict(df_test.drop_columns([self.taxa]), cv = True)
  File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 305, in predict
    predictions = self._prob_2_cls(predictions, threshold)
  File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 340, in _prob_2_cls
    predict = Parallel(n_jobs=-1, prefer='threads', verbose=1)(
  File "/usr/local/lib/python3.8/dist-packages/joblib/parallel.py", line 1085, in __call__
    if self.dispatch_one_batch(iterator):
  File "/usr/local/lib/python3.8/dist-packages/joblib/parallel.py", line 873, in dispatch_one_batch
    islice = list(itertools.islice(iterator, big_batch_size))
  File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 340, in <genexpr>
    predict = Parallel(n_jobs=-1, prefer='threads', verbose=1)(
  File "/usr/local/lib/python3.8/dist-packages/ray/data/dataset_pipeline.py", line 203, in iter_batches
    blocks_owned_by_consumer = self._peek()._plan.execute()._owned_by_consumer
  File "/usr/local/lib/python3.8/dist-packages/ray/data/dataset_pipeline.py", line 1264, in _peek
    self._first_dataset = next(self._dataset_iter)
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/pipeline_executor.py", line 131, in __next__
    result = self._stages[i].result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 437, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/pipeline_executor.py", line 139, in <lambda>
    lambda r, fn: pipeline_stage(lambda: fn(r)),
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/pipeline_executor.py", line 22, in pipeline_stage
    return fn().fully_executed()
  File "/usr/local/lib/python3.8/dist-packages/ray/data/dataset.py", line 3778, in fully_executed
    self._plan.execute(force_read=True)
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/plan.py", line 314, in execute
    blocks, stage_info = stage(
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/plan.py", line 678, in __call__
    blocks = compute._apply(
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/compute.py", line 336, in _apply
    workers = [
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/compute.py", line 337, in <listcomp>
    BlockWorker.remote(*fn_constructor_args, **fn_constructor_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/ray/actor.py", line 529, in remote
    return self._remote(args=args, kwargs=kwargs, **self._default_options)
  File "/usr/local/lib/python3.8/dist-packages/ray/util/tracing/tracing_helper.py", line 387, in _invocation_actor_class_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/ray/actor.py", line 846, in _remote
    worker.function_actor_manager.export_actor_class(
  File "/usr/local/lib/python3.8/dist-packages/ray/_private/function_manager.py", line 479, in export_actor_class
    check_oversized_function(
  File "/usr/local/lib/python3.8/dist-packages/ray/_private/utils.py", line 810, in check_oversized_function
    raise ValueError(error)
ValueError: The actor BlockWorker is too large (221 MiB > FUNCTION_SIZE_ERROR_THRESHOLD=95 MiB). Check that its definition is not implicitly capturing a large array or other object in scope. Tip: use ray.put() to put large objects in the Ray object store.
Stage 1:   0%|          | 0/1 [00:04<?, ?it/s]
Stage 0: 100%|██████████| 1/1 [00:04<00:00,  4.57s/it]

After further investigation, I found out that my saved checkpoint on disk is 215 MB, so maybe the problem is linked to this?
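If that is the case, maybe following the tip in the error message would help, i.e. putting the large object in the object store and only capturing the reference inside the UDF. A rough, untested sketch of what I have in mind (names are placeholders):

import ray
import pandas as pd

# Untested sketch: put the large object in the object store once and have the
# UDF capture only the small ObjectRef instead of the object itself.
big_object_ref = ray.put(some_large_object)  # placeholder for whatever weighs 200+ MB

def my_udf(batch: pd.DataFrame) -> pd.DataFrame:
    big_object = ray.get(big_object_ref)  # fetched from the object store inside the task
    # ... real post-processing would use big_object here ...
    return batch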