Hi @Jiao_Dong,
This resolved my problem, but I then ran into another one.
I get an error (221 Mb > FUNCTION_SIZE_ERROR_THRESHOLD=95) when extracting data from the DatasetPipeline generated by the BatchPredictor.predict_pipelined() method.
After reading the documentation and looking around on the discussion forum, I suspect this is related to the pickling of the UDF I use right afterwards to post-process the predictions. I tried the method described in this post and it returned 221 Mb, which seems to be more than a coincidence.
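For illustration, here is a minimal sketch of how the serialized size of a callable grows with the data bound to it. It uses the standard library's pickle and functools.partial as a stand-in for the cloudpickle-based serialization Ray applies to functions, so the exact byte counts will differ from what Ray reports. The names scale, pickled_size and big_lookup are hypothetical and not part of my pipeline.

```python
import pickle
from functools import partial


def scale(values, factor):
    """Hypothetical UDF: multiply every value by a factor."""
    return [v * factor for v in values]


def pickled_size(obj):
    """Return the number of bytes the object occupies once pickled."""
    return len(pickle.dumps(obj))


# Binding a small argument keeps the serialized payload small.
small_udf = partial(scale, factor=2.0)

# Binding a large object ships that object along with the callable.
big_lookup = list(range(100_000))  # assumption: stands in for any large captured object
large_udf = partial(scale, big_lookup)

small_size = pickled_size(small_udf)
large_size = pickled_size(large_udf)
print(f"small UDF: {small_size} bytes, large UDF: {large_size} bytes")
```

If the measured size of a UDF roughly matches the size in the error message, that suggests the callable is carrying a large object with it, although this check alone does not say which object.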
I have also tried moving my UDFs for the mapping transformation outside of the class and removing the calls to pd.DataFrame(), as mentioned here, to no avail.
So I am wondering what I am doing wrong. Should I use a strategy other than mapping batches to transform my probability results into classes?
Here are the postprocessing methods used after calling BatchPredictor.predict_pipelined():
from ray.data.preprocessors import BatchMapper
import pandas as pd
import numpy as np
def prob_2_cls(self, predictions, threshold):
    # I have two possibilities for the number of classes: 2 or ~48 000
    if self._nb_classes == 2:
        fn = lambda x: map_predicted_label_binary(x, threshold)
    else:
        fn = lambda x: map_predicted_label_multiclass(x, threshold)
    # Define a BatchMapper according to the number of classes
    mapper = BatchMapper(
        fn,
        batch_size=self.batch_size,
        batch_format='pandas'
    )
    predict = mapper.transform(predictions)  # Make the predictions
    arr = []
    # Iterate over DatasetPipeline to get predictions and put them in a list
    for batch in predict.iter_batches(batch_size=self.batch_size):
        arr.extend(np.array(batch))
    # Flatten the list of predictions to return it
    return np.ravel(arr)

# UDF to map labels to predictions according to their probability of classification in binary setting
def map_predicted_label_binary(df: pd.DataFrame, threshold: float) -> pd.DataFrame:
    # Define upper/lower thresholds
    lower_threshold = 0.5 - (threshold * 0.5)
    upper_threshold = 0.5 + (threshold * 0.5)
    # Define a probability column
    df['proba'] = df['predictions']
    # Map predicted label to -1 by default
    df['predicted_label'] = np.full(len(df), -1)
    # Map predicted label to 1 if probability is above upper threshold
    df.loc[df['proba'] >= upper_threshold, 'predicted_label'] = 1
    # Map predicted label to 0 if probability is below lower threshold
    df.loc[df['proba'] <= lower_threshold, 'predicted_label'] = 0
    return df

# UDF to map labels to predictions according to their probability of classification in multiclass setting
def map_predicted_label_multiclass(df: pd.DataFrame, threshold: float) -> pd.DataFrame:
    # Get the highest probability of classification
    df['best_proba'] = [df['predictions'][i][np.argmax(df['predictions'][i])] for i in range(len(df))]
    # Get the index of the highest probability of classification
    df['predicted_label'] = [np.argmax(df['predictions'][i]) for i in range(len(df))]
    # Map predicted label to -1 if probability is below threshold
    df.loc[df['best_proba'] < threshold, 'predicted_label'] = -1
    return df
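
To make the intended labelling rules concrete, here is a small pure-Python sketch of the same logic applied to single rows rather than DataFrame batches. The helper names binary_label and multiclass_label are hypothetical scalar stand-ins for the two UDFs above, and the probabilities are made-up values for illustration only.

```python
def binary_label(proba, threshold):
    """Return 1, 0 or -1 using the same symmetric band around 0.5 as the binary UDF."""
    lower = 0.5 - threshold * 0.5
    upper = 0.5 + threshold * 0.5
    if proba >= upper:
        return 1
    if proba <= lower:
        return 0
    return -1  # probability falls inside the uncertain band


def multiclass_label(probas, threshold):
    """Return the index of the highest probability, or -1 if it is below the threshold."""
    best = max(range(len(probas)), key=lambda i: probas[i])
    return best if probas[best] >= threshold else -1


# With threshold=0.8 the band is roughly [0.1, 0.9].
binary_results = [binary_label(p, 0.8) for p in (0.95, 0.5, 0.05)]

# With threshold=0.5 a row is only labelled when its best class reaches 0.5.
multiclass_results = [
    multiclass_label(p, 0.5) for p in ([0.1, 0.7, 0.2], [0.4, 0.3, 0.3])
]

print(binary_results)      # [1, -1, 0]
print(multiclass_results)  # [1, -1]
```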