How severe does this issue affect your experience of using Ray?
- High: It blocks me to complete my task.
Hey folks, I'm running into an issue when applying a flat_map on a Ray dataset that contains S3 paths. During the __call__ op, I download the data from the S3 path (a file containing a bunch of JSONL records) and process the records, materialized as dictionaries, where each record can have slightly different columns. I then return a nested list of dictionaries. Say these are the records I'm processing in the flat_map:
record1 -> {"a": 1, "b": 2}
record2 -> {"a": 1, "b": 2, "c": 3}
record3 -> {"a": 1, "b": 2}
...
I am returning:
return [{"items": [record1, record2, record3]}]
to get around the fact that the records might have slightly different schemas. So instead of returning list(record1, record2, ...), I'm returning list({"items": list(record1, record2, ...)}), which I'm hoping can be stored as a pyarrow Table with a single column "items". However, I am getting the following error when I do ds.take(5):
pyarrow.lib.ArrowTypeError: struct fields don't match or are in the wrong order
Here is the full stacktrace in the log:
ray.exceptions.RayTaskError(ArrowTypeError): ray::FlatMap(ShardProcessor)() (pid=857231, ip=10.0.2.2, actor_id=e7d1e719651963e3e38b8bc506000000, repr=MapWorker(FlatMap(ShardProcessor)))
yield from _map_task(
File "/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 427, in _map_task
for b_out in map_transformer.apply_transform(iter(blocks), ctx):
File "/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 395, in __call__
yield output_buffer.next()
File "/lib/python3.10/site-packages/ray/data/_internal/output_buffer.py", line 73, in next
block_to_yield = self._buffer.build()
File "/lib/python3.10/site-packages/ray/data/_internal/delegating_block_builder.py", line 64, in build
return self._builder.build()
File "/lib/python3.10/site-packages/ray/data/_internal/table_block.py", line 133, in build
return self._concat_tables(tables)
File "/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 148, in _concat_tables
return transform_pyarrow.concat(tables)
File "/lib/python3.10/site-packages/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 263, in concat
table = pyarrow.concat_tables(blocks, promote_options="default")
File "pyarrow/table.pxi", line 5962, in pyarrow.lib.concat_tables
File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: struct fields don't match or are in the wrong order
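The trace fails inside pyarrow.concat_tables, which suggests (though this is a guess from the trace, not something confirmed) that different blocks ended up with different struct types for the "items" column. One possible way around that is to store each record as a JSON string, so the column is a list of strings whichever keys a record has. Below is a minimal sketch of that idea, assuming downstream steps can decode the strings again; pack_chunk and unpack_chunk are hypothetical helper names, not Ray or pyarrow APIs.

```python
import json
from typing import Any, Dict, List


def pack_chunk(records: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Serialize each record to a JSON string so "items" is a list of strings."""
    # sort_keys keeps the encoding deterministic regardless of key order.
    return {"items": [json.dumps(r, sort_keys=True) for r in records]}


def unpack_chunk(row: Dict[str, List[str]]) -> List[Dict[str, Any]]:
    """Inverse of pack_chunk: decode the JSON strings back into dictionaries."""
    return [json.loads(s) for s in row["items"]]


# The example records from the post, with slightly different keys.
records = [{"a": 1, "b": 2}, {"a": 1, "b": 2, "c": 3}, {"a": 1, "b": 2}]
row = pack_chunk(records)
restored = unpack_chunk(row)
```

The trade-off is that the records are opaque strings until they are decoded, so column-level operations on the fields of "items" are not available in between.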
Here is my processing function I use as part of a callable class in flat_map:
def __call__(self, row: Dict[str, Any]) -> List[Dict[str, List[Dict[str, Any]]]]:
    import jsonlines
    import copy
    import gc

    data: List[Dict[str, List[Dict[str, Any]]]] = []
    chunks = []
    path = row['path']
    fileobj = self.download_fileobj(s3_url=path)
    fileobj.seek(0)
    with jsonlines.Reader(fileobj) as reader:
        for obj in reader:
            # Flush the chunk once it is full; the current record always
            # goes into the active chunk.
            if len(chunks) >= self.chunk_size:
                data.append({"items": chunks})
                chunks = []
            chunks.append(obj)
    # Emit the final, possibly partial, chunk.
    if chunks:
        data.append({"items": chunks})
    gc.collect()
    return data
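The chunking step in __call__ can also be sketched as a standalone generator, so that every record lands in exactly one chunk. This is only an illustration: iter_chunks is a hypothetical helper name, and a plain list of records stands in for the jsonlines reader.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def iter_chunks(
    records: Iterable[Dict[str, Any]], chunk_size: int
) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive lists of at most chunk_size records, keeping every record."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    it = iter(records)
    while True:
        # islice pulls up to chunk_size items without skipping any.
        chunk = list(islice(it, chunk_size))
        if not chunk:
            return
        yield chunk


# Stand-in for the records read from one JSONL file.
records = [{"a": 1, "b": 2}, {"a": 1, "b": 2, "c": 3}, {"a": 1, "b": 2}]
data = [{"items": chunk} for chunk in iter_chunks(records, chunk_size=2)]
```

With chunk_size=2, the three example records end up in two rows, the second holding the single leftover record.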