@chengsu Thanks for fixing the issue so quickly!
However, I ran into another problem when running my code on a large JSONL file (~75 GB). The data can be found here. With a small JSONL file (~106 MB), the code worked fine.
Code:
from pyarrow import fs
import ray 
from pprint import pprint
from pyarrow import json
import os 
# Ask Hadoop for its fully expanded classpath and pass it to every Ray worker
# via runtime_env, so pyarrow's HadoopFileSystem can locate the Hadoop jars.
# The shell expands $HADOOP_HOME. The context manager closes the pipe, and
# .strip() drops the trailing newline from the command's output.
with os.popen('$HADOOP_HOME/bin/hadoop classpath --glob') as _classpath_proc:
    CLASSPATH = _classpath_proc.read().strip()
ray.init(address='auto', runtime_env={"env_vars": {'CLASSPATH': CLASSPATH}})
pprint(ray.cluster_resources())
num_nodes = len(ray.nodes())

# Parse the JSONL file in 10 MiB chunks (10 << 20 bytes).
read_options = json.ReadOptions(block_size=10 << 20)
path = '/redpajama/stackexchange.jsonl'
hdfs = fs.HadoopFileSystem.from_uri("hdfs://10.165.9.53:9000/?user=root")
with hdfs.open_input_file(path) as f:
    table = json.read_json(f, read_options=read_options)

# from_arrow() wraps the whole in-memory table as a single block
# (the printed dataset reports num_blocks=1).
ray_data = ray.data.from_arrow(table)
print(ray_data)

# Split the single block into many smaller blocks. Ray Datasets are
# immutable: repartition() returns a NEW dataset, so the map below must run
# on `ray_dataset`, not on `ray_data`.
#
# Mapping the original single-block dataset with batch_size=None hands the
# entire table to one task. Its numpy conversion then concatenates every
# chunk of the `text` column into one array, which overflows Arrow's 32-bit
# string offsets ("offset overflow while concatenating arrays").
ray_dataset = ray_data.repartition(360)
tokenized_data = ray_dataset.map_batches(
    preprocess_megatron, batch_format="numpy", batch_size=None
)
tokenized_data.materialize()
Log:
MaterializedDataset(
   num_blocks=1,
   num_rows=29825086,
   schema={
      text: string,
      meta...: struct<language: string, url: string, timestamp: timestamp[s], source: string, question_score: string>
   }
)
2023-07-24 02:53:37,700 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(preprocess_megatron)]
2023-07-24 02:53:37,700 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-24 02:53:37,701 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(MapBatches(preprocess_megatron) pid=265432) Task failed with retryable exception: TaskID(c26a4eb1b95b821fffffffffffffffffffffffff19000000).
(MapBatches(preprocess_megatron) pid=265432) Traceback (most recent call last):                                    
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 3680, in ray._raylet.CoreWorker.store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(MapBatches(preprocess_megatron) pid=265432)     for b_out in fn(iter(blocks), ctx):                               
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 76, in do_map
(MapBatches(preprocess_megatron) pid=265432)     yield from transform_fn(blocks, ctx, *fn_args, **fn_kwargs)       
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/map_batches.py", line 117, in fn
(MapBatches(preprocess_megatron) pid=265432)     for batch in formatted_batch_iter:                                
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 156, in batch_blocks
(MapBatches(preprocess_megatron) pid=265432)     for formatted_batch in batch_iter:                                
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 152, in _iterator_fn
(MapBatches(preprocess_megatron) pid=265432)     yield from batch_iter                                             
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
(MapBatches(preprocess_megatron) pid=265432)     for batch in batch_iter:                                          
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 160, in format_batches
(MapBatches(preprocess_megatron) pid=265432)     formatted_batch = BlockAccessor.for_block(batch.data).to_batch_format(
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/block.py", line 337, in to_batch_format
(MapBatches(preprocess_megatron) pid=265432)     return self.to_numpy()                                            
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 266, in to_numpy
(MapBatches(preprocess_megatron) pid=265432)     array = array.combine_chunks()                                    
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/table.pxi", line 731, in pyarrow.lib.ChunkedArray.combine_chunks
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/array.pxi", line 3321, in pyarrow.lib.concat_arrays   
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status     
(MapBatches(preprocess_megatron) pid=265432) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays  
(MapBatches(preprocess_megatron) pid=265432) Task failed with retryable exception: TaskID(c26a4eb1b95b821fffffffffffffffffffffffff19000000).
(MapBatches(preprocess_megatron) pid=265432) Traceback (most recent call last):                                    
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 3680, in ray._raylet.CoreWorker.store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(MapBatches(preprocess_megatron) pid=265432)     for b_out in fn(iter(blocks), ctx):                               
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 76, in do_map
(MapBatches(preprocess_megatron) pid=265432)     yield from transform_fn(blocks, ctx, *fn_args, **fn_kwargs)       
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/map_batches.py", line 117, in fn
(MapBatches(preprocess_megatron) pid=265432)     for batch in formatted_batch_iter:                                
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 156, in batch_blocks
(MapBatches(preprocess_megatron) pid=265432)     for formatted_batch in batch_iter:                                
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 152, in _iterator_fn
(MapBatches(preprocess_megatron) pid=265432)     yield from batch_iter                                             
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
(MapBatches(preprocess_megatron) pid=265432)     for batch in batch_iter:                                          
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 160, in format_batches
(MapBatches(preprocess_megatron) pid=265432)     formatted_batch = BlockAccessor.for_block(batch.data).to_batch_format(
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/block.py", line 337, in to_batch_format
(MapBatches(preprocess_megatron) pid=265432)     return self.to_numpy()                                            
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 266, in to_numpy
(MapBatches(preprocess_megatron) pid=265432)     array = array.combine_chunks()                                    
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/table.pxi", line 731, in pyarrow.lib.ChunkedArray.combine_chunks
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/array.pxi", line 3321, in pyarrow.lib.concat_arrays   
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status     
(MapBatches(preprocess_megatron) pid=265432) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays  
(MapBatches(preprocess_megatron) pid=265432) Task failed with retryable exception: TaskID(c26a4eb1b95b821fffffffffffffffffffffffff19000000).
(MapBatches(preprocess_megatron) pid=265432) Traceback (most recent call last):                                    
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 3680, in ray._raylet.CoreWorker.store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(MapBatches(preprocess_megatron) pid=265432)     for b_out in fn(iter(blocks), ctx):                               
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 76, in do_map
(MapBatches(preprocess_megatron) pid=265432)     yield from transform_fn(blocks, ctx, *fn_args, **fn_kwargs)       
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/map_batches.py", line 117, in fn
(MapBatches(preprocess_megatron) pid=265432)     for batch in formatted_batch_iter:                                
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 156, in batch_blocks
(MapBatches(preprocess_megatron) pid=265432)     for formatted_batch in batch_iter:                                
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 152, in _iterator_fn
(MapBatches(preprocess_megatron) pid=265432)     yield from batch_iter                                             
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
(MapBatches(preprocess_megatron) pid=265432)     for batch in batch_iter:                                          
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 160, in format_batches
(MapBatches(preprocess_megatron) pid=265432)     formatted_batch = BlockAccessor.for_block(batch.data).to_batch_format(
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/block.py", line 337, in to_batch_format
(MapBatches(preprocess_megatron) pid=265432)     return self.to_numpy()                                            
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 266, in to_numpy
(MapBatches(preprocess_megatron) pid=265432)     array = array.combine_chunks()                                    
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/table.pxi", line 731, in pyarrow.lib.ChunkedArray.combine_chunks
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/array.pxi", line 3321, in pyarrow.lib.concat_arrays   
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status     
(MapBatches(preprocess_megatron) pid=265432) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays  
(MapBatches(preprocess_megatron) pid=265432) Task failed with retryable exception: TaskID(c26a4eb1b95b821fffffffffffffffffffffffff19000000).
(MapBatches(preprocess_megatron) pid=265432) Traceback (most recent call last):                                    
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 3680, in ray._raylet.CoreWorker.store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(MapBatches(preprocess_megatron) pid=265432)     for b_out in fn(iter(blocks), ctx):                               
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 76, in do_map
(MapBatches(preprocess_megatron) pid=265432)     yield from transform_fn(blocks, ctx, *fn_args, **fn_kwargs)       
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/map_batches.py", line 117, in fn
(MapBatches(preprocess_megatron) pid=265432)     for batch in formatted_batch_iter:                                
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 156, in batch_blocks
(MapBatches(preprocess_megatron) pid=265432)     for formatted_batch in batch_iter:                                
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 152, in _iterator_fn
(MapBatches(preprocess_megatron) pid=265432)     yield from batch_iter                                             
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
(MapBatches(preprocess_megatron) pid=265432)     for batch in batch_iter:                                          
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 160, in format_batches
(MapBatches(preprocess_megatron) pid=265432)     formatted_batch = BlockAccessor.for_block(batch.data).to_batch_format(
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/block.py", line 337, in to_batch_format
(MapBatches(preprocess_megatron) pid=265432)     return self.to_numpy()                                            
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 266, in to_numpy
(MapBatches(preprocess_megatron) pid=265432)     array = array.combine_chunks()                                    
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/table.pxi", line 731, in pyarrow.lib.ChunkedArray.combine_chunks
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/array.pxi", line 3321, in pyarrow.lib.concat_arrays   
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status     
(MapBatches(preprocess_megatron) pid=265432) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays  
Traceback (most recent call last):                                                                                 
  File "/home/user/workspace/tools/redpajama_data_processing/dp_test.py", line 134, in <module>
    tokenized_data.materialize()
  File "/opt/conda/lib/python3.10/site-packages/ray/data/dataset.py", line 4316, in materialize
    copy._plan.execute(force_read=True)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 591, in execute
    blocks = execute_to_legacy_block_list(
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 116, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 349, in _bundles_to_block_list
    for ref_bundle in bundles:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 129, in get_next
    raise item
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 187, in run
    while self._scheduling_loop_step(self._topology) and not self._shutdown:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 235, in _scheduling_loop_step
    process_completed_tasks(topology)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 333, in process_completed_tasks
    op.notify_work_completed(ref)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/task_pool_map_operator.py", line 65, in notify_work_completed
    task.output = self._map_ref_to_ref_bundle(ref)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 357, in _map_ref_to_ref_bundle
    all_refs = list(ray.get(ref))
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/worker.py", line 2493, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ArrowInvalid): ray::MapBatches(preprocess_megatron)() (pid=265432, ip=10.165.9.53)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
    for b_out in fn(iter(blocks), ctx):
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 76, in do_map
    yield from transform_fn(blocks, ctx, *fn_args, **fn_kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/map_batches.py", line 117, in fn
    for batch in formatted_batch_iter:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 156, in batch_blocks
    for formatted_batch in batch_iter:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 152, in _iterator_fn
    yield from batch_iter
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
    for batch in batch_iter:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 160, in format_batches
    formatted_batch = BlockAccessor.for_block(batch.data).to_batch_format(
  File "/opt/conda/lib/python3.10/site-packages/ray/data/block.py", line 337, in to_batch_format
    return self.to_numpy()
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 266, in to_numpy
    array = array.combine_chunks()
  File "pyarrow/table.pxi", line 731, in pyarrow.lib.ChunkedArray.combine_chunks
  File "pyarrow/array.pxi", line 3321, in pyarrow.lib.concat_arrays
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays
Please note that my example above first reads the data with pyarrow and then converts it to a Ray dataset, because I think this may help with debugging.