@chengsu Thanks for fixing the issue so quickly!
However, I ran into another problem when running my code on a large JSONL file (~75 GB). The data can be found here. With a small JSONL file (~106 MB), the same code works fine.
Code:
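```python
from pyarrow import fs
import ray
from pprint import pprint
from pyarrow import json
import os

# Put the Hadoop jars on every Ray worker's CLASSPATH so that
# pyarrow's HadoopFileSystem can load libhdfs.
CLASSPATH = os.popen('$HADOOP_HOME/bin/hadoop classpath --glob').read()
ray.init(address='auto', runtime_env={"env_vars": {'CLASSPATH': CLASSPATH}})
pprint(ray.cluster_resources())
num_nodes = len(ray.nodes())

# Read the JSONL file from HDFS with pyarrow, in 10 MiB read blocks.
read_options = json.ReadOptions(block_size=10 << 20)
path = '/redpajama/stackexchange.jsonl'
hdfs = fs.HadoopFileSystem.from_uri("hdfs://10.165.9.53:9000/?user=root")
with hdfs.open_input_file(path) as f:
    table = json.read_json(f, read_options=read_options)

# Convert the in-memory Arrow table into a Ray Dataset.
ray_data = ray.data.from_arrow(table)
print(ray_data)
ray_dataset = ray_data.repartition(360)
# Note: map_batches below is applied to ray_data, not to the repartitioned
# ray_dataset. preprocess_megatron is defined elsewhere in the script (not shown).
tokenized_data = ray_data.map_batches(preprocess_megatron, batch_format="numpy", batch_size=None)
tokenized_data.materialize()
```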
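As a rough sanity check on scale: Arrow's `string` type stores offsets as signed 32-bit integers, so one contiguous string array can hold at most 2**31 - 1 bytes (about 2 GiB) of character data. In the code above, `map_batches` runs on `ray_data` (a single block of 29,825,086 rows), and `batch_size=None` makes each whole block one batch. The stdlib-only sketch below estimates whether such a block can fit in one array. It assumes the whole ~75 GB file size is string data, which overestimates the `text` column but shows the order of magnitude; the helper names are hypothetical.

```python
# Rough estimate only (assumption: the whole file size is treated as string
# data, which overestimates the `text` column but shows the scale).

# Arrow's `string` type uses signed 32-bit offsets, so one contiguous array
# can address at most 2**31 - 1 bytes of character data.
INT32_MAX = 2**31 - 1

NUM_ROWS = 29_825_086        # from the MaterializedDataset repr in the log
FILE_BYTES = 75 * 10**9      # "~75 GB", approximate


def overflows(num_rows: int, avg_bytes_per_row: float) -> bool:
    """True if num_rows strings of this average size exceed one int32-offset array."""
    return num_rows * avg_bytes_per_row > INT32_MAX


def max_rows_per_array(avg_bytes_per_row: float) -> int:
    """Largest row count whose total string bytes still fit in int32 offsets."""
    return int(INT32_MAX // avg_bytes_per_row)


avg = FILE_BYTES / NUM_ROWS  # roughly 2.5 KB per row
print(f"avg bytes/row ~ {avg:.0f}")
print("whole block overflows:", overflows(NUM_ROWS, avg))
print("rows per block with 360 partitions:", NUM_ROWS // 360)
print("max rows per array at this size:", max_rows_per_array(avg))
# Break-even: an average above ~72 bytes/row already overflows a single array.
print(f"break-even bytes/row ~ {INT32_MAX / NUM_ROWS:.1f}")
```

Under these assumptions a single 29.8M-row block is far past the limit, while a block of roughly 1/360 of the rows stays well below it.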
Log:
```
MaterializedDataset(
   num_blocks=1,
   num_rows=29825086,
   schema={
      text: string,
      meta...: struct<language: string, url: string, timestamp: timestamp[s], source: string, question_score: string>
   }
)
2023-07-24 02:53:37,700 INFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(preprocess_megatron)]
2023-07-24 02:53:37,700 INFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-07-24 02:53:37,701 INFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`
(MapBatches(preprocess_megatron) pid=265432) Task failed with retryable exception: TaskID(c26a4eb1b95b821fffffffffffffffffffffffff19000000).
(MapBatches(preprocess_megatron) pid=265432) Traceback (most recent call last):
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 1191, in ray._raylet.execute_dynamic_generator_and_store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "python/ray/_raylet.pyx", line 3680, in ray._raylet.CoreWorker.store_task_outputs
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
(MapBatches(preprocess_megatron) pid=265432)     for b_out in fn(iter(blocks), ctx):
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 76, in do_map
(MapBatches(preprocess_megatron) pid=265432)     yield from transform_fn(blocks, ctx, *fn_args, **fn_kwargs)
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/map_batches.py", line 117, in fn
(MapBatches(preprocess_megatron) pid=265432)     for batch in formatted_batch_iter:
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 156, in batch_blocks
(MapBatches(preprocess_megatron) pid=265432)     for formatted_batch in batch_iter:
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 152, in _iterator_fn
(MapBatches(preprocess_megatron) pid=265432)     yield from batch_iter
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
(MapBatches(preprocess_megatron) pid=265432)     for batch in batch_iter:
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 160, in format_batches
(MapBatches(preprocess_megatron) pid=265432)     formatted_batch = BlockAccessor.for_block(batch.data).to_batch_format(
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/block.py", line 337, in to_batch_format
(MapBatches(preprocess_megatron) pid=265432)     return self.to_numpy()
(MapBatches(preprocess_megatron) pid=265432)   File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 266, in to_numpy
(MapBatches(preprocess_megatron) pid=265432)     array = array.combine_chunks()
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/table.pxi", line 731, in pyarrow.lib.ChunkedArray.combine_chunks
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/array.pxi", line 3321, in pyarrow.lib.concat_arrays
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
(MapBatches(preprocess_megatron) pid=265432)   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
(MapBatches(preprocess_megatron) pid=265432) pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays
```
```
Traceback (most recent call last):
  File "/home/user/workspace/tools/redpajama_data_processing/dp_test.py", line 134, in <module>
    tokenized_data.materialize()
  File "/opt/conda/lib/python3.10/site-packages/ray/data/dataset.py", line 4316, in materialize
    copy._plan.execute(force_read=True)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/plan.py", line 591, in execute
    blocks = execute_to_legacy_block_list(
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 116, in execute_to_legacy_block_list
    block_list = _bundles_to_block_list(bundles)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/legacy_compat.py", line 349, in _bundles_to_block_list
    for ref_bundle in bundles:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/interfaces/executor.py", line 37, in __next__
    return self.get_next()
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 129, in get_next
    raise item
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 187, in run
    while self._scheduling_loop_step(self._topology) and not self._shutdown:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor.py", line 235, in _scheduling_loop_step
    process_completed_tasks(topology)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/streaming_executor_state.py", line 333, in process_completed_tasks
    op.notify_work_completed(ref)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/task_pool_map_operator.py", line 65, in notify_work_completed
    task.output = self._map_ref_to_ref_bundle(ref)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 357, in _map_ref_to_ref_bundle
    all_refs = list(ray.get(ref))
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/_private/worker.py", line 2493, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(ArrowInvalid): ray::MapBatches(preprocess_megatron)() (pid=265432, ip=10.165.9.53)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 415, in _map_task
    for b_out in fn(iter(blocks), ctx):
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 76, in do_map
    yield from transform_fn(blocks, ctx, *fn_args, **fn_kwargs)
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/planner/map_batches.py", line 117, in fn
    for batch in formatted_batch_iter:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 156, in batch_blocks
    for formatted_batch in batch_iter:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/block_batching.py", line 152, in _iterator_fn
    yield from batch_iter
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 210, in extract_data_from_batch
    for batch in batch_iter:
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py", line 160, in format_batches
    formatted_batch = BlockAccessor.for_block(batch.data).to_batch_format(
  File "/opt/conda/lib/python3.10/site-packages/ray/data/block.py", line 337, in to_batch_format
    return self.to_numpy()
  File "/opt/conda/lib/python3.10/site-packages/ray/data/_internal/arrow_block.py", line 266, in to_numpy
    array = array.combine_chunks()
  File "pyarrow/table.pxi", line 731, in pyarrow.lib.ChunkedArray.combine_chunks
  File "pyarrow/array.pxi", line 3321, in pyarrow.lib.concat_arrays
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: offset overflow while concatenating arrays
```
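The failing call is `ChunkedArray.combine_chunks()`, which concatenates every chunk of a column into one contiguous array. With 32-bit string offsets, that can only succeed while the column holds at most 2**31 - 1 bytes. One possible mitigation is to concatenate chunks in groups that each stay under that limit. The sketch below is only a pure-Python model of that grouping, not Ray's or PyArrow's code, and `group_chunks` is a hypothetical helper. Other options include smaller blocks or batches, or a 64-bit-offset type such as Arrow's `large_string`.

```python
from typing import List

# Signed 32-bit offset limit of Arrow's `string` type.
INT32_MAX = 2**31 - 1


def group_chunks(chunk_nbytes: List[int], limit: int = INT32_MAX) -> List[List[int]]:
    """Greedily pack consecutive chunk indices into groups whose total byte
    size stays within `limit`, so each group could be concatenated into one
    int32-offset array without overflowing.

    Hypothetical helper for illustration; not part of Ray or PyArrow.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for i, nbytes in enumerate(chunk_nbytes):
        if nbytes > limit:
            # A single chunk this large can never fit, whatever the grouping.
            raise ValueError(f"chunk {i} alone ({nbytes} bytes) exceeds the limit")
        if current and current_bytes + nbytes > limit:
            # Adding this chunk would overflow, so close the current group.
            groups.append(current)
            current, current_bytes = [], 0
        current.append(i)
        current_bytes += nbytes
    if current:
        groups.append(current)
    return groups


# Illustration only: pretend each chunk holds ~10 MiB of string data
# (block_size=10 << 20 in the original code is the read block size).
sizes = [len(g) for g in group_chunks([10 << 20] * 500)]
print(sizes)
```

With roughly 10 MiB per chunk, at most 204 chunks fit in one int32-offset array, so a column built from thousands of such chunks cannot be combined into one.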
Please note that the example above first reads the data with PyArrow and then converts it to a Ray Dataset, because I think this may help with debugging.