How severe does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I’m encountering a performance issue with Ray Data where it’s processing blocks in limited batches rather than fully utilizing available parallelism.
```python
import ray

ray.init()

ds = ray.data.from_items(
    [{"key": "value"} for _ in range(10)],
    override_num_blocks=10,
)

def process(x):
    import time
    time.sleep(5)
    return x

ds = ds.map(process, num_cpus=0.01)
len(ds.to_pandas())
```
When executing this code, Ray appears to process only 3 blocks in parallel first, and then processes the remaining blocks after those complete. This results in a total execution time of approximately 10 seconds, when ideally the whole workload should run in a single parallel wave of about 5 seconds. (The 10 seconds break down as follows: processing one block takes 5 s; Ray first processes 3 blocks concurrently, which takes 5 s, then processes the remaining 7 blocks in parallel, which takes another 5 s.)
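For clarity, the arithmetic behind the ~10 s can be sketched as a tiny model of the pattern I observed (the initial wave size of 3 is just the observed value on my machine, not a documented Ray constant):

```python
def expected_wall_time(num_blocks, first_wave_size, per_block_s):
    """Model the observed pattern: a small initial wave of blocks runs
    first, then the remaining blocks all run in parallel afterwards."""
    waves = 1 if num_blocks <= first_wave_size else 2
    return waves * per_block_s

print(expected_wall_time(10, 3, 5))   # observed behavior: 2 waves -> 10 s
print(expected_wall_time(10, 10, 5))  # ideal: a single wave -> 5 s
```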
This reminds me of a similar issue in PySpark, where specifying column names but no type information in the schema causes PySpark to execute a subset of partitions first for schema inference before processing the remaining partitions (e.g., `rdd.toDF(["key1", "key2"], schema=None)`).
Is Ray Data doing some kind of schema inference that’s causing this sequential batch processing? How can I force Ray Data to process all blocks in parallel without this initial sampling phase?
Any insights or solutions would be greatly appreciated!
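To illustrate the effect in isolation (plain Python, no Ray): a thread pool capped at 3 workers needs several waves to get through 10 equal tasks, while a pool with 10 workers finishes them in one wave. This is only a generic analogy for the throttling I'm seeing, not a model of Ray's actual scheduler:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def wall_time(num_tasks, max_workers, task_s=0.05):
    # Run num_tasks identical sleeps with bounded concurrency
    # and return the elapsed wall-clock time.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(lambda _: time.sleep(task_s), range(num_tasks)))
    return time.perf_counter() - start

throttled = wall_time(10, 3)     # ceil(10 / 3) = 4 waves -> ~0.2 s
unthrottled = wall_time(10, 10)  # a single wave -> ~0.05 s
```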
I tested on both Windows and Linux, and execution took 5 seconds on both. This issue seems to occur only on macOS (my chip is the M3 Pro).