PyArrow Error when processing records with missing columns with flat_map

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

Hey folks I’m running into an issue when applying a flat_map on a ray dataset that contains s3 paths. During the __call__ op, I am downloading the data from the s3 path (which contains a bunch of jsonl records) and processing jsonl records materialized as dictionaries where each record can have a slightly different columns and then I return a nested list of dictionaries. Say these are the records I’m processing in the flatmap.

record1 -> {"a": 1, "b": 2}
record2 -> {"a": 1, "b": 2, "c": 3}
record3 -> {"a": 1, "b": 2}
...

I am returning:

return [{"items": [record1, record2, record3]}]

to get around the fact that I might have slightly different schema of records. So instead of returning a list(record1, record2, ...)
I’m returning list({“items”: list(record1, record2, …)}) which I’m hoping pyarrow Table can be stored with a single column “items”. I am however getting the following error when I do ds.take(5)

pyarrow.lib.ArrowTypeError: struct fields don't match or are in the wrong order

Here is the full stacktrace in the log:

ray.exceptions.RayTaskError(ArrowTypeError): ray::FlatMap(ShardProcessor)() (pid=857231, ip=10.0.2.2, actor_id=e7d1e719651963e3e38b8bc506000000, repr=MapWorker(FlatMap(ShardProcessor)))
    yield from _map_task(
  File "/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 427, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 395, in __call__
    yield output_buffer.next()
  File "/lib/python3.10/site-packages/ray/data/_internal/output_buffer.py", line 73, in next
    block_to_yield = self._buffer.build()
  File "/lib/python3.10/site-packages/ray/data/_internal/delegating_block_builder.py", line 64, in build
    return self._builder.build()
  File "/lib/python3.10/site-packages/ray/data/_internal/table_block.py", line 133, in build
    return self._concat_tables(tables)
  File "/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 148, in _concat_tables
    return transform_pyarrow.concat(tables)
  File "/lib/python3.10/site-packages/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 263, in concat
    table = pyarrow.concat_tables(blocks, promote_options="default")
  File "pyarrow/table.pxi", line 5962, in pyarrow.lib.concat_tables
  File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: struct fields don't match or are in the wrong order

Here is my processing function I use as part of a callable class in flat_map:

    def __call__(self, row: Dict[str, Any]) -> List[Dict[str, List[Dict[str, Any]]]]:
        import jsonlines
        import copy
        import gc

        data: List[Dict[str, List[Dict[str, Any]]]] = []
        chunks = []

        path = row['path']
        fileobj = self.download_fileobj(s3_url=path)
        fileobj.seek(0)
        with jsonlines.Reader(fileobj) as reader:
            for obj in reader:
                if len(chunks) >= self.chunk_size:
                    data.append({"items": chunks})
                    chunks = []
                else:
                    chunks.append(obj)

        if chunks:
            data.append({"items": chunks})

        gc.collect()
        return data

@RayAdmin anyone has some time to spare on this?

Can you provide a minimal repro script?