[Data] map_batches is not respecting concurrency from the beginning

Hi!

I observed that map_batches uses only 3 of 8 CPUs at the beginning, even though concurrency is set to 8. I would ignore this if my UDF were quick, but in many applications it isn't, so resources are wasted.

I encountered this while working with the PyCaret integration via Fugue, which uses map_batches to process all trials. Since each trial on a larger dataset can take as long as 30 minutes, a lot of resources sit idle while the first 3 trials finish, which doesn't make sense.

Is there an option to force Ray Data to start processing the dataset with full concurrency from the beginning?

Here’s a script to reproduce the problem:

import time

import pandas as pd
import ray

ray.init(num_cpus=8)
ds = ray.data.from_items([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16])

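# These two don't help:
# ds = ds.repartition(16)
# ds = ds.materialize()

def udf(df: pd.DataFrame) -> pd.DataFrame:
    # With batch_size=1, each batch holds a single row.
    index = int(df["item"].iloc[0])
    print("Processing:", index)
    time.sleep(30)  # simulate a slow UDF, e.g. one PyCaret trial
    print("Finished processing:", index)
    return df


# 16 items * 30 s / 8 concurrent tasks should take about 60 s.
ds.map_batches(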
    udf,
    batch_size=1,
    batch_format="pandas",
    concurrency=8,
    num_cpus=1,
).take_all()

and the output (note that it took 90 seconds instead of the 60 you would expect with all 8 CPUs busy):

2024-10-24 10:29:32,189	INFO worker.py:1807 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8267 
2024-10-24 10:29:33,844	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-24_10-29-30_642594_16479/logs/ray-data
2024-10-24 10:29:33,844	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(udf)]
(MapBatches(udf) pid=16490) Processing: 1
(MapBatches(udf) pid=16493) Processing: 2
(MapBatches(udf) pid=16494) Processing: 3
- MapBatches(udf): Tasks: 3 [backpressured]; Queued blocks: 13; Resources: 3.0 CPU, 768.0MB object store: : 0.00 row [00:01, ? row/s]
Running Dataset. Active & requested resources: 3/8 CPU, 768.0MB/1.0GB object store: : 0.00 row [00:01, ? row/s]
Running Dataset. Active & requested resources: 3/8 CPU, 768.0MB/1.0GB object store: : 0.00 row [00:28, ? row/s]
(MapBatches(udf) pid=16490) Finished processing: 1
Running Dataset. Active & requested resources: 3/8 CPU, 768.0MB/1.0GB object store: : 0.00 row [00:30, ? row/s]
Running Dataset. Active & requested resources: 7/8 CPU, 980.0B/1.0GB object store:  12%|█▎        | 3.00/24.0 [00:30<03:32, 10.1s/ row]
- MapBatches(udf): Tasks: 8 [backpressured]; Queued blocks: 5; Resources: 8.0 CPU, 1.1KB object store:  12%|█▎        | 3.00/24.0 [00:30<03:32, 10.1s/ row]
(MapBatches(udf) pid=16490) Processing: 6
(MapBatches(udf) pid=16493) Finished processing: 2
(MapBatches(udf) pid=16493) Processing: 4
(MapBatches(udf) pid=16494) Finished processing: 3
(MapBatches(udf) pid=16494) Processing: 5
(MapBatches(udf) pid=16495) Processing: 9
(MapBatches(udf) pid=16489) Processing: 7
(MapBatches(udf) pid=16491) Processing: 8
(MapBatches(udf) pid=16507) Processing: 10
(MapBatches(udf) pid=16506) Processing: 11

Running Dataset. Active & requested resources: 8/8 CPU, 1.1KB/1.0GB object store:  12%|█▎        | 3.00/24.0 [00:31<03:32, 10.1s/ row]
Running Dataset. Active & requested resources: 8/8 CPU, 1.1KB/1.0GB object store:  12%|█▎        | 3.00/24.0 [00:58<03:32, 10.1s/ row]
(MapBatches(udf) pid=16490) Finished processing: 6
(MapBatches(udf) pid=16493) Finished processing: 4
(MapBatches(udf) pid=16494) Finished processing: 5
(MapBatches(udf) pid=16490) Processing: 14
(MapBatches(udf) pid=16493) Processing: 13
(MapBatches(udf) pid=16494) Processing: 12
(MapBatches(udf) pid=16491) Finished processing: 8
(MapBatches(udf) pid=16495) Finished processing: 9
(MapBatches(udf) pid=16495) Processing: 16
(MapBatches(udf) pid=16489) Finished processing: 7
(MapBatches(udf) pid=16489) Processing: 15

- MapBatches(udf): Tasks: 7; Queued blocks: 0; Resources: 7.0 CPU, 980.0B object store:  56%|█████▋    | 9.00/16.0 [01:00<00:44, 6.37s/ row]
(MapBatches(udf) pid=16507) Finished processing: 10
(MapBatches(udf) pid=16506) Finished processing: 11
Running Dataset. Active & requested resources: 7/8 CPU, 980.0B/1.0GB object store:  56%|█████▋    | 9.00/16.0 [01:01<00:44, 6.37s/ row]
- MapBatches(udf): Tasks: 5; Queued blocks: 0; Resources: 5.0 CPU, 700.0B object store:  69%|██████▉   | 11.0/16.0 [01:01<00:24, 4.85s/ row]
Running Dataset. Active & requested resources: 5/8 CPU, 700.0B/1.0GB object store:  69%|██████▉   | 11.0/16.0 [01:01<00:24, 4.85s/ row]
Running Dataset. Active & requested resources: 5/8 CPU, 700.0B/1.0GB object store:  69%|██████▉   | 11.0/16.0 [01:29<00:24, 4.85s/ row]
(MapBatches(udf) pid=16490) Finished processing: 14
(MapBatches(udf) pid=16493) Finished processing: 13
(MapBatches(udf) pid=16494) Finished processing: 12
✔️  Dataset execution finished in 90.54 seconds: 100%|██████████| 16.0/16.0 [01:30<00:00, 5.66s/ row]

- MapBatches(udf): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 280.0B object store: 100%|██████████| 16.0/16.0 [01:30<00:00, 5.66s/ row]
(MapBatches(udf) pid=16495) Finished processing: 16
(MapBatches(udf) pid=16489) Finished processing: 15
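A back-of-the-envelope model of the timing in the log above: items 1-3 run alone for the first 30 s, items 4-11 run on all 8 CPUs in the next 30 s, and items 12-16 need a third 30 s wave. The sketch below assumes every task takes exactly 30 s and that tasks start in synchronized waves; `wave_makespan` is a hypothetical helper, not part of Ray.

```python
def wave_makespan(num_tasks: int, task_seconds: float, slots_per_wave: list[int]) -> float:
    """Wall-clock time if tasks run in synchronized waves.

    Simplified model (an assumption, not Ray's scheduler): every task takes
    exactly `task_seconds`, and wave i can run at most slots_per_wave[i] tasks.
    The last entry of slots_per_wave is reused for all later waves.
    """
    if any(s <= 0 for s in slots_per_wave):
        raise ValueError("every wave needs at least one slot")
    remaining = num_tasks
    waves = 0
    while remaining > 0:
        slots = slots_per_wave[min(waves, len(slots_per_wave) - 1)]
        remaining -= min(slots, remaining)
        waves += 1
    return waves * task_seconds


# All 8 CPUs busy from the start: 8 + 8 tasks -> 2 waves.
print(wave_makespan(16, 30, [8]))     # 60
# Only 3 tasks in the first wave, then 8: 3 + 8 + 5 tasks -> 3 waves.
print(wave_makespan(16, 30, [3, 8]))  # 90
```

Under this model, the single under-filled first wave is enough to turn 2 waves into 3, which matches the 90.54 s reported above.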

I believe the number of blocks Ray Data computed did not match the concurrency you specified.

NB: the number of blocks Ray Data splits the data into is tuned for large-scale data processing. For a small example like yours, try passing override_num_blocks=<some higher number> at the data ingestion stage to make the blocks finer-grained, which should then let map_batches saturate all workers.
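The reasoning in this reply can be illustrated with a toy model in plain Python. It assumes, as the reply implies, that each block becomes one map task that processes its rows one batch at a time, so the number of blocks caps how many tasks can run at once. `block_sizes` and `block_makespan` are hypothetical helpers for illustration; this is not Ray Data's actual scheduler.

```python
def block_sizes(num_rows: int, num_blocks: int) -> list[int]:
    """Split num_rows into num_blocks nearly equal contiguous blocks."""
    if num_blocks < 1:
        raise ValueError("num_blocks must be >= 1")
    base, extra = divmod(num_rows, num_blocks)
    return [base + 1 if i < extra else base for i in range(num_blocks)]


def block_makespan(num_rows: int, num_blocks: int, concurrency: int,
                   seconds_per_row: float) -> float:
    """Greedy scheduling: each block is one task that runs its rows sequentially.

    Assumption: one task per block, and a task always goes to the worker
    slot that becomes free earliest.
    """
    finish_times = [0.0] * concurrency  # time at which each worker slot is free
    for size in block_sizes(num_rows, num_blocks):
        slot = min(range(concurrency), key=finish_times.__getitem__)
        finish_times[slot] += size * seconds_per_row
    return max(finish_times)


print(block_makespan(16, 16, 8, 30))  # one row per block: 60.0
print(block_makespan(16, 3, 8, 30))   # blocks of 6/5/5 rows: 180.0
```

In this model, having fewer blocks than the concurrency leaves some CPUs with no work at all, which is the situation that raising override_num_blocks is meant to avoid.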