Ray: Preprocessor and PyArrow Crashes

I am trying to preprocess a large dataset of roughly 700M rows x 25 columns, but I am met with this error:

ray.exceptions.RayTaskError(AttributeError): ray::reduce() (pid=318, ip=10.0.0.5)
  File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/site-packages/ray/data/_internal/planner/exchange/aggregate_task_spec.py", line 71, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).aggregate_combined_blocks(
  File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/site-packages/ray/data/_internal/pandas_block.py", line 516, in aggregate_combined_blocks
    next_row = next(iter)
  File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/heapq.py", line 373, in merge
    value = next()
  File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/site-packages/ray/data/_internal/table_block.py", line 194, in __next__
    row = outer._get_row(self._cur)
  File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/site-packages/ray/data/_internal/table_block.py", line 159, in _get_row
    base_row = self.slice(index, index + 1, copy=copy)
  File "/azureml-envs/azureml_afccb31d48bcd54bd3609c62423c284a/lib/python3.9/site-packages/ray/data/_internal/pandas_block.py", line 155, in slice
    view.reset_index(drop=True, inplace=True)
AttributeError: 'pyarrow.lib.Table' object has no attribute 'reset_index'

The call that triggers the crash is:

train_set = preprocessor.fit_transform(train_set)

That call is part of the following preprocessing code:

from ray.data.preprocessors import Chain, Concatenator, StandardScaler
import numpy as np

# Create a preprocessor
preprocessor = Chain(
    StandardScaler(columns=feature_set),
    Concatenator(exclude=["SYNDROME"], dtype=np.float32),
)

# Apply the preprocessor
train_set = preprocessor.fit_transform(train_set)
valid_set = preprocessor.transform(valid_set)

My pyarrow and ray versions are:

pyarrow==12.0.1

and the Ray nightly wheel for Python 3.9:
https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp39-cp39-manylinux2014_x86_64.whl

My data is generated by:

ray.data.read_parquet(.... )

and I am wondering if Ray thinks I have a pandas dataset rather than a pyarrow one. When we use batch_format with map_batches, we get to tell Ray what kind of data we are dealing with, but I don't see any equivalent option for the preprocessor (see the sketch below).
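
If the block format really is the problem, the only workaround I can think of is a no-op map_batches pass to force every block into a single format before fitting the preprocessor. This is just a sketch of my idea, not something from the docs, and I have not verified that it avoids the crash:

# Hypothetical workaround: force pandas-format blocks with a no-op map_batches
train_set = train_set.map_batches(lambda batch: batch, batch_format="pandas")
valid_set = valid_set.map_batches(lambda batch: batch, batch_format="pandas")

train_set = preprocessor.fit_transform(train_set)
valid_set = preprocessor.transform(valid_set)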

Edit:

I also tried several other dependency combinations, including the recommended pyarrow==11.0.0 with the latest ray[air], and ran into the same issues.

I have since offloaded the feature normalization to Spark (roughly the kind of pipeline sketched below) and things run fine now, but the preprocessor scalers do not seem to be able to "scale" to big-ish data.
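
For reference, the Spark replacement looks roughly like this. The names here (train_df, the output columns) are placeholders for illustration, not my actual code; feature_set is the same list of feature column names as above:

from pyspark.ml.feature import StandardScaler, VectorAssembler

# train_df is assumed to be a Spark DataFrame read from the same parquet data.
assembler = VectorAssembler(inputCols=feature_set, outputCol="features_raw")
scaler = StandardScaler(
    inputCol="features_raw",
    outputCol="features",
    withMean=True,
    withStd=True,
)

train_df = assembler.transform(train_df)
scaler_model = scaler.fit(train_df)
train_df = scaler_model.transform(train_df)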