Memory usage of `.map()`

How severely does this issue affect your experience of using Ray?

  • Low: It annoys or frustrates me for a moment.

I’m trying to read the subsampled 27 GB version of the FineWeb dataset from Hugging Face with `ray.data.read_parquet()`. I’m running the job on a cluster with 2 workers, each with 8 CPUs and 48 GB of RAM.
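For reference, this is roughly what the read step looks like (the parquet path here is just a placeholder for wherever the FineWeb subsample actually lives):

```python
import ray

ray.init()  # connect to the running cluster

# Placeholder path: substitute the actual location of the
# downloaded FineWeb parquet files (local disk, S3, etc.).
ray_ds_stream = ray.data.read_parquet("/data/fineweb-sample/")
```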

When I call `ray_ds_stream.materialize().stats()` without any processing right after the read, I get the following info:

```
Stats: Operator 0 Read: 1 tasks executed, 386 blocks produced in 71.67s
...
* Peak heap memory usage (MiB): 451.03 min, 5667.27 max, 3179 mean
* Output num rows per block: 731 min, 41000 max, 38532 mean, 14873731 total
* Output size bytes per block: 2578778 min, 137994936 max, 132322201 mean, 51076369752 total
* Output rows per task: 14873731 min, 14873731 max, 14873731 mean, 1 tasks used
```
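A quick sanity check on the per-block size implied by those stats (plain arithmetic on the numbers above):

```python
# Mean block size implied by the stats output above.
total_bytes = 51_076_369_752
num_blocks = 386
print(total_bytes / num_blocks / 1e9)  # ~0.13 GB per block, i.e. roughly 0.1 GB
```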

In summary: I have 386 blocks of roughly 0.1 GB each on average.

Once I add the following map step:

```python
def identity(row):
    return row

ray_ds_stream = ray_ds_stream.map(identity, concurrency=8)
```

I see a constant increase in the workers’ memory usage until an OOM error occurs. I’m trying to understand how this can happen and can’t find an answer in the documentation. Here’s my reasoning and why I’m confused:

  • From the documentation, I understood that each worker executing a map function operates on one block at a time, and that block is loaded into the memory of the node on which the worker is scheduled.
  • So if concurrency is set to 8, even if all workers are scheduled on the same node, only ~0.8 GB (8 × <size_of_the_block>) should be loaded at any time.
  • Even if I assume that the map operation keeps the input block in the node’s memory while processing each row and also creates a copy of the block for the result, it still doesn’t add up: the memory usage should be in the ballpark of 1.6 GB (see the back-of-the-envelope numbers after this list).
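Concretely, the back-of-the-envelope numbers I have in mind (plain arithmetic, using the rounded ~0.1 GB mean block size from the stats above):

```python
block_gb = 0.1     # rounded mean block size from the stats output
concurrency = 8    # concurrency passed to .map()

print(concurrency * block_gb)      # 0.8 GB: 8 input blocks held at once
print(2 * concurrency * block_gb)  # 1.6 GB: input blocks + output copies
```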

I would appreciate any help getting me unconfused :sweat_smile:. Also, if you have any links where I can find more details on how Ray Data handles/shuffles blocks, I would really appreciate it if you shared them.