1. Severity of the issue: (select one)
None: I’m just curious or want clarification.
Low: Annoying but doesn’t hinder my work.
Medium: Significantly affects my productivity but can find a workaround.
High: Completely blocks me.
2. Environment:
- Ray version: 2.51.1
- Python version: 3.11.11
- OS: Linux
- Cloud/Infrastructure:
- Other libs/tools (if relevant):
3. What happened vs. what you expected:
I am currently using Spark to preprocess data and save the result as a Parquet dataset. Each preprocessed row contains an array of 19 int64 elements. I then use Ray to read the Parquet files of the preprocessed dataset for training.
However, I noticed that when saving a Parquet file with Spark and reading it with Ray, the IO speed was very low compared to a write/read both done with Ray.
The reason is that Spark saves the 19 elements as a variable-length list, without encoding the array length in the schema, so Ray struggles to read it quickly.
A custom workaround was to force the schema in read_parquet:

import pyarrow as pa
from ray.data import read_parquet
from ray.data.extensions import ArrowTensorTypeV2

cate_dim = 19  # number of elements per row
cat_tensor_type = ArrowTensorTypeV2(shape=(cate_dim,), dtype=pa.int32())
schema = pa.schema([
    pa.field("label", pa.float64()),
    pa.field("categorical_indices", cat_tensor_type),
])
df = read_parquet(
    preprocessed_dataset_path,
    file_extensions=["parquet"],
    schema=schema,
    concurrency=1,
    override_num_blocks=20,
)
However, this leads to an out-of-memory error when reading the Parquet file (which is relatively small, ~1.5 GB).
The detailed error is:
ray.exceptions.OutOfMemoryError: Task was killed due to the node running low on memory.
Memory on the node (IP: 10.17.14.58, ID: 44e2c78b6eb4951edfb456d3cccab7db2cb3537ac06244ca5867edbc) where the lease (actor ID: NIL_IDlease ID: 0000000004000000ffffffffffffffffffffffffffffffffffffffffffffffff, name=_fetch_parquet_file_info, pid=1364, memory used=58.39GB) was running was 60.22GB / 63.38GB (0.950204), which exceeds the memory usage threshold of 0.95. Ray killed this worker (ID: 6aa9c378e47942acd181036bed99d54da8d5b40cd66e2ea5ba9142b2) because it was the most recently scheduled task; to see more information about memory usage on this node, use `ray logs raylet.out -ip 10.17.14.58`. To see the logs of the worker, use `ray logs worker-6aa9c378e47942acd181036bed99d54da8d5b40cd66e2ea5ba9142b2*out -ip 10.17.14.58. Top 10 memory users:
PID MEM(GB) COMMAND
1364 58.39 ray::_fetch_parquet_file_info
Increasing the worker size did not help…
Do you have any idea how to assign the correct type in read_parquet without making it load the whole Parquet file?
Thank you very much for your help!
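As a fallback I am considering (a sketch only, not validated end-to-end): skip the schema override, read the column with its default variable-length list type, and stack each batch into a fixed-shape array with map_batches. `to_fixed_tensor` is a hypothetical helper name, not a Ray API:

```python
import numpy as np

# Hypothetical helper: convert the variable-length list column into a
# fixed-shape (batch_size, 19) int64 array, batch by batch.
def to_fixed_tensor(batch):
    batch["categorical_indices"] = np.stack(
        [np.asarray(row, dtype=np.int64) for row in batch["categorical_indices"]]
    )
    return batch

# Simulated batch in the dict-of-columns form Ray Data passes to map_batches.
batch = {"categorical_indices": [[i] * 19 for i in range(4)]}
out = to_fixed_tensor(batch)
print(out["categorical_indices"].shape)  # (4, 19)
```

With Ray Data this would be applied as `ray.data.read_parquet(preprocessed_dataset_path).map_batches(to_fixed_tensor)`, though I don't know whether that avoids the metadata-fetch OOM above.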