Ray dataset pipeline scheduling missing opportunities

Hi folks!

I noticed what looks like a potential issue in Ray Dataset pipeline scheduling for a workload I am trying to organize. The pipeline has a number of steps, each with different compute requirements, and generally each step takes longer than the previous one. What I noticed is that the last step does not fully utilize the available resources, even though there appear to be blocks waiting to run that step and sufficient resources to run them.

As an example, I have constructed a dummy pipeline with similar task requirements and timings. The first task (t1) uses one core and executes very fast, so 48 come back quickly. The next task (t2) uses 16 cores, so 3 can run in parallel, and takes some time. t3 uses 40 cores and takes longer still, while t4 uses a GPU (of which I have 4) and takes the longest. Each of these is 1-to-1 on blocks, so a subsequent task should be ready to run immediately after its parent completes. While the first t4 starts shortly after the first t3 completes, this is not so for the remaining ones: we can observe a case where, for example, 4 t3 tasks have completed but only 2 t4 tasks are running, even though 2 GPUs are still free and there are sufficient cores, and even after allowing some time for scheduling. The image below depicts this case.

Has anyone encountered this?

@Arik_Mitschang You might want to post the code so we can understand. BTW, are you using DatasetPipeline from a previous version of Ray (< 2.4)? DatasetPipeline is deprecated and has been replaced by the streaming execution model, which is now the default under the hood. If so, try the latest Ray 2.6.1 release.

cc: @ericl

Hi @Jules_Damji, yeah, sorry, I should have done that in the original post. Here is what I used to simulate the workload. The real workflow has longer compute times at each step, which exacerbates the underutilization of GPU resources in the last step. The machine has 48 cores and 4 GPUs:

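import time

import ray

# t1: cheap step, 1 CPU per task (the default), returns immediately.
def t1(i):
    return i

# t2: 16 CPUs per task, so at most 3 run concurrently on 48 cores.
def t2(i):
    time.sleep(10)
    return i

# t3: 40 CPUs per task, so only 1 runs at a time.
def t3(i):
    time.sleep(20)
    return i

# t4: 1 GPU per task, so up to 4 can run concurrently.
def t4(i):
    time.sleep(200)
    return i

# range(100) split into 100 blocks, so every step is 1-to-1 on blocks.
p = ray.data.range(100, parallelism=100) \
        .map_batches(t1, batch_size=None) \
        .map_batches(t2, batch_size=None, num_cpus=16) \
        .map_batches(t3, num_cpus=40, batch_size=None) \
        .map_batches(t4, num_gpus=1, batch_size=1)
# Consume the output to drive execution.
list(p.iter_batches())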

This is using just Dataset (not DatasetPipeline) in Ray 2.6.1.
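For reference, the concurrency each step should be able to reach on this machine can be worked out with a few lines of plain Python. This is only a sketch of the arithmetic, not anything Ray runs: `CLUSTER`, `STEPS`, and `max_concurrent` are names made up for illustration, and the 1 CPU charged to t4 alongside its GPU is an assumption.

```python
# Resources on the machine described above.
CLUSTER = {"CPU": 48, "GPU": 4}

# Per-task requests. t1 uses the default of 1 CPU; the 1 CPU that t4
# holds alongside its GPU is an assumption for this sketch.
STEPS = {
    "t1": {"CPU": 1},
    "t2": {"CPU": 16},
    "t3": {"CPU": 40},
    "t4": {"CPU": 1, "GPU": 1},
}


def max_concurrent(request, cluster):
    """How many tasks with this request fit on an otherwise idle cluster."""
    return min(cluster[name] // amount for name, amount in request.items())


for step, request in STEPS.items():
    print(step, max_concurrent(request, CLUSTER))
```

On an idle machine t4 is limited only by the 4 GPUs, so seeing 2 t4 tasks while GPUs sit free is what looks wrong.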

I think I might have a lead on what is causing this behavior. It seems that pending tasks are counted against the total resource usage, which causes the check for whether a new operator can run within the limits to return false, at https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/execution/streaming_executor_state.py#L552-L553. I haven’t inspected all of these variables in detail yet, but the progress bar seems to back this up, since it reads from the same usage structure (at https://github.com/ray-project/ray/blob/4d866464e3f31ec81120aceb153536b58c56cbc7/python/ray/data/_internal/execution/streaming_executor.py#L240), and at some point it shows higher usage than the limits, for example:

Running: 57.0/48.0 CPU, 1.0/4.0 GPU ...

while the real usage is 41 CPUs (one 40-CPU task, plus one core for the GPU task).
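A toy model of this accounting may make the suspected effect easier to see. It is not Ray's actual code: `Task`, `usage`, and `can_launch` are hypothetical names, and the single pending 16-CPU t2 task is an assumption chosen because 40 + 1 + 16 matches the 57 CPUs in the progress bar.

```python
from dataclasses import dataclass

# Limits as shown by the progress bar.
LIMITS = {"CPU": 48.0, "GPU": 4.0}


@dataclass
class Task:
    op: str
    cpu: float
    gpu: float = 0.0
    running: bool = True  # False: submitted, but still waiting for resources


def usage(tasks, include_pending):
    """Sum requested resources, optionally counting pending tasks too."""
    counted = [t for t in tasks if t.running or include_pending]
    return {
        "CPU": sum(t.cpu for t in counted),
        "GPU": sum(t.gpu for t in counted),
    }


def can_launch(tasks, new, include_pending):
    """Would adding `new` keep the counted usage within LIMITS?"""
    used = usage(tasks, include_pending)
    return (used["CPU"] + new.cpu <= LIMITS["CPU"]
            and used["GPU"] + new.gpu <= LIMITS["GPU"])


# One t3 and one t4 actually running, plus one t2 that cannot start yet.
tasks = [
    Task("t3", 40.0),
    Task("t4", 1.0, 1.0),
    Task("t2", 16.0, running=False),
]
next_t4 = Task("t4", 1.0, 1.0)

print(usage(tasks, include_pending=True))
print(can_launch(tasks, next_t4, include_pending=True))
print(can_launch(tasks, next_t4, include_pending=False))
```

With the pending t2 counted, the model reports 57 CPUs in use and refuses the next t4; counting only running tasks gives 41 CPUs and lets it through.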

While there may be some good reasons for counting pending tasks against the total, it seems that priority should go toward scheduling downstream tasks when there are resources available in the cluster (e.g. according to ray.available_resources()).
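As a rough sketch of what such a check could look like, the helper below takes a dict in the shape returned by ray.available_resources() (resource name mapped to free amount) and reports how many more downstream tasks would fit. `launchable` is a hypothetical helper, not part of Ray, and the snapshot values are made up to match one t3 and two t4 tasks running on the 48-core, 4-GPU machine.

```python
def launchable(available, request):
    """Number of extra tasks with `request` that fit in `available`.

    `available` maps resource name to free amount, as in the dict returned
    by ray.available_resources(). A missing key is treated as zero here.
    """
    return int(min(
        available.get(name, 0.0) // amount
        for name, amount in request.items()
        if amount > 0
    ))


# Hypothetical snapshot: 48 - 40 - 2 = 6 CPUs and 4 - 2 = 2 GPUs free.
snapshot = {"CPU": 6.0, "GPU": 2.0}

# Assumed t4 request: 1 GPU plus 1 CPU.
t4_request = {"CPU": 1, "GPU": 1}

print(launchable(snapshot, t4_request))
```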