[Dataset] Ray Dataset reading multiple parquet files with different columns crashes due to TProtocolException: Exceeded size limit

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

Hi, I am using Ray Dataset to concatenate the outputs of another program that produces TSV files.
This produces one file per sample, and I transform these samples with pandas before saving them to parquet with Ray Dataset. Each file can have a varying subset of columns, drawn from 1 099 511 627 776 possible columns, depending on the sample composition.

Afterwards, I use the ray.data.read_parquet_bulk method to read all the parquet files into a single dataset, supplying the full list of files.

Since my dataset is ~6 million samples, I split it to accelerate data processing on the cluster I am using. Each resulting sub-dataset contains around 60 000 samples and around 25 000+ feature columns.

For most of these sub-datasets, execution goes well, but for some (~10%) I get the following error, which seems to be linked to pyarrow…

  File "/usr/local/lib/python3.8/dist-packages/data/kmers_collection.py", line 224, in _batch_read_write
    df.write_parquet(dir)
  File "/usr/local/lib/python3.8/dist-packages/ray/data/dataset.py", line 1999, in write_parquet
    self.write_datasource(
  File "/usr/local/lib/python3.8/dist-packages/ray/data/dataset.py", line 2217, in write_datasource
    blocks, metadata = zip(*self._plan.execute().get_blocks_with_metadata())
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/lazy_block_list.py", line 287, in get_blocks_with_metadata
    blocks, metadata = self._get_blocks_with_metadata()
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/lazy_block_list.py", line 324, in _get_blocks_with_metadata
    metadata = read_progress_bar.fetch_until_complete(list(unique_meta_refs))
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/progress_bar.py", line 75, in fetch_until_complete
    for ref, result in zip(done, ray.get(done)):
  File "/usr/local/lib/python3.8/dist-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/ray/_private/worker.py", line 2275, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(OSError): ray::_execute_read_task() (pid=939497, ip=10.80.44.29)
  File "/usr/local/lib/python3.8/dist-packages/ray/data/_internal/lazy_block_list.py", line 576, in _execute_read_task
    block = task()
  File "/usr/local/lib/python3.8/dist-packages/ray/data/datasource/datasource.py", line 202, in __call__
    for block in result:
  File "/usr/local/lib/python3.8/dist-packages/ray/data/datasource/file_based_datasource.py", line 427, in read_files
    for data in read_stream(f, read_path, **reader_args):
  File "/usr/local/lib/python3.8/dist-packages/ray/data/datasource/file_based_datasource.py", line 225, in _read_stream
    yield self._read_file(f, path, **reader_args)
  File "/usr/local/lib/python3.8/dist-packages/ray/data/datasource/parquet_base_datasource.py", line 28, in _read_file
    return pq.read_table(f, use_threads=use_threads, **reader_args)
  File "/usr/local/lib/python3.8/dist-packages/pyarrow/parquet/__init__.py", line 2737, in read_table
    dataset = _ParquetDatasetV2(
  File "/usr/local/lib/python3.8/dist-packages/pyarrow/parquet/__init__.py", line 2340, in __init__
    [fragment], schema=schema or fragment.physical_schema,
  File "pyarrow/_dataset.pyx", line 870, in pyarrow._dataset.Fragment.physical_schema.__get__
  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
  File "pyarrow/error.pxi", line 115, in pyarrow.lib.check_status
OSError: Could not open Parquet input source '<Buffer>': Couldn't deserialize thrift: TProtocolException: Exceeded size limit

The previously executed operations are as follows:

import os
from glob import glob

import numpy as np
import ray

ray.init()

def batch_read_write(batch, dir):
    df = ray.data.read_parquet_bulk(batch)
    df.write_parquet(dir)


tmp_dir = '...' # Path to the directory containing the parquet files
Xy_file = '...' # Path to the directory containing the final dataset

pq_list = glob(os.path.join(tmp_dir, '*.parquet'))

nb_batch = 0
# Read/concatenate files with Ray by batches
while np.ceil(len(pq_list)/1000) > 1:
    # Split the list of files into batches
    batches_list = np.array_split(pq_list, np.ceil(len(pq_list)/1000))
    # New tmp batch folder
    batch_dir = os.path.join(tmp_dir, 'batch_{}'.format(nb_batch))
    os.mkdir(batch_dir)
    # Read, concatenate and write each batch
    for batch in batches_list:
        batch_read_write(list(batch), batch_dir)
    # Use the newly written batch files as the input list for the next iteration
    pq_list = glob(os.path.join(batch_dir, '*.parquet'))
    nb_batch += 1
# Read/concatenate batches with Ray
df = ray.data.read_parquet_bulk(pq_list)
# Save dataset
df.write_parquet(Xy_file)

I am using a cluster of 64 CPU cores with 249 GB of RAM.
My Python 3.8.10 environment is located in a Singularity container.
This environment uses Ray 2.0.0 and pyarrow 8.0.0, as well as a lot of other Python packages.

Also, while reading this forum and the Ray documentation about this problem, I stumbled upon the ray.data.DatasetPipeline class and the ray.data.Dataset.window() method. Would using these be a more efficient way of executing my operations?

Thanks in advance

Hi @nicdemon, Ray 2.0 doesn’t support pyarrow 8.0.0. From what others have experienced, using an unsupported newer version may cause this TProtocolException. Could you try with a version that is >= 6.0.1 and < 7.0.0?

As for DatasetPipeline: in general, if you are resource-constrained and the amount of data is larger than the available memory, it is a good option. This also depends on the nature of the workload (whether it can be pipelined). In your use case, do you need to read all files together in order to perform the operation, or can you read e.g. 10% of the files, perform the operation on them, then move on to the next 10%, and so on? Only the latter can benefit from using DatasetPipeline.
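
As a rough illustration only (the window size, paths, and transform below are placeholders, not recommendations), a pipelined version could look something like:

import ray

ray.init()

# Placeholder list of parquet files; in your case this would be pq_list.
paths = ['...']

# Process a few blocks at a time so only one window has to fit in memory at once.
pipe = ray.data.read_parquet_bulk(paths).window(blocks_per_window=10)

# Per-batch transformations are applied window by window.
pipe = pipe.map_batches(lambda df: df, batch_format="pandas")

pipe.write_parquet('output_dir')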

Hi @jianxiao
Thank you for the clarification; I didn’t know Ray only supported these versions of pyarrow, and I had seen a link to pyarrow 9.0.0 in another post.
I will test whether the error persists with pyarrow 7.0.0.

For DatasetPipeline, I need to ensure that all columns from all files are present before doing the data transformations, so I think it is necessary to read them all…

Support for pyarrow 9.0.0 is ongoing work and doesn’t exist in Ray 2.0. It’s estimated to be included in Ray 2.2, which you can check back on in the future.
By the way, 7.0.0 is also not supported (the range is 6.0.1 <= supported version < 7.0.0). You may try e.g. 6.0.1.

I have tried downgrading pyarrow from version 8.0.0 to version 6.0.1 and it still gave me the same error… I will nevertheless keep this version, since it is the one supported by Ray and other versions may cause problems elsewhere.

However, I have tried reading the files individually with pyarrow and noticed that the files that can’t be read, and that make pyarrow throw this exception, are only the ones with a very large number of columns. Also, while testing, I realized that this issue must be resolved before feeding the data to Ray, so it is more of a pyarrow/pandas/numpy/data-wrangling problem…

Ray only reads metadata from the first parquet file and assumes the columns are the same in all files. Maybe another solution would be to use a custom ray.data.datasource.FileMetadataProvider?

From what you said, it seems the schema size is a potential culprit. Could you try reading those files directly with pyarrow, something like pq.read_table(file_path)? This way we can narrow down the source of the problem.

You’re right that Ray Datasets infers the schema from the first file and uses that schema for the remaining files.
The FileMetadataProvider is more relevant/useful when the issue is that there are many files in the directory and it’s slow to expand and find all the files under it, which doesn’t seem to be the case here.

pq.read_table(file_path) from pyarrow is the function I used to identify the problematic files.
I am running some tests to see if I can extract all the columns from the files before merging them, transforming each parquet file with pyarrow so that it contains all of these columns and Ray can concatenate them easily (roughly as in the sketch below).
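
A rough sketch of what I mean (assuming each file's schema can at least be read, which may not hold for the problematic one; the int64 fill type is just an example):

import pyarrow as pa
import pyarrow.parquet as pq

# Collect the union of all column names across the files.
all_columns = set()
for path in pq_list:
    all_columns.update(pq.read_schema(path).names)

# Rewrite each file so it contains every column, filling the missing ones with nulls.
for path in pq_list:
    table = pq.read_table(path)
    for col in sorted(all_columns - set(table.column_names)):
        table = table.append_column(col, pa.nulls(len(table), pa.int64()))
    pq.write_table(table, path)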

After inspecting all the files with a try/except statement, there seems to be only one file in my dataset that raises this error. I will check with the pyarrow developers, since this seems to be related to pyarrow's reading of parquet files and not to Ray.
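
Roughly what I ran to find it (a sketch):

import pyarrow.parquet as pq

# Try opening every file directly with pyarrow and record the ones that fail.
bad_files = []
for path in pq_list:
    try:
        pq.read_table(path)
    except OSError as err:
        bad_files.append((path, str(err)))

print(bad_files)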

Sounds good, thanks for digging into the underlying stack here. Please circle back if anything Ray can help or improve :slight_smile:

After investigating the pyarrow error, testing with multiple datasets, and asking the pyarrow community for help, I have come to understand the error I was getting.

This is caused by the parquet files having too many columns, which makes the file metadata too large to deserialize. In pyarrow 6.0.1, there is no option for dealing with this. However, in version 9.0.0 (which hopefully will be supported by Ray soon), there are two options for adjusting the Thrift size limits in pyarrow.parquet.read_table(): thrift_string_size_limit and thrift_container_size_limit.
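
For example (a sketch against pyarrow >= 9.0.0; the limit values are only illustrative):

import pyarrow.parquet as pq

# Raise the Thrift deserialization limits so the metadata footer of a very wide
# parquet file can be read; the values below are examples, not recommendations.
table = pq.read_table(
    'wide_file.parquet',
    thrift_string_size_limit=1_000_000_000,
    thrift_container_size_limit=1_000_000,
)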

In the meantime, I have adjusted my pipeline to extract fewer columns from my data and have started using the options described above with pyarrow 9.0.0, even though it is not yet supported by Ray (hoping this will not cause problems later on).

@nicdemon Thank you for looking into this!

Supporting Arrow 7+ (including Arrow 9) is currently in progress, and is expected to land for Ray 2.2.

In the meantime, I have adjusted my pipeline to extract fewer columns from my data and have started using the options described above with pyarrow 9.0.0, even though it is not yet supported by Ray (hoping this will not cause problems later on).

I should note that using pyarrow 7+ (including pyarrow 9.0.0) with Ray will most likely not work; there’s a serialization bug in Arrow 7+ that results in a massive inflation of the serialized payload when pickling Arrow data (potentially multiple orders of magnitude), so I would strongly encourage not using Arrow 7+ with Ray until the fix included in the linked PR lands.

Thanks for the clarification; I hadn’t realized using pyarrow 7+ could have such a big impact.
I’ll wait for Ray 2.2 before using version 9.0.0; anyway, it was more of a safety measure than a necessity since I reduced the number of columns in the original data.

I hadn’t realized using pyarrow 7+ could have such a big impact.

Indeed, we made sure to properly enforce this upper-bound with a runtime check in the upcoming 2.1 release since this can be quite a surprise to users!

I’ll wait for Ray 2.2 before using version 9.0.0; anyway, it was more of a safety measure than a necessity since I reduced the number of columns in the original data.

Sounds good, thanks again for drilling down on this! :raised_hands: We’ll push on getting this in for 2.2 on our end.

In case someone else runs into a similar problem: I have found a workaround by merging all columns into a single tensor column, which seems to be easily handled by Ray/pyarrow.

However, since this value column is a tensor, it is impossible to apply column-wise mapping directly, so I need to extract the columns from the tensor after having read all the parquet files.
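
Roughly, the workaround looks like this (a sketch; the TensorArray import path and the fixed global feature order are assumptions from my setup):

import pandas as pd
import ray
from ray.data.extensions import TensorArray

def to_tensor_frame(df: pd.DataFrame, feature_order: list) -> pd.DataFrame:
    # Reindex against a fixed global column order, filling absent features with 0,
    # then pack each row into a single tensor column.
    values = df.reindex(columns=feature_order, fill_value=0).to_numpy()
    return pd.DataFrame({'features': TensorArray(values)})

# per-sample frame -> single tensor column -> parquet written with Ray
# ds = ray.data.from_pandas(to_tensor_frame(sample_df, all_feature_names))
# ds.write_parquet(Xy_file)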

@nicdemon I wanted to mention that, if you’re interested in using Ray nightly to bypass the column merging and instead use the requisite Thrift limit tweaks in the Parquet reading APIs, we’ve added support for Arrow 7 through Arrow 10 as well as Arrow nightly in Ray master!
