[Dask on Ray] Low cluster utilization

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Hi,

I have a problem running Dask on Ray.
My configuration: 4 nodes with 4 CPUs each = 16 CPUs in total.
My use case:

A simple broadcast join: the left side is a large Parquet-backed Dask DataFrame, the right side is a small pandas DataFrame.

import dask.dataframe as dd
import pandas as pd
from ray.util.dask import RayDaskCallback, ray_dask_get

a = dd.read_parquet(...)  # Dask DataFrame; a.npartitions == 200

b = pd.read_csv(...)  # pandas DataFrame

def my_presubmit_cb(task, key, deps):
    print(f"About to submit task {key}!")

with RayDaskCallback(ray_presubmit=my_presubmit_cb):
    rr = a.merge(b, on=[....], how="inner", broadcast=True
    ).compute(scheduler=ray_dask_get)

I can see that cluster utilization is very low:
only about 4 tasks run in parallel, even though I have 16 CPUs and the potential parallelism is 200.

I was looking at the Dask-on-Ray scheduler code and saw that tasks are submitted through a pool.

Am I correct that the number of active tasks is limited by the pool size (in my case, num_cpus on the driver = 4)?
Can I increase the parallelism?
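To illustrate what I think is happening (my own stdlib sketch, not Ray code): if tasks are dispatched through a fixed-size pool of submitter threads, concurrency is capped at the pool size no matter how many CPUs the cluster has.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

active = 0  # tasks currently "running"
peak = 0    # highest concurrency observed
lock = threading.Lock()

def task(_):
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(0.05)  # simulate task work
    with lock:
        active -= 1

# 200 "partitions", but only 4 submitter threads in the pool:
with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(task, range(200)))

print(peak)  # peak concurrency never exceeds the pool size of 4
```

If that is indeed the bottleneck, is there a supported way to make the pool size follow the cluster's CPU count instead of the driver's?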

Thank you.