Dataset with Pandas Batch Format Raises Arrow Error When Materializing

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

I have a Dataset object created by actors that use the pandas batch format. When I materialize it, I get an error about a failure to infer an Arrow type.

The code in which I am hitting this is proprietary and somewhat complex, so I can’t post it here. I haven’t created a minimal repro scenario yet, but it appears to be caused by a groupby/aggregate operation.

from operator import attrgetter

from ray.data import Dataset
from ray.data.aggregate import AggregateFn


def collate(dataset: Dataset) -> Dataset:
    dataset = dataset.add_column("document_name", lambda f: f["document"].apply(attrgetter("name")))
    dataset = dataset.groupby("document_name").aggregate(
        AggregateFn(
            init=lambda column: MyObject(),
            accumulate_row=lambda a, r: a | r["my_object"],
            merge=lambda a1, a2: a1 | a2,
            name="my_object",
        )
    )
    return dataset.drop_columns(["document_name"])

The dataset coming into this function has two columns: document and my_object. I want to group the my_object items by document. Ray won’t let me use the document objects themselves as the group key, so I temporarily add a document_name column that contains a unique string returned by the name property of document.

MyObject defines an or operator that combines two MyObject instances into a single one.
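Since I can’t share the real class, here is a rough sketch of the shape of the aggregation without Ray. The MyObject below is a hypothetical stand-in (a wrapper around a frozenset, which is an assumption and not the real implementation), and the three plain functions only mirror the lambdas passed to AggregateFn above; this does not reproduce the error, it just shows what the callbacks are meant to compute.

```python
from dataclasses import dataclass, field
from functools import reduce


@dataclass(frozen=True)
class MyObject:
    """Hypothetical stand-in for the proprietary class: it just wraps a set of items."""

    items: frozenset = field(default_factory=frozenset)

    def __or__(self, other: "MyObject") -> "MyObject":
        # Combine two instances into a new one; neither operand is modified.
        return MyObject(self.items | other.items)


# Plain-function versions of the three AggregateFn callbacks from collate().
def init(key):
    return MyObject()


def accumulate_row(acc, row):
    return acc | row["my_object"]


def merge(a1, a2):
    return a1 | a2


# Rows for a single group, split as if they had arrived in two separate blocks.
rows_block_1 = [
    {"my_object": MyObject(frozenset({"a"}))},
    {"my_object": MyObject(frozenset({"b"}))},
]
rows_block_2 = [{"my_object": MyObject(frozenset({"c"}))}]

# Fold each block into a partial accumulator, then merge the partials.
partial_1 = reduce(accumulate_row, rows_block_1, init("doc-1"))
partial_2 = reduce(accumulate_row, rows_block_2, init("doc-1"))
combined = merge(partial_1, partial_2)

print(sorted(combined.items))  # ['a', 'b', 'c']
```

The point is that the aggregated value for each group is itself a MyObject, so the my_object column in the output still holds arbitrary Python objects.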

When I run either take() or to_pandas() on a dataset that has had this aggregation, I get the following error.

  File "<ipython-input-1-ac5b7c801a02>", line 1, in <module>
    d.take()
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/dataset.py", line 2408, in take
    for row in limited_ds.iter_rows():
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/iterator.py", line 245, in _wrapped_iterator
    for batch in batch_iterable:
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/iterator.py", line 162, in _create_iterator
    block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/iterator/iterator_impl.py", line 33, in _to_block_iterator
    block_iterator, stats, executor = ds._plan.execute_to_iterator()
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/exceptions.py", line 86, in handle_trace
    raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(ArrowInvalid): ray::reduce() (pid=59917, ip=127.0.0.1)
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/aggregate_task_spec.py", line 70, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).aggregate_combined_blocks(
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 606, in aggregate_combined_blocks
    blocks = TableBlockAccessor.normalize_block_types(blocks, "arrow")
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/table_block.py", line 296, in normalize_block_types
    results = [BlockAccessor.for_block(block).to_arrow() for block in blocks]
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/table_block.py", line 296, in <listcomp>
    results = [BlockAccessor.for_block(block).to_arrow() for block in blocks]
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/pandas_block.py", line 266, in to_arrow
    return pyarrow.table(self._table)
  File "pyarrow/table.pxi", line 5147, in pyarrow.lib.table
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/pandas/core/frame.py", line 1025, in __arrow_c_stream__
    table = pa.Table.from_pandas(self, schema=requested_schema)
  File "pyarrow/table.pxi", line 3869, in pyarrow.lib.Table.from_pandas
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/pyarrow/pandas_compat.py", line 613, in dataframe_to_arrays
    arrays = [convert_column(c, f)
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/pyarrow/pandas_compat.py", line 613, in <listcomp>
    arrays = [convert_column(c, f)
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/pyarrow/pandas_compat.py", line 600, in convert_column
    raise e
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/pyarrow/pandas_compat.py", line 594, in convert_column
    result = pa.array(col, type=type_, from_pandas=True, safe=safe)
  File "pyarrow/array.pxi", line 340, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 86, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: ('Could not convert MyObject with type MyObject: did not recognize Python value type when inferring an Arrow data type', 'Conversion failed for column my_object with type object')

It seems like Ray is running into trouble because it is treating pandas-format blocks as Arrow ones. This happens somewhere down in the Ray Data code when it is building iterators over blocks. I’ve stepped through it but haven’t been able to figure out what is going on.

Am I making an obvious mistake? Can you give me any pointers on debugging this? I’ll try to create a minimal repro scenario, but I’ve already burned a week on this and am getting pressure to move on.

Thanks.