How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
Hi, I am using Ray Dataset to concatenate the outputs of another program, which produces TSV files.
That program writes one file per sample, and I transform each sample with pandas before saving it to parquet with Ray Dataset. Each file can have a different number of columns, out of 1,099,511,627,776 possible feature columns, depending on the sample's composition.
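For context, the per-sample step is essentially pandas in, parquet out. A simplified sketch (the transform function, file names, and paths below are placeholders, not my real code):

import pandas as pd
import ray

def transform_sample(tsv_path):
    # Placeholder for my actual per-sample pandas transformation
    return pd.read_csv(tsv_path, sep='\t')

# Hypothetical single sample: one TSV in, one parquet directory out via Ray Dataset
df = transform_sample('sample_0001.tsv')
ray.data.from_pandas(df).write_parquet('/tmp/parquet/sample_0001')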
Afterwards, I read all the parquet files into a single dataset with the ray.data.read_parquet_bulk method, supplying the full list of files.
Since my dataset contains ~6 million samples, I split it to speed up processing on the cluster I am using. Each resulting sub-dataset contains around 60,000 samples and more than 25,000 feature columns.
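For illustration, the split amounts to chunking the list of per-sample parquet files into groups of roughly that size. A simplified sketch (not my exact code; the chunk size is approximate):

import os
from glob import glob

import numpy as np

tmp_dir = '...'  # directory containing the per-sample parquet files
all_files = glob(os.path.join(tmp_dir, '*.parquet'))

# ~60,000 samples per sub-dataset
sub_datasets = np.array_split(all_files, int(np.ceil(len(all_files) / 60000)))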
For most of these sub-datasets execution goes well, but for some (~10%) I get the following error, which seems to be linked to pyarrow:
File "/usr/local/lib/python3.8/dist-packages/data/kmers_collection.py", line 224, in _batch_read_write
df.write_parquet(dir)
File "/usr/local/lib/python3.8/dist-packages/ray/data/dataset.py", line 1999, in write_parquet
self.write_datasource(
File "/usr/local/lib/python3.8/dist-packages/ray/data/dataset.py", line 2217, in write_datasource
blocks, metadata = zip(*self._plan.execute().get_blocks_with_metadata())
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/lazy_block_list.py", line 287, in get_blocks_with_metadata
blocks, metadata = self._get_blocks_with_metadata()
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/lazy_block_list.py", line 324, in _get_blocks_with_metadata
metadata = read_progress_bar.fetch_until_complete(list(unique_meta_refs))
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/progress_bar.py", line 75, in fetch_until_complete
for ref, result in zip(done, ray.get(done)):
File "/usr/local/lib/python3.8/dist-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.8/dist-packages/ray/_private/worker.py", line 2275, in get
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(OSError): ray::_execute_read_task() (pid=939497, ip=10.80.44.29)
File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/lazy_block_list.py", line 576, in _execute_read_task
block = task()
File "/usr/local/lib/python3.8/dist-packages/ray/data/datasource/datasource.py", line 202, in __call__
for block in result:
File "/usr/local/lib/python3.8/dist-packages/ray/data/datasource/file_based_datasource.py", line 427, in read_files
for data in read_stream(f, read_path, **reader_args):
File "/usr/local/lib/python3.8/dist-packages/ray/data/datasource/file_based_datasource.py", line 225, in _read_stream
yield self._read_file(f, path, **reader_args)
File "/usr/local/lib/python3.8/dist-packages/ray/data/datasource/parquet_base_datasource.py", line 28, in _read_file
return pq.read_table(f, use_threads=use_threads, **reader_args)
File "/usr/local/lib/python3.8/dist-packages/pyarrow/parquet/__init__.py", line 2737, in read_table
dataset = _ParquetDatasetV2(
File "/usr/local/lib/python3.8/dist-packages/pyarrow/parquet/__init__.py", line 2340, in __init__
[fragment], schema=schema or fragment.physical_schema,
File "pyarrow/_dataset.pyx", line 870, in pyarrow._dataset.Fragment.physical_schema.__get__
File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
OSError: Could not open Parquet input source '<Buffer>': Couldn't deserialize thrift: TProtocolException: Exceeded size limit
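For what it's worth, I expect the same error could be reproduced outside Ray by reading one of the parquet files from a failing batch directly with pyarrow, something like the sketch below (the path is a placeholder):

import pyarrow.parquet as pq

# Hypothetical isolated check: read one parquet file from a failing batch directly
# with pyarrow, to confirm the thrift size error comes from the file's metadata
# (tens of thousands of columns) rather than from Ray itself.
suspect_file = '...'  # placeholder: path to one parquet file from a failing batch
table = pq.read_table(suspect_file)
print(table.num_rows, table.num_columns)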
The operations executed before the error are as follows:
import os
from glob import glob

import numpy as np
import ray

ray.init()

def batch_read_write(batch, dir):
    # Read a batch of parquet files into one dataset and write it back out as a single dataset
    df = ray.data.read_parquet_bulk(batch)
    df.write_parquet(dir)

tmp_dir = '...'  # Path to the directory containing the parquet files
Xy_file = '...'  # Path to the directory containing the final dataset

pq_list = glob(os.path.join(tmp_dir, '*.parquet'))

nb_batch = 0
# Read/concatenate files with Ray by batches
while np.ceil(len(pq_list) / 1000) > 1:
    # Split the list of files into batches of ~1000 files
    batches_list = np.array_split(pq_list, np.ceil(len(pq_list) / 1000))
    # New tmp batch folder
    batch_dir = os.path.join(tmp_dir, 'batch_{}'.format(nb_batch))
    os.mkdir(batch_dir)
    # Read, concatenate and write each batch
    for batch in batches_list:
        batch_read_write(list(batch), batch_dir)
    # Redefine the list of files if another round of batching is needed
    pq_list = glob(os.path.join(batch_dir, '*.parquet'))
    nb_batch += 1

# Read/concatenate the remaining batches with Ray
df = ray.data.read_parquet_bulk(pq_list)
# Save the final dataset
df.write_parquet(Xy_file)
I am running on a cluster with 64 CPU cores and 249 GB of RAM.
My Python 3.8.10 environment lives in a Singularity container.
This environment uses Ray 2.0.0 and pyarrow 8.0.0, along with many other Python packages.
Also, while searching this forum and the Ray documentation for this problem, I stumbled upon the ray.data.DatasetPipeline class and the ray.data.Dataset.window() method. Would using these be a more efficient way of executing my operations? I sketched below what I imagine the usage would look like.
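Based on my reading of the docs, the windowed version would look something like this, though I am not sure it is the intended usage or that it would help here (blocks_per_window is an arbitrary guess):

import ray

ray.init()

pq_list = [...]  # same list of parquet files as above
Xy_file = '...'  # final output directory

# Pipelined version: process the dataset one window of blocks at a time
# instead of materializing all blocks at once.
pipe = ray.data.read_parquet_bulk(pq_list).window(blocks_per_window=100)
pipe.write_parquet(Xy_file)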
Thanks in advance