Map-type Parquet columns cause a decoding error with binary data

How severely does this issue affect your experience using Ray?

  • Low: It’s a minor annoyance.

Hi everyone,

I’m encountering an issue when using Ray Data to process a Parquet file that has a map<string, binary> column. Here’s what I’ve noticed:

  1. Transformations like map() or filter() appear to convert the map column into a list of [key, value] lists.
  2. For map<string, binary> columns this causes a decoding error: the column is converted into list<list<string>>, and binary values that aren’t valid UTF-8 can’t be decoded as strings.
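
Whether the conversion fails depends on the bytes themselves: an Arrow string type can only hold valid UTF-8, so ASCII-only bytes such as b'bytes' decode fine, while arbitrary binary such as image data usually does not. A quick standard-library check illustrates this; is_valid_utf8 is a hypothetical helper written only for this illustration.

```python
# One ASCII-only value and one value that is not valid UTF-8.
valid = b"bytes"
raw = b"\xa3\x5f\x9c\x47\x2d\x8e\xb1\x62"  # stand-in for raw image bytes


def is_valid_utf8(value: bytes) -> bool:
    """Return True if `value` can be decoded as a UTF-8 string."""
    try:
        value.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True


print(is_valid_utf8(valid))  # True
print(is_valid_utf8(raw))    # False: 0xa3 is a continuation byte with no lead byte
```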

Here’s a minimal example:

import ray
import pyarrow as pa
import pyarrow.parquet as pq

ray.init()

data = [
    # {"item": {"key1": b'\xa3\x5f\x9c\x47\x2d\x8e\xb1\x62'}, "id": 1}, # Uncomment for decode error
    # {"item": {"key2": b'\xa3\x5f\x9c\x47\x2d\x8e\xb1\x62'}, "id": 2},
    {"item": {"key1": b'bytes'}, "id": 1},
    {"item": {"key2": b'bytes'}, "id": 4},
]

schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("item", pa.map_(pa.string(), pa.binary()))
])

table = pa.Table.from_pylist(data, schema=schema)
pq.write_table(table, 'data.parquet')

dataset = ray.data.read_parquet('data.parquet')

print(dataset.schema())  # schema as read from Parquet
print(dataset.filter(lambda row: row["id"] == 1).schema())  # schema after filter()

Output:

2024-12-04 12:18:26,157	INFO worker.py:1821 -- Started a local Ray instance.
[dataset]: Run `pip install tqdm` to enable progress reporting.
Column  Type
------  ----
id      int64
item    map<string, binary ('item')>
2024-12-04 12:18:27,183	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-12-04_12-18-25_191566_21934/logs/ray-data
2024-12-04 12:18:27,183	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Filter(<lambda>)] -> LimitOperator[limit=1]
Column  Type
------  ----
id      int64
item    list<item: list<item: string>>

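If you uncomment the two commented-out entries in the data list, whose values aren’t valid UTF-8 (in my real use case these are raw image bytes), you get a conversion error like the one below, after which Ray falls back to serializing the column as pickled Python objects: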

2024-12-04 12:20:55,438	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-12-04_12-20-53_494124_24271/logs/ray-data
2024-12-04 12:20:55,438	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Filter(<lambda>)] -> LimitOperator[limit=1]
(Filter(<lambda>) pid=24307) Failed to convert column 'item' into pyarrow array due to: Error converting data to Arrow: [list([('key1', b'\xa3_\x9cG-\x8e\xb1b')])]; falling back to serialize as pickled python objects
(Filter(<lambda>) pid=24307) Traceback (most recent call last):
(Filter(<lambda>) pid=24307)   File "/Users/dmitrii.shmyrev/venvs/ray_issue/lib/python3.11/site-packages/ray/air/util/tensor_extensions/arrow.py", line 188, in _convert_to_pyarrow_native_array
(Filter(<lambda>) pid=24307)     return pa.array(column_values, type=dtype)
(Filter(<lambda>) pid=24307)            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(Filter(<lambda>) pid=24307)   File "pyarrow/array.pxi", line 360, in pyarrow.lib.array
(Filter(<lambda>) pid=24307)   File "pyarrow/array.pxi", line 87, in pyarrow.lib._ndarray_to_array
(Filter(<lambda>) pid=24307)   File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
(Filter(<lambda>) pid=24307) pyarrow.lib.ArrowInvalid: Could not convert b'\xa3_\x9cG-\x8e\xb1b' with type bytes: was not a utf8 string
(Filter(<lambda>) pid=24307)
(Filter(<lambda>) pid=24307) The above exception was the direct cause of the following exception:
(Filter(<lambda>) pid=24307)
(Filter(<lambda>) pid=24307) Traceback (most recent call last):
(Filter(<lambda>) pid=24307)   File "/Users/dmitrii.shmyrev/venvs/ray_issue/lib/python3.11/site-packages/ray/air/util/tensor_extensions/arrow.py", line 118, in convert_to_pyarrow_array
(Filter(<lambda>) pid=24307)     return _convert_to_pyarrow_native_array(column_values, column_name)
(Filter(<lambda>) pid=24307)            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(Filter(<lambda>) pid=24307)   File "/Users/dmitrii.shmyrev/venvs/ray_issue/lib/python3.11/site-packages/ray/air/util/tensor_extensions/arrow.py", line 190, in _convert_to_pyarrow_native_array
(Filter(<lambda>) pid=24307)     raise ArrowConversionError(str(column_values)) from e
(Filter(<lambda>) pid=24307) ray.air.util.tensor_extensions.arrow.ArrowConversionError: Error converting data to Arrow: [list([('key1', b'\xa3_\x9cG-\x8e\xb1b')])]

My questions:

  1. Is the conversion from map to a list of pairs expected? If so, is this documented anywhere?
  2. If this is intentional, should the decoding error be considered a bug?

Thanks for your help!

What version of Ray are you using?
Currently unable to reproduce.

I’m using ray==2.40.0