How severely does this issue affect your experience of using Ray?
- Low: It’s a minor annoyance.
Hi everyone,
I’m encountering an issue processing a Parquet file with a map<string, binary>
type column using Ray Data. Here’s what I’ve noticed:
- Transformations like map() or filter() appear to convert the map column into a list of key-value pairs.
- This causes a decoding error with map<string, binary> columns, as they are converted into list<list<string>>, and binary data can’t be decoded as strings.
Here’s a minimal example:
import ray
import pyarrow as pa
import pyarrow.parquet as pq
ray.init()
data = [
    # {"item": {"key1": b'\xa3\x5f\x9c\x47\x2d\x8e\xb1\x62'}, "id": 1}, # Uncomment for decode error
    # {"item": {"key2": b'\xa3\x5f\x9c\x47\x2d\x8e\xb1\x62'}, "id": 2},
    {"item": {"key1": b'bytes'}, "id": 1},
    {"item": {"key2": b'bytes'}, "id": 4},
]
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("item", pa.map_(pa.string(), pa.binary()))
])
table = pa.Table.from_pylist(data, schema=schema)
pq.write_table(table, 'data.parquet')
dataset = ray.data.read_parquet('data.parquet')
print(dataset.schema())
print(dataset.filter(lambda row: row["id"] == 1).schema())
Output:
2024-12-04 12:18:26,157 INFO worker.py:1821 -- Started a local Ray instance.
[dataset]: Run `pip install tqdm` to enable progress reporting.
Column Type
------ ----
id int64
item map<string, binary ('item')>
2024-12-04 12:18:27,183 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-12-04_12-18-25_191566_21934/logs/ray-data
2024-12-04 12:18:27,183 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Filter(<lambda>)] -> LimitOperator[limit=1]
Column Type
------ ----
id int64
item list<item: list<item: string>>
If you uncomment the lines whose binary values are not valid UTF-8 (in my real case those are raw image bytes), you get a decode error like this:
2024-12-04 12:20:55,438 INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-12-04_12-20-53_494124_24271/logs/ray-data
2024-12-04 12:20:55,438 INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Filter(<lambda>)] -> LimitOperator[limit=1]
(Filter(<lambda>) pid=24307) Failed to convert column 'item' into pyarrow array due to: Error converting data to Arrow: [list([('key1', b'\xa3_\x9cG-\x8e\xb1b')])]; falling back to serialize as pickled python objects
(Filter(<lambda>) pid=24307) Traceback (most recent call last):
(Filter(<lambda>) pid=24307) File "/Users/dmitrii.shmyrev/venvs/ray_issue/lib/python3.11/site-packages/ray/air/util/tensor_extensions/arrow.py", line 188, in _convert_to_pyarrow_native_array
(Filter(<lambda>) pid=24307) return pa.array(column_values, type=dtype)
(Filter(<lambda>) pid=24307) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(Filter(<lambda>) pid=24307) File "pyarrow/array.pxi", line 360, in pyarrow.lib.array
(Filter(<lambda>) pid=24307) File "pyarrow/array.pxi", line 87, in pyarrow.lib._ndarray_to_array
(Filter(<lambda>) pid=24307) File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
(Filter(<lambda>) pid=24307) pyarrow.lib.ArrowInvalid: Could not convert b'\xa3_\x9cG-\x8e\xb1b' with type bytes: was not a utf8 string
(Filter(<lambda>) pid=24307)
(Filter(<lambda>) pid=24307) The above exception was the direct cause of the following exception:
(Filter(<lambda>) pid=24307)
(Filter(<lambda>) pid=24307) Traceback (most recent call last):
(Filter(<lambda>) pid=24307) File "/Users/dmitrii.shmyrev/venvs/ray_issue/lib/python3.11/site-packages/ray/air/util/tensor_extensions/arrow.py", line 118, in convert_to_pyarrow_array
(Filter(<lambda>) pid=24307) return _convert_to_pyarrow_native_array(column_values, column_name)
(Filter(<lambda>) pid=24307) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(Filter(<lambda>) pid=24307) File "/Users/dmitrii.shmyrev/venvs/ray_issue/lib/python3.11/site-packages/ray/air/util/tensor_extensions/arrow.py", line 190, in _convert_to_pyarrow_native_array
(Filter(<lambda>) pid=24307) raise ArrowConversionError(str(column_values)) from e
(Filter(<lambda>) pid=24307) ray.air.util.tensor_extensions.arrow.ArrowConversionError: Error converting data to Arrow: [list([('key1', b'\xa3_\x9cG-\x8e\xb1b')])]
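The "was not a utf8 string" message fits the difference between the two data sets: b'bytes' happens to be valid UTF-8, while the commented-out value is not. This can be checked with plain Python, without Ray or PyArrow. The helper is_valid_utf8 is a hypothetical name used only for this check.

```python
good = b"bytes"
bad = b"\xa3\x5f\x9c\x47\x2d\x8e\xb1\x62"  # same bytes as in the commented-out rows


def is_valid_utf8(value: bytes) -> bool:
    """Return True if the bytes can be decoded as UTF-8."""
    try:
        value.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True


print(is_valid_utf8(good))  # True
print(is_valid_utf8(bad))   # False: 0xa3 cannot start a UTF-8 sequence
```

So the first data set only works because its binary values can pass as strings, not because the map column survives the transformation.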
My questions:
- Is the conversion from map to a list of pairs expected? If so, is this documented anywhere?
- If this is intentional, should the decoding error be considered a bug?
Thanks for your help!