Does ray.data.read_json() support reading from HDFS?

Hi folks,

A few weeks ago, I tried to read JSON files from my HDFS cluster using the ray.data.read_json() API, but without success. I checked the source code, and it seemed to me that HDFS was indeed not supported.

Can you confirm this? If it is supported, do you have any example code so that I can try again?

Thanks a lot!

Okay, I was able to read a JSON file from HDFS after all. Below is my working code:

from pyarrow import fs
from pyarrow import json
from pprint import pprint
import os

import ray

# Propagate the Hadoop classpath to all Ray workers so that pyarrow's
# HadoopFileSystem can locate the HDFS client libraries on every node.
CLASSPATH = os.popen('$HADOOP_HOME/bin/hadoop classpath --glob').read()
ray.init(address='auto', runtime_env={"env_vars": {'CLASSPATH': CLASSPATH}})
pprint(ray.cluster_resources())
num_nodes = len(ray.nodes())

# 1 GiB block size for pyarrow's JSON reader.
read_options = json.ReadOptions(block_size=2**30)

path = '/redpajama/book_sample.json'
hdfs = fs.HadoopFileSystem.from_uri("hdfs://10.165.9.53:9000/?user=root")

ray_data = ray.data.read_json(paths=path, filesystem=hdfs, read_options=read_options)
print(ray_data)
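
In case it is useful: the explicit HadoopFileSystem object is apparently not strictly required, since Ray resolves hdfs:// URIs through pyarrow as well. A minimal sketch (untested on my side; it still assumes CLASSPATH is set on every node as above, and the explicit filesystem may still be needed if you rely on the ?user=root setting):

# Sketch: let Ray/pyarrow infer the filesystem from the URI itself.
ray_data = ray.data.read_json(
    "hdfs://10.165.9.53:9000/redpajama/book_sample.json",
    read_options=read_options,
)
print(ray_data)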

However, when I changed the .json extension to .jsonl, I got the following error:

2023-07-22 01:15:56,549 INFO worker.py:1429 -- Connecting to existing Ray cluster at address: 10.165.9.53:6379...
2023-07-22 01:15:56,602 INFO worker.py:1610 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:9999 
{'CPU': 384.0,
 'memory': 1120701115803.0,
 'node:10.165.9.164': 1.0,
 'node:10.165.9.23': 1.0,
 'node:10.165.9.52': 1.0,
 'node:10.165.9.53': 1.0,
 'node:__internal_head__': 1.0,
 'object_store_memory': 484586192485.0}
Traceback (most recent call last):
  File "/home/user/workspace/tools/redpajama_data_processing/dp_test.py", line 33, in <module>
    ray_data = ray.data.read_json(paths=path, filesystem=hdfs, read_options=read_options)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/read_api.py", line 1014, in read_json
    return read_datasource(
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/read_api.py", line 370, in read_datasource
    ) = ray.get(
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/worker.py", line 2491, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ValueError): ray::_get_read_tasks() (pid=17177, ip=10.165.9.53)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/read_api.py", line 2270, in _get_read_tasks
    reader = ds.create_reader(**kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 229, in create_reader
    return _FileBasedDatasourceReader(self, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py", line 402, in __init__
    raise ValueError(
ValueError: No input files found to read. Please double check that 'partition_filter' field is set properly.

Do you know how to define the partition_filter so that it can also read .jsonl files?

Hi @Faaany, sorry for the issue. https://github.com/ray-project/ray/pull/37637 should fix the partition_filter error. Until the next Ray release, you can use the following workaround:

ray_data = ray.data.read_json(paths=path, filesystem=hdfs, partition_filter=None, read_options=read_options)
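
If you want to keep filtering but include .jsonl files as well, you can also pass an explicit filter instead of disabling it. A sketch, assuming the FileExtensionFilter class exported from ray.data.datasource in current Ray 2.x releases:

from ray.data.datasource import FileExtensionFilter

# Accept both .json and .jsonl files instead of the default "json"-only filter.
ray_data = ray.data.read_json(
    paths=path,
    filesystem=hdfs,
    partition_filter=FileExtensionFilter(file_extensions=["json", "jsonl"]),
    read_options=read_options,
)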

@chengsu Thanks for fixing the issue so quickly!

But I ran into another problem when running my code on a large .jsonl file (~75 GB). The data can be found here. With a small .jsonl file (~106 MB), the code worked fine.

Code:

from pyarrow import fs
from pyarrow import json
from pprint import pprint
import os

import ray

# Propagate the Hadoop classpath to all Ray workers (same as above).
CLASSPATH = os.popen('$HADOOP_HOME/bin/hadoop classpath --glob').read()
ray.init(address='auto', runtime_env={"env_vars": {'CLASSPATH': CLASSPATH}})
pprint(ray.cluster_resources())
num_nodes = len(ray.nodes())

# 10 MiB block size for pyarrow's JSON reader.
read_options = json.ReadOptions(block_size=10 << 20)
path = '/redpajama/stackexchange.jsonl'
hdfs = fs.HadoopFileSystem.from_uri("hdfs://10.165.9.53:9000/?user=root")

# Read the whole file into a single Arrow table, then convert it into a
# Ray Dataset.
with hdfs.open_input_file(path) as f:
    table = json.read_json(f, read_options=read_options)

ray_data = ray.data.from_arrow(table)
print(ray_data)

# preprocess_megatron is my tokenization UDF (defined elsewhere).
ray_dataset = ray_data.repartition(360)
tokenized_data = ray_dataset.map_batches(preprocess_megatron, batch_format="numpy", batch_size=None)
tokenized_data.materialize()

Log:

MaterializedDataset(
   num_blocks=1,
   num_rows=29825086,
   schema={
      text: string,
      meta...: struct<language: string, url: string, timestamp: timestamp[s], source: string, question_score: string>
   }
)
2023-07-24 02:53:37,700 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(preprocess_megatron)]
2023-07-24 02:53:37,700 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-24 02:53:37,701 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(MapBatches(preprocess_megatron) pid=265432) Task failed with retryable exception: TaskID(c26a4eb1b95b821fffffffffffffffffffffffff19000000).
(MapBatches(preprocess_megatron) pid=265432) Traceback (most recent call last):                                    
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 3680, in ray._raylet.CoreWorker.store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(MapBatches(preprocess_megatron) pid=265432)     for b_out in fn(iter(blocks), ctx):                               
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 76, in do_map
(MapBatches(preprocess_megatron) pid=265432)     yield from transform_fn(blocks, ctx, *fn_args, **fn_kwargs)       
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/map_batches.py", line 117, in fn
(MapBatches(preprocess_megatron) pid=265432)     for batch in formatted_batch_iter:                                
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 156, in batch_blocks
(MapBatches(preprocess_megatron) pid=265432)     for formatted_batch in batch_iter:                                
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 152, in _iterator_fn
(MapBatches(preprocess_megatron) pid=265432)     yield from batch_iter                                             
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
(MapBatches(preprocess_megatron) pid=265432)     for batch in batch_iter:                                          
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 160, in format_batches
(MapBatches(preprocess_megatron) pid=265432)     formatted_batch = BlockAccessor.for_block(batch.data).to_batch_format(
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/block.py", line 337, in to_batch_format
(MapBatches(preprocess_megatron) pid=265432)     return self.to_numpy()                                            
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 266, in to_numpy
(MapBatches(preprocess_megatron) pid=265432)     array = array.combine_chunks()                                    
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/table.pxi", line 731, in pyarrow.lib.ChunkedArray.combine_chunks
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/array.pxi", line 3321, in pyarrow.lib.concat_arrays   
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status     
(MapBatches(preprocess_megatron) pid=265432) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays  
(MapBatches(preprocess_megatron) pid=265432) Task failed with retryable exception: TaskID(c26a4eb1b95b821fffffffffffffffffffffffff19000000).
[... the identical traceback repeats for three more retries, truncated ...]
Traceback (most recent call last):                                                                                 
  File "/home/user/workspace/tools/redpajama_data_processing/dp_test.py", line 134, in <module>
    tokenized_data.materialize()
  File "/opt/conda/lib/python3.10/site-packages/ray/data/dataset.py", line 4316, in materialize
    copy._plan.execute(force_read=True)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 591, in execute
    blocks = execute_to_legacy_block_list(
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 116, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 349, in _bundles_to_block_list
    for ref_bundle in bundles:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 129, in get_next
    raise item
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 187, in run
    while self._scheduling_loop_step(self._topology) and not self._shutdown:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 235, in _scheduling_loop_step
    process_completed_tasks(topology)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 333, in process_completed_tasks
    op.notify_work_completed(ref)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/task_pool_map_operator.py", line 65, in notify_work_completed
    task.output = self._map_ref_to_ref_bundle(ref)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 357, in _map_ref_to_ref_bundle
    all_refs = list(ray.get(ref))
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/worker.py", line 2493, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ArrowInvalid): ray::MapBatches(preprocess_megatron)() (pid=265432, ip=10.165.9.53)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
    for b_out in fn(iter(blocks), ctx):
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 76, in do_map
    yield from transform_fn(blocks, ctx, *fn_args, **fn_kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/map_batches.py", line 117, in fn
    for batch in formatted_batch_iter:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 156, in batch_blocks
    for formatted_batch in batch_iter:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 152, in _iterator_fn
    yield from batch_iter
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
    for batch in batch_iter:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 160, in format_batches
    formatted_batch = BlockAccessor.for_block(batch.data).to_batch_format(
  File "/opt/conda/lib/python3.10/site-packages/ray/data/block.py", line 337, in to_batch_format
    return self.to_numpy()
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 266, in to_numpy
    array = array.combine_chunks()
  File "pyarrow/table.pxi", line 731, in pyarrow.lib.ChunkedArray.combine_chunks
  File "pyarrow/array.pxi", line 3321, in pyarrow.lib.concat_arrays
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays

Please note that the example above first reads the data with pyarrow and then converts the table into a Ray Dataset, because I thought this might help with debugging.
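
One thing I still plan to try: the overflow may come from holding the entire ~75 GB file in a single Arrow table, since Arrow's default string arrays use 32-bit offsets (roughly a 2 GiB cap per contiguous chunk). Reading through ray.data.read_json directly should split the data into many smaller blocks instead. A rough sketch (untested; the parallelism value is a guess for my cluster):

# Sketch: read the .jsonl file with Ray Data directly, so that no single
# block has to hold the whole file.
ray_data = ray.data.read_json(
    paths=path,
    filesystem=hdfs,
    partition_filter=None,  # pre-fix workaround from above
    read_options=read_options,
    parallelism=360,        # assumed; tune for file size and cluster
)
tokenized_data = ray_data.map_batches(
    preprocess_megatron, batch_format="numpy", batch_size=None
)
tokenized_data.materialize()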

Closing this issue, since the original problem was fixed. Moved the remaining discussion to another thread: pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays