Why does Ray Data execute 3 blocks first on macOS?

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I’m encountering a performance issue with Ray Data where it’s processing blocks in limited batches rather than fully utilizing available parallelism.

import time

import ray

ray.init()

# 10 single-row items, explicitly split into 10 blocks.
ds = ray.data.from_items(
    [{"key": "value"} for _ in range(10)],
    override_num_blocks=10,
)

def process(x):
    # Simulate 5 seconds of work per row.
    time.sleep(5)
    return x

# A tiny CPU request, so all 10 map tasks could in principle run at once.
ds = ds.map(process, num_cpus=0.01)
len(ds.to_pandas())

When executing this code, Ray appears to process only 3 blocks in parallel at first, and only starts the remaining blocks after those complete. This results in a total execution time of approximately 10 seconds, when ideally all blocks should run in a single wave. (The 10 seconds break down as follows: processing one block takes 5 s, since process sleeps for 5 s per row. Ray first processes 3 blocks concurrently, which takes 5 s, then processes the remaining blocks in parallel, which takes another 5 s.)
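
For reference, here is a lightly instrumented variant of the repro that prints when each map task starts. The timestamp printing is only for illustration and is not part of my original script:

import time

import ray

ray.init()

ds = ray.data.from_items(
    [{"key": "value"} for _ in range(10)],
    override_num_blocks=10,
)

start = time.time()

def process(x):
    # Print how long after the script started this task began running.
    print(f"task started at +{time.time() - start:.1f}s")
    time.sleep(5)
    return x

ds = ds.map(process, num_cpus=0.01)
len(ds.to_pandas())

On my Mac, 3 tasks print a start time of roughly +0 s and the remaining tasks print roughly +5 s, which matches the two waves described above.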

This reminds me of a similar issue in PySpark, where specifying column names but not providing type information in the schema causes PySpark to execute a subset of partitions first for schema inference before processing the remaining partitions (e.g., rdd.toDF(["key1", "key2"])).
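
To illustrate what I mean (this is just a sketch of the PySpark behavior, not related to the Ray code above): providing only column names makes toDF() sample partitions to infer the types, while an explicit schema skips that pass.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType

spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize([("a", "b")] * 10, 10)

# Only column names: Spark runs a sampling job over some partitions
# to infer the column types before the main job.
df_inferred = rdd.toDF(["key1", "key2"])

# Explicit schema: no inference pass is needed.
schema = StructType([
    StructField("key1", StringType()),
    StructField("key2", StringType()),
])
df_explicit = spark.createDataFrame(rdd, schema=schema)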

Is Ray Data doing some kind of schema inference that’s causing this sequential batch processing? How can I force Ray Data to process all blocks in parallel without this initial sampling phase?

Any insights or solutions would be greatly appreciated!


I tested on both Windows and Linux, and the same script took 5 seconds. It seems this issue only occurs on macOS (my chip is the M3 Pro).

The reason it only happens on macOS is that, by default, the object store on macOS is capped at 2 GB, so backpressure kicks in sooner and limits how many tasks run concurrently.
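
If the small object store is indeed the bottleneck, one workaround is to give it more memory when starting Ray. The 4 GB value below is just an example; pick something that fits your machine:

import ray

# Allocate a larger object store (in bytes) so Ray Data's backpressure
# does not throttle the map tasks as early.
ray.init(object_store_memory=4 * 1024**3)

I believe Ray Data's streaming executor also exposes its own object-store limit via DataContext.execution_options.resource_limits, but I haven't verified that on macOS.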
