Dask on Ray – custom resources

Hi,

New here and not sure if this is the best subforum to ask, but I can’t find any info on how to use Dask with a Ray cluster and custom resources.

Of course, I know about dask.annotate for specifying custom resources for a job, but it does not seem to work with Ray cluster resources.

There is a section on resource options in the Ray docs (the Advanced Usage page for Ray v1.9.2), but I can’t make it work with Dask on Ray either.

Can anyone point me to information on how to put resource constraints/requirements on specific Dask operations that are scheduled by Ray?

TL;DR: I have a custom resource assigned to the Ray cluster head/worker nodes. I am using Dask with the Ray scheduler, and I want Dask-on-Ray to respect the custom resources assigned to each head/worker node.
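For reference, here is a simplified pure-Python model of the behaviour being asked for: a task should only be placed on a node whose declared resources cover every amount it requests. This is not Ray's scheduler code. It only checks declared capacity (it ignores what other tasks currently hold), and the node names head and worker-1 and the helpers fits/feasible_nodes are hypothetical, chosen to mirror the head/worker setup described above.

```python
from typing import Dict, List

# Hypothetical two-node cluster: node name -> resources declared on that node.
# (In Ray these would come from ray.init(resources=...) or `ray start --resources=...`.)
NODES: Dict[str, Dict[str, float]] = {
    "head": {"CPU": 4.0},
    "worker-1": {"CPU": 4.0, "custom_resource": 1.0},
}


def fits(request: Dict[str, float], node: Dict[str, float]) -> bool:
    """True if the node declares at least the requested amount of every resource."""
    return all(node.get(name, 0.0) >= amount for name, amount in request.items())


def feasible_nodes(
    request: Dict[str, float], nodes: Dict[str, Dict[str, float]]
) -> List[str]:
    """Names of the nodes whose declared resources could ever satisfy the request."""
    return [name for name, declared in nodes.items() if fits(request, declared)]


# A task asking for the custom resource may only land on worker-1 ...
print(feasible_nodes({"CPU": 1.0, "custom_resource": 1.0}, NODES))  # ['worker-1']
# ... while a plain CPU task may go to either node.
print(feasible_nodes({"CPU": 1.0}, NODES))  # ['head', 'worker-1']
```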

This is the place to ask!

cc @Clark_Zinzow can you address this question?

I tried modifying ray/python/ray/util/dask/scheduler.py by adding a resources kwarg to the _rayify_task function, at this spot:

```python
object_refs = dask_task_wrapper.options(
    name=f"dask:{key!s}",
    num_returns=(
        1 if not isinstance(func, MultipleReturnFunc) else func.num_returns
    ),
    # Added: custom resource requirements passed into _rayify_task as a new kwarg.
    resources=resources,
).remote(
    func,
    repack,
    key,
    ray_pretask_cbs,
    ray_posttask_cbs,
    *arg_object_refs,
)
```

but the task is still scheduled on a Ray worker with 0 custom resources assigned.

@mmww Thanks for trying out Dask-on-Ray! We have an open feature request for interoperability with dask.annotate, which seems to be exactly what you’re looking for.


Thanks for pointing me to the issue. I searched GitHub but somehow missed it. Is there an ETA for this feature? If I can help in any way, let me know.

@mmww I just opened up a PR that adds support! This should most likely be merged by EOD, at which point you’ll be able to try out a master commit or nightly wheel.


Hi again @Clark_Zinzow

Sorry to bother you, but I tried using resource requirements with Ray started locally via ray.init().

I tried the code from your example (ray/dask_ray_annotate_example.py at master · ray-project/ray · GitHub), but to my surprise Ray/Dask does not seem to respect the resource requirements.

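Here is the exact code I used:

```python
import ray
from ray.util.dask import enable_dask_on_ray
import dask
import dask.array as da

# Local Ray instance: 2 CPUs and 1 unit of each custom resource.
ray.init(
    num_cpus=2,
    resources={
        "custom_resource": 1,
        "other_custom_resource": 1,
        "another_custom_resource": 1,
    },
)
enable_dask_on_ray()

# Tasks created here should request 2 CPUs and 10 units of custom_resource.
with dask.annotate(
    ray_remote_args=dict(num_cpus=2, resources={"custom_resource": 10})
):
    d_arr = da.ones(100)

# Tasks created here should request 10 units of other_custom_resource.
with dask.annotate(ray_remote_args=dict(resources={"other_custom_resource": 10})):
    d_arr = 2 * d_arr

sum_ = d_arr.sum()

# Compute-level Ray remote args; optimize_graph=False disables Dask graph
# optimizations (e.g. task fusion), which can otherwise strip annotations.
result = sum_.compute(
    ray_remote_args=dict(max_retries=5, resources={"another_custom_resource": 10}),
    optimize_graph=False,
)
print(result)
# 200.0
ray.shutdown()
```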

The code runs, and neither framework complains about insufficient resources.
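For illustration, here is what each step of the example would be expected to request if the annotations were honoured. The helper effective_remote_args is hypothetical, and its precedence rule (a shallow dict merge in which annotated keys override compute-level keys, so an annotated resources dict replaces the compute-level one instead of being combined with it) is an assumption, not a description of how Ray's Dask scheduler actually merges the two.

```python
from typing import Any, Dict


def effective_remote_args(
    compute_args: Dict[str, Any], annotated_args: Dict[str, Any]
) -> Dict[str, Any]:
    """Hypothetical merge: annotated keys override compute-level keys (shallow)."""
    merged = dict(compute_args)  # copy so compute_args itself is not mutated
    merged.update(annotated_args)
    return merged


# Values copied from the example above.
compute_args = dict(max_retries=5, resources={"another_custom_resource": 10})
ones_args = dict(num_cpus=2, resources={"custom_resource": 10})
double_args = dict(resources={"other_custom_resource": 10})

print(effective_remote_args(compute_args, ones_args))
# {'max_retries': 5, 'resources': {'custom_resource': 10}, 'num_cpus': 2}
print(effective_remote_args(compute_args, double_args))
# {'max_retries': 5, 'resources': {'other_custom_resource': 10}}
# The sum step carries no annotation, so it would only get the compute-level args.
print(effective_remote_args(compute_args, {}))
```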

Should the annotations work when Ray is started locally with ray.init()? I would expect the code above to fail, since there is only 1 unit of each custom resource while each operation requests 10. Am I missing something?
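The arithmetic behind that expectation can be sketched as a per-resource shortfall check against the single local node. This is a simplified model, not Ray's scheduler: the capacity dict only contains the resources passed to ray.init() above (Ray registers others, such as memory, which are omitted here), and the helper name shortfall is hypothetical. A non-empty result means no node in this one-node cluster could ever satisfy the request.

```python
from typing import Dict

# Resources passed to ray.init() in the example above.
NODE: Dict[str, float] = {
    "CPU": 2.0,
    "custom_resource": 1.0,
    "other_custom_resource": 1.0,
    "another_custom_resource": 1.0,
}


def shortfall(request: Dict[str, float], capacity: Dict[str, float]) -> Dict[str, float]:
    """Per-resource amount by which the request exceeds the node's capacity."""
    return {
        name: amount - capacity.get(name, 0.0)
        for name, amount in request.items()
        if amount > capacity.get(name, 0.0)
    }


# Annotated da.ones step: 2 CPUs fit, but 10 units of custom_resource do not.
print(shortfall({"CPU": 2.0, "custom_resource": 10.0}, NODE))  # {'custom_resource': 9.0}
# Annotated doubling step.
print(shortfall({"other_custom_resource": 10.0}, NODE))  # {'other_custom_resource': 9.0}
# Compute-level args used for the sum step.
print(shortfall({"another_custom_resource": 10.0}, NODE))  # {'another_custom_resource': 9.0}
```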

EDIT:
Also, after specifying the resources in ray.init(), running ray.available_resources() shows that they have been added to Ray and are visible.