OOM in read_parquet for a PySpark-created dataframe

1. Severity of the issue: (select one)
None: I’m just curious or want clarification.
Low: Annoying but doesn’t hinder my work.
Medium: Significantly affects my productivity but can find a workaround.
High: Completely blocks me.

2. Environment:

  • Ray version: 2.51.1
  • Python version: 3.11.11
  • OS: Linux
  • Cloud/Infrastructure:
  • Other libs/tools (if relevant):

3. What happened vs. what you expected:

I am currently using Spark to preprocess data and save the result as a Parquet dataset. Each row of the preprocessed dataset contains an array of 19 int64 elements. I then use Ray to read the Parquet files of the preprocessed dataset for training.

However, I noticed that when saving a Parquet file with Spark and reading it with Ray, the I/O speed was very low compared to a write/read both performed with Ray.

The reason is that Spark saves the 19 elements as a variable-length list, without encoding the array length in the type, so Ray struggles to read it quickly.

A custom solution was to force the schema in read_parquet:

import pyarrow as pa
from ray.data import read_parquet
from ray.data.extensions import ArrowTensorTypeV2

cate_dim = 19  # length of the categorical_indices array
cat_tensor_type = ArrowTensorTypeV2(shape=(cate_dim,), dtype=pa.int32())
schema = pa.schema([
    pa.field("label", pa.float64()),
    pa.field("categorical_indices", cat_tensor_type),
])

df = read_parquet(
    preprocessed_dataset_path,
    file_extensions=["parquet"],
    schema=schema,
    concurrency=1,
    override_num_blocks=20,
)

However, this leads to an out-of-memory error when reading the Parquet file (of relatively small size, ~1.5 GB).

The detailed error is:

ray.exceptions.OutOfMemoryError: Task was killed due to the node running low on memory.

Memory on the node (IP: 10.17.14.58, ID: 44e2c78b6eb4951edfb456d3cccab7db2cb3537ac06244ca5867edbc) where the lease (actor ID: NIL_IDlease ID: 0000000004000000ffffffffffffffffffffffffffffffffffffffffffffffff, name=_fetch_parquet_file_info, pid=1364, memory used=58.39GB) was running was 60.22GB / 63.38GB (0.950204), which exceeds the memory usage threshold of 0.95. Ray killed this worker (ID: 6aa9c378e47942acd181036bed99d54da8d5b40cd66e2ea5ba9142b2) because it was the most recently scheduled task; to see more information about memory usage on this node, use `ray logs raylet.out -ip 10.17.14.58`. To see the logs of the worker, use `ray logs worker-6aa9c378e47942acd181036bed99d54da8d5b40cd66e2ea5ba9142b2*out -ip 10.17.14.58. Top 10 memory users:

PID MEM(GB) COMMAND

1364 58.39 ray::_fetch_parquet_file_info

Increasing the worker size did not help.

Do you have any idea how to assign the correct type for read_parquet without making it load the whole Parquet file?

Thank you very much for your help

This is a high-severity issue: Ray OOMs when reading a Spark-generated Parquet file with a custom ArrowTensorTypeV2 schema, even though the file is small (~1.5 GB). The OOM occurs during `_fetch_parquet_file_info`, likely because of how the overridden schema interacts with how Ray/Arrow handle variable-length lists vs. fixed-shape tensors: the read path ends up materializing far more data than the file size suggests instead of reading incrementally. This is a known pain point when reading Spark-written Parquet arrays as fixed-shape tensors in Ray, and increasing worker memory does not help because the underlying read logic is not memory-efficient in this case. There is currently no built-in way in Ray to read a Spark-generated variable-length list column as a fixed-shape tensor without loading the whole file, and this limitation is not fully addressed in the current Ray APIs or documentation.

For more detail, would you like a step-by-step breakdown of why this happens and possible workarounds, including code examples and references to relevant Ray and Arrow issues?
