Dataset in Pandas Batch Format Raises Arrow Error When Materializing

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

I have a Dataset object created by actors that use the pandas batch format. When I materialize it, I get an error about Ray failing to infer an Arrow type.

The code in which I am hitting this is proprietary and somewhat complex, so I can’t post it here. I haven’t created a minimal repro scenario yet, but the problem appears to be caused by a groupby/aggregate operation.

from operator import attrgetter

from ray.data import Dataset
from ray.data.aggregate import AggregateFn

def collate(dataset: Dataset) -> Dataset:
    # Add a string column to group on, since grouping on the document
    # objects themselves is not allowed.
    dataset = dataset.add_column(
        "document_name", lambda f: f["document"].apply(attrgetter("name"))
    )
    dataset = dataset.groupby("document_name").aggregate(
        AggregateFn(
            init=lambda column: MyObject(),
            accumulate_row=lambda a, r: a | r["my_object"],
            merge=lambda a1, a2: a1 | a2,
        )
    )
    return dataset.drop_columns(["document_name"])

The dataset coming into this function has two columns: document and my_object. I want to group the my_object items by document. Ray won’t let me use the document objects themselves as the group key, so I temporarily add a document_name column that contains a unique string returned by the name property of document.
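For context, the add_column step just maps each document to its name. On a plain pandas batch the same transformation looks like this (Document here is a hypothetical stand-in for the proprietary document class):

```python
from operator import attrgetter

import pandas as pd


# Hypothetical stand-in for the proprietary document class.
class Document:
    def __init__(self, name: str):
        self.name = name


batch = pd.DataFrame({"document": [Document("a.txt"), Document("b.txt")]})

# Extract the name of each document into a new string column.
names = batch["document"].apply(attrgetter("name"))
print(list(names))  # ['a.txt', 'b.txt']
```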

MyObject defines an __or__ (|) operator that combines two MyObject instances into a single one.
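The real MyObject is proprietary, but a minimal sketch with the described | semantics might look like this (the items field is an assumption; only the merge behavior matters here):

```python
from dataclasses import dataclass, field


# Hypothetical sketch of MyObject; the real class is proprietary.
# The only relevant behavior is that | merges two instances into one.
@dataclass(frozen=True)
class MyObject:
    items: frozenset = field(default_factory=frozenset)

    def __or__(self, other: "MyObject") -> "MyObject":
        return MyObject(self.items | other.items)


a = MyObject(frozenset({1}))
b = MyObject(frozenset({2}))
merged = a | b
print(merged.items)  # frozenset({1, 2})
```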

When I run either take() or to_pandas() on a dataset that has had this aggregation, I get the following error.

  File "<ipython-input-1-ac5b7c801a02>", line 1, in <module>
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/", line 2408, in take
    for row in limited_ds.iter_rows():
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/", line 245, in _wrapped_iterator
    for batch in batch_iterable:
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/", line 162, in _create_iterator
    block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/iterator/", line 33, in _to_block_iterator
    block_iterator, stats, executor = ds._plan.execute_to_iterator()
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/", line 86, in handle_trace
    raise e.with_traceback(None) from SystemException()
ray.exceptions.RayTaskError(ArrowInvalid): ray::reduce() (pid=59917, ip=
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/planner/exchange/", line 70, in reduce
    return BlockAccessor.for_block(mapper_outputs[0]).aggregate_combined_blocks(
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/", line 606, in aggregate_combined_blocks
    blocks = TableBlockAccessor.normalize_block_types(blocks, "arrow")
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/", line 296, in normalize_block_types
    results = [BlockAccessor.for_block(block).to_arrow() for block in blocks]
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/", line 296, in <listcomp>
    results = [BlockAccessor.for_block(block).to_arrow() for block in blocks]
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/ray/data/_internal/", line 266, in to_arrow
    return pyarrow.table(self._table)
  File "pyarrow/table.pxi", line 5147, in pyarrow.lib.table
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/pandas/core/", line 1025, in __arrow_c_stream__
    table = pa.Table.from_pandas(self, schema=requested_schema)
  File "pyarrow/table.pxi", line 3869, in pyarrow.lib.Table.from_pandas
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/pyarrow/", line 613, in dataframe_to_arrays
    arrays = [convert_column(c, f)
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/pyarrow/", line 613, in <listcomp>
    arrays = [convert_column(c, f)
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/pyarrow/", line 600, in convert_column
    raise e
  File "/Users/bill.mcneill/anaconda3/envs/my-project/lib/python3.10/site-packages/pyarrow/", line 594, in convert_column
    result = pa.array(col, type=type_, from_pandas=True, safe=safe)
  File "pyarrow/array.pxi", line 340, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 86, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: ('Could not convert MyObject with type MyObject: did not recognize Python value type when inferring an Arrow data type', 'Conversion failed for column my_object with type object')

It seems like Ray is running into trouble because it is confusing a pandas batch format with an Arrow one, somewhere down in the Ray Data code where it builds iterators over Blocks. I’ve stepped through it but haven’t been able to figure out what is going on.

Am I making an obvious mistake? Can you give me any pointers on debugging this? I’ll try to create a minimal repro scenario, but I’ve already burned a week on this and am getting pressure to move on.