Dask on Ray + Ray Distributed Cluster - Workers not getting used?

(Please feel free to re-categorize.)

Context

For context, I am benchmarking Dask workloads (petabytes of data) on the Dask distributed backend vs. the Ray backend.

I’ve already done the benchmark for the Dask distributed backend. My workload would cost ~$1 million (yes, you read that right) on that backend - it’d be running 35 r5.24xlarge instances continuously for ~180 days.

I am hoping Ray backend can be better :slight_smile:

So I’ve set up a Ray cluster with AWS EC2 (autoscaler).
The workload is about a million Dask graphs. Each graph’s last node is a Dask delayed object, so I compute each one with delayed_obj.compute(scheduler=ray_dask_get).
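
Roughly, each submission looks like this (a simplified sketch with a tiny stand-in graph; the real graphs are much larger):

import ray
from dask import delayed
from ray.util.dask import ray_dask_get

ray.init(address="auto")  # connect to the existing cluster from the head node

# Tiny stand-in for one of the real graphs; its last node is a delayed object.
delayed_obj = delayed(sum)([delayed(lambda x: x * 2)(i) for i in range(100)])

result = delayed_obj.compute(scheduler=ray_dask_get)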

Problem

None of my worker nodes seem to be getting utilized. I am submitting 1000 graphs to the cluster at a time, so there should be plenty of work to do.

I am wondering whether Dask on Ray + a Ray distributed cluster is supposed to work at all. I am looking at this line: ray/scheduler.py at master · ray-project/ray · GitHub. My head/worker nodes each have 96 vCPUs, so the total CPU count in the cluster should be 96 * 35. But when I do print(CPU_COUNT), where CPU_COUNT is from dask.system, I get 96 (which makes sense, since it is per node). So does this mean a thread pool of size 96 is being used, instead of a pool of size 96 * 35?
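
For reference, a quick way to compare the per-node count with the cluster-wide total (assuming the script has already connected to the cluster via ray.init(address="auto")):

import ray
from dask.system import CPU_COUNT

print(CPU_COUNT)                           # 96: CPUs on this node only
print(ray.cluster_resources().get("CPU"))  # should be ~96 * 35 for the whole cluster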

Hi @jennakwon06, Dask Distributed and Dask-on-Ray won’t work well together, since they are different backends for Dask. You can make the Dask-on-Ray scheduler the default by setting it in the config at the beginning of your script:

from ray.util.dask import ray_dask_get
dask.config.set(scheduler=ray_dask_get)

Note that in this case, you should not create a Dask.distributed client, since according to the Dask docs, creating one like this will set the Dask.distributed cluster as the default backend:

from dask.distributed import Client
client = Client('scheduler:8786')

Can you also try something like

import time
import ray

ray.init(address="auto")  # connect to the existing cluster

@ray.remote
def f():
    time.sleep(10)

refs = [f.remote() for _ in range(10000)]

and see if the cluster gets utilized?

Just to clear something up: the threadpool that you linked to is only used by the Dask-on-Ray scheduler to submit Ray tasks, not to execute them. Specifically, the Dask-on-Ray scheduler traverses the Dask graph and submits each Dask task to the Ray cluster as a Ray task, using that threadpool to parallelize the submissions; the Ray tasks themselves should execute across all of the cores (and machines) that have been allocated to your Ray cluster.
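
To illustrate the distinction, here is a rough sketch of the pattern (not the actual scheduler code, and it assumes a running Ray cluster): the local thread pool only calls .remote(), which returns immediately, and Ray then decides which node in the cluster executes each task.

from concurrent.futures import ThreadPoolExecutor

import ray

ray.init(address="auto")  # connect to the running cluster

@ray.remote
def run_task(x):
    # Executes on whichever node Ray schedules it to.
    return x * 2

def submit(x):
    # Submission is cheap and returns an ObjectRef immediately.
    return run_task.remote(x)

# The thread pool parallelizes submission only; execution is cluster-wide.
with ThreadPoolExecutor(max_workers=96) as pool:
    refs = list(pool.map(submit, range(10000)))

results = ray.get(refs)  # blocks until all tasks have finished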

Hi Stephanie! Thanks for replying. Maybe my original post was misleading. I edited it a bit so hopefully you’ll have more context.

When I am benchmarking Ray, I am not using Dask Distributed client.

Hey @sangcho! Great suggestion. So yes, the cluster does get utilized when I submit a script with that code.

Yes, I misunderstood in my previous answer! This sounds like either a bug in the setup or a bug in Ray. Could you open a GitHub issue for this? A reproducible script would be ideal so that a Ray developer can try it out too.

Yep sounds good! I’m working directly with Sang on it :slight_smile:

For future reference, for anyone who runs into a similar issue: the problem was just that we don’t support the Dask client’s .compute method! If you call .compute within Dask on Ray, it follows the same API behavior as regular Dask, which means .compute is a blocking call and does not return async futures!
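
Concretely (a simplified sketch, assuming a running Ray cluster): each call blocks until that graph has finished, so a loop over many graphs runs them one after another instead of fanning them out as futures.

import ray
from dask import delayed
from ray.util.dask import ray_dask_get

ray.init(address="auto")

graphs = [delayed(sum)([i, i + 1, i + 2]) for i in range(1000)]

for delayed_obj in graphs:
    # Blocking: returns the concrete result, not a future, so the next
    # graph is not submitted until this one has finished.
    result = delayed_obj.compute(scheduler=ray_dask_get)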

Yes. To complement Sang’s comment -

With the Dask distributed backend, I have a pattern where I submit BATCH_SIZE Dask graphs (collections) to the cluster at a time by calling res_future = dask_client.compute(delayed_object) BATCH_SIZE times. I collect the res_future objects in a list of length BATCH_SIZE and call dask.distributed.wait on the list to wait until all graphs are done. I repeat this many times.
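
A sketch of that pattern (simplified; the real delayed objects and BATCH_SIZE are much larger, and the scheduler address is just a placeholder):

from dask import delayed
from dask.distributed import Client, wait

dask_client = Client('scheduler:8786')
BATCH_SIZE = 1000

delayed_objects = [delayed(sum)([i, i + 1]) for i in range(BATCH_SIZE)]

# client.compute returns one future per graph without blocking...
res_futures = [dask_client.compute(obj) for obj in delayed_objects]

# ...so the whole batch can be waited on at once.
wait(res_futures)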

I was trying to do the same thing with Dask on Ray, and I didn’t realize that its .compute doesn’t return futures! So that was my bad.
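
One possible workaround for getting that batch pattern back with Dask on Ray (just a sketch, and one that nests Ray tasks inside Ray tasks, so it may not be the most efficient approach) would be to wrap the blocking .compute in a Ray task, so each graph submission returns an ObjectRef that can be waited on:

import ray
from dask import delayed
from ray.util.dask import ray_dask_get

ray.init(address="auto")

@ray.remote
def compute_graph(delayed_obj):
    # The blocking compute runs inside a Ray task, so the driver gets an
    # ObjectRef back immediately instead of waiting for the whole graph.
    return delayed_obj.compute(scheduler=ray_dask_get)

BATCH_SIZE = 1000
graphs = [delayed(sum)([i, i + 1]) for i in range(BATCH_SIZE)]

refs = [compute_graph.remote(g) for g in graphs]
ray.wait(refs, num_returns=len(refs))  # wait for the whole batch to finish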

The problem I am having right now is with even work distribution, though. When BATCH_SIZE Dask graphs are submitted to the cluster, it seems that all the tasks go to the head node first and only then get distributed across the workers. I believe Sang is working on improving this so that all nodes fill up evenly, to reduce pressure on the distributed object store.