Repartition killed because of OOM?

How severely does this issue affect your experience of using Ray?

  • Low: It annoys or frustrates me for a moment.

I have a 100 GB Parquet dataset in Google Cloud Storage. It currently has 500 partitions, and I'm trying to repartition it into 2000. I attempted to run the code below on a cluster of 5 nodes, each with 600 GB of RAM.

import gcsfs
import ray.data as rd

gs_project = "my-project"
pq_paths = ["gs://path-1.parquet", ..., "gs://path-500.parquet"]
fs = gcsfs.GCSFileSystem(project=gs_project)
output_dir = "gs://output.parquet"
num_partitions = 2000
(
    rd.read_parquet(pq_paths, filesystem=fs, ray_remote_args={"max_retries": 10})
    .repartition(num_blocks=num_partitions, shuffle=False)
    .write_parquet(output_dir, filesystem=fs)
)

When this runs, memory spikes sharply to ~100% on 2 of my nodes, and then I just see “killed” in bash. I’m using Ray 1.11.0. If it helps, this snippet worked with Ray 1.8.0: memory would spike, but the plasma store would fill up and data would spill to disk.

Is there anything I can do to fix it?

Hi @hahdawg, thanks for posting!

The fact that only 2 of your 5 nodes spike suggests that data/tasks are not getting properly load-balanced across the cluster, which should happen automatically. A few clarifying questions and suggestions:

  1. What’s the memory utilization of the other 3 nodes when that spike happens?
  2. How large is the in-memory representation of the data? (There’s a quick sketch after this list for one way to check.)
  3. You’re saying that this exact same snippet worked on Ray 1.8, correct?
  4. Could you try the recent Ray 1.12 release and see if it still OOMs?
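
For (1) and (2), here’s a minimal sketch of one way to gather those numbers, assuming the read itself succeeds on your cluster; it reuses the placeholder project and paths from your snippet and the Dataset num_blocks()/size_bytes() APIs:

import gcsfs
import ray
import ray.data as rd

ray.init(address="auto")  # attach to the already-running cluster

# Same placeholder project/paths as in your snippet above.
fs = gcsfs.GCSFileSystem(project="my-project")
pq_paths = ["gs://path-1.parquet", ..., "gs://path-500.parquet"]

ds = rd.read_parquet(pq_paths, filesystem=fs)

# (2) Size of the dataset's blocks in memory, to compare against the ~100 GB on disk.
print("num blocks:", ds.num_blocks())
print("in-memory bytes:", ds.size_bytes())

For (1), running the ray status CLI command on the head node gives a cluster-level resource summary, and the Ray dashboard breaks memory and object store usage down per node while the job is running.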