I am trying to preprocess a large dataset of roughly 700M rows × 25 columns, but I am met with this error:
ray.exceptions.RayTaskError(AttributeError): ray::reduce() (pid=318, ip=10.0.0.5)
File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/site-packages/ray/data/_internal/planner/exchange/aggregate_task_spec.py", line 71, in reduce
return BlockAccessor.for_block(mapper_outputs[0]).aggregate_combined_blocks(
File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/site-packages/ray/data/_internal/pandas_block.py", line 516, in aggregate_combined_blocks
next_row = next(iter)
File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/heapq.py", line 373, in merge
value = next()
File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/site-packages/ray/data/_internal/table_block.py", line 194, in __next__
row = outer._get_row(self._cur)
File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/site-packages/ray/data/_internal/table_block.py", line 159, in _get_row
base_row = self.slice(index, index + 1, copy=copy)
File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/site-packages/ray/data/_internal/pandas_block.py", line 155, in slice
view.reset_index(drop=True, inplace=True)
AttributeError: 'pyarrow.lib.Table' object has no attribute 'reset_index'
The call that triggers the crash is:
train_set = preprocessor.fit_transform(train_set)
which comes from these steps:
# Create a preprocessor
preprocessor = Chain(
    StandardScaler(columns=feature_set),
    Concatenator(exclude=["SYNDROME"], dtype=np.float32),
)
# apply preprocessor
train_set = preprocessor.fit_transform(train_set)
valid_set = preprocessor.transform(valid_set)
My pyarrow and Ray versions are:
pyarrow==12.0.1
and the Ray nightly wheel for Python 3.9:
https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp39-cp39-manylinux2014_x86_64.whl
My data is generated by:
ray.data.read_parquet(.... )
and I am wondering if it thinks I have a pandas dataset rather than a pyarrow one. When we use batch_format with map_batches, we get to tell Ray what kind of data we are dealing with, but I don't see any equivalent option for the preprocessor.
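As a workaround while this is open, the scaling can be done directly in a map_batches call, where batch_format="pandas" makes the block type explicit instead of relying on the preprocessor to infer it. A minimal sketch; the column names and precomputed statistics below are hypothetical, and in practice the stats would come from a separate aggregation pass:

```python
import pandas as pd

# Hypothetical per-column statistics: column -> (mean, std).
# These would be computed ahead of time (e.g. in a prior pass or in Spark).
STATS = {"f1": (10.0, 2.0), "f2": (-1.0, 0.5)}

def scale_batch(batch: pd.DataFrame) -> pd.DataFrame:
    # Standard-scale each feature column: (x - mean) / std
    for col, (mean, std) in STATS.items():
        batch[col] = (batch[col] - mean) / std
    return batch

# With Ray Data this would be applied as:
# train_set = train_set.map_batches(scale_batch, batch_format="pandas")
```

Because the batch function only sees pandas DataFrames, the Arrow-vs-pandas block ambiguity that the traceback points at never arises.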
Edit:
I also tried several other dependency combinations, including the recommended pyarrow==11.0.0 with the latest ray[air], and ran into the same issue.
I offloaded the feature normalization to Spark and can now run things fine, but the preprocessor scalers do not seem to be able to “scale” with big-ish data.
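For reference, the normalization I moved out of the preprocessor amounts to a two-pass streaming standard scaler: one pass to accumulate count, sum, and sum of squares per column, and one pass to apply (x - mean) / std. A sketch of that logic in plain pandas over an iterable of chunks (function names and the chunk source are illustrative, not Ray or Spark APIs):

```python
import numpy as np
import pandas as pd

def fit_stats(chunks, columns):
    """First pass: accumulate count, sum, and sum of squares per column."""
    n = 0
    s = np.zeros(len(columns))
    sq = np.zeros(len(columns))
    for chunk in chunks:
        vals = chunk[columns].to_numpy(dtype=np.float64)
        n += len(vals)
        s += vals.sum(axis=0)
        sq += (vals ** 2).sum(axis=0)
    mean = s / n
    std = np.sqrt(sq / n - mean ** 2)  # population std
    return mean, std

def scale_chunk(chunk, columns, mean, std):
    """Second pass: standard-scale the feature columns of one chunk."""
    out = chunk.copy()
    out[columns] = (out[columns].to_numpy(dtype=np.float64) - mean) / std
    return out
```

Since each pass only ever holds one chunk in memory, this shape scales to data that does not fit on a single node, which is effectively what the Spark job does for me.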