pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

Hi,

When using ray.data.read_json() to read a large JSONL file (75 GB, data here) with the following code:

Code:

from pyarrow import json
import ray

ray.init(address='auto')

local_path = '/home/user/tmp/stackexchange/stackexchange.jsonl'
read_options = json.ReadOptions(block_size=10 << 20)

ray_data = ray.data.read_json(paths=local_path, read_options=read_options)

# preprocess_megatron is our tokenization function (defined elsewhere)
tokenized_data = ray_data.map_batches(preprocess_megatron, batch_format="numpy", batch_size=10000)
tokenized_data.materialize()

I got the following error:

2023-07-24 15:04:52,979 INFO worker.py:1431 -- Connecting to existing Ray cluster at address: 10.165.9.53:6379...
2023-07-24 15:04:52,985 INFO worker.py:1612 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:9999 
{'CPU': 384.0,
 'memory': 1109527674677.0,
 'node:10.165.9.164': 1.0,
 'node:10.165.9.23': 1.0,
 'node:10.165.9.52': 1.0,
 'node:10.165.9.53': 1.0,
 'node:__internal_head__': 1.0,
 'object_store_memory': 479797574859.0}
2023-07-24 15:04:56,040 INFO read_api.py:400 -- To satisfy the requested parallelism of 768, each read task output is split into 6 smaller blocks.
2023-07-24 15:04:56,047 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON->SplitBlocks(894)] -> TaskPoolMapOperator[MapBatches(preprocess_megatron)]
2023-07-24 15:04:56,047 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-24 15:04:56,048 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164) Task failed with retryable exception: TaskID(e484295177ff0ad8ffffffffffffffffffffffff3d000000).
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164) Traceback (most recent call last):                   
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "python/ray/_raylet.pyx", line 3680, in ray._raylet.CoreWorker.store_task_outputs
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)     for b_out in fn(iter(blocks), ctx):              
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/plan_read_op.py", line 67, in do_read
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)     yield from read_task()                           
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "/opt/conda/lib/python3.10/site-packages/ray/data/datasource/datasource.py", line 218, in __call__
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)     yield from self._do_additional_splits(block)     
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "/opt/conda/lib/python3.10/site-packages/ray/data/datasource/datasource.py", line 234, in _do_additional_splits
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)     yield block.slice(offset, offset + size, copy=True)
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 215, in slice
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)     view = _copy_table(view)                         
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 653, in _copy_table
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)     return transform_pyarrow.combine_chunks(table)   
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_ops/transform_pyarrow.py", line 284, in combine_chunks
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)     arr = col.combine_chunks()                       
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "pyarrow/table.pxi", line 731, in pyarrow.lib.ChunkedArray.combine_chunks
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "pyarrow/array.pxi", line 3321, in pyarrow.lib.concat_arrays
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164)   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
(ReadJSON->SplitBlocks(894) pid=334910, ip=10.165.9.164) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays
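
If I understand the error correctly, pyarrow's default string type stores offsets as 32-bit integers, so a single string array can hold at most about 2 GB of character data. When the SplitBlocks step calls combine_chunks() on a column whose chunks together exceed that limit, the concatenation overflows. Below is a minimal sketch of the same failure in plain pyarrow (this is my own guess at the cause, not taken from the Ray internals, and it needs a few GB of RAM to run):

import pyarrow as pa

# A string column whose chunks together hold more than 2**31 bytes of character
# data: each chunk alone fits in 32-bit offsets, but concatenating them does not.
chunk = pa.array(["x" * (1 << 20)] * 1200)   # ~1.2 GB of string data per chunk
col = pa.chunked_array([chunk, chunk])       # ~2.4 GB in total

# Raises: pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays
col.combine_chunks()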

But when I first read the data with pyarrow and then do the repartition in Ray, it works:

read_options = json.ReadOptions(block_size=10 << 20)
path = '/redpajama/stackexchange.jsonl'

# Read the whole file into an Arrow table with pyarrow directly
table = json.read_json(path, read_options=read_options)

ray_data = ray.data.from_arrow(table)
print(ray_data)

# Repartition first, then run the same tokenization on the repartitioned dataset
ray_dataset = ray_data.repartition(360)
tokenized_data = ray_dataset.map_batches(preprocess_megatron, batch_format="numpy", batch_size=10000)
tokenized_data.materialize()
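
One other thing I was thinking of trying (untested sketch of my own; to_large_strings is just a hypothetical helper, and I don't know whether Ray keeps the type through its own block operations) is casting the string columns to large_string, which uses 64-bit offsets, before handing the table to Ray, in case the same overflow can also hit during repartition:

import pyarrow as pa

def to_large_strings(table: pa.Table) -> pa.Table:
    # Cast every string column to large_string (64-bit offsets) so that later
    # slicing/combining of chunks cannot overflow 32-bit offsets.
    fields = [
        pa.field(f.name, pa.large_string()) if pa.types.is_string(f.type) else f
        for f in table.schema
    ]
    return table.cast(pa.schema(fields))

ray_data = ray.data.from_arrow(to_large_strings(table))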

Any ideas or suggestions? Thanks!