I'm new here and not sure if this is the best subforum to ask in, but I can’t find any info on how to use Dask with a Ray cluster and custom resources.
I am aware of dask.annotate for specifying custom resources for a job, but it does not seem to work with Ray cluster resources.
There is a Ray docs section Advanced Usage — Ray v1.9.2 on resource options, but I can’t make it work either with Dask and Ray.
Can anyone point me to some information on how to put resource constraints/requirements on specific Dask operations that are scheduled by Ray?
TLDR: I have a custom resource assigned to a Ray cluster head/worker. I am using Dask with the Ray scheduler and I want Dask-on-Ray to respect the custom resources assigned to each head/worker.
I tried modifying ray/python/ray/util/dask/scheduler.py by adding resource kwargs to the _rayify_task function at this point:
```python
    object_refs = dask_task_wrapper.options(
        name=f"dask:{key!s}",
        num_returns=(
            1 if not isinstance(func, MultipleReturnFunc) else func.num_returns
        ),
        resources=resources,
    ).remote(
        func,
        repack,
        key,
        ray_pretask_cbs,
        ray_posttask_cbs,
        *arg_object_refs,
    )
```
But still, the task is scheduled on a Ray worker with 0 custom resources assigned.
@mmww Thanks for trying out Dask-on-Ray! We have an open feature request for interoperability with dask.annotate, which seems to be exactly what you’re looking for.
Thanks for letting me know about the issue. I searched GitHub but somehow missed it. Any ETA for this feature? If I can be of any help, let me know.
@mmww I just opened up a PR that adds support! This should most likely be merged by EOD, at which point you’ll be able to try out a master commit or nightly wheel.
```python
import ray
from ray.util.dask import enable_dask_on_ray
import dask
import dask.array as da

ray.init(num_cpus=2, resources={'custom_resource': 1, 'other_custom_resource': 1, 'another_custom_resource': 1})
enable_dask_on_ray()

with dask.annotate(
    ray_remote_args=dict(num_cpus=2, resources={"custom_resource": 10})
):
    d_arr = da.ones(100)

with dask.annotate(ray_remote_args=dict(resources={"other_custom_resource": 10})):
    d_arr = 2 * d_arr

sum_ = d_arr.sum()

result = sum_.compute(
    ray_remote_args=dict(max_retries=5, resources={"another_custom_resource": 10}),
    optimize_graph=False,
)

print(result)
# 200
ray.shutdown()
```
The code runs and neither framework complains about insufficient resources.
Should the annotations work in ray.init local mode? I would expect the code above to fail, as there is only 1 unit of each resource, while each operation requires at least 10. Am I missing something?
EDIT:
Also, after specifying the resources in ray.init() and then running ray.available_resources(), I can see that the resources have been added to Ray and are visible.
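To spell out the arithmetic behind that expectation, here is a minimal pure-Python sketch. It is a toy model and not Ray's scheduler: the helper `unmet_requirements` and the step labels are made up for illustration, and the `"CPU"` key is assumed to mirror how `num_cpus=2` would show up in a resource map. It only compares the capacities passed to `ray.init()` with the amounts requested in the annotations.

```python
# Toy model only: this is NOT Ray's scheduling logic, just a capacity comparison.

def unmet_requirements(available, required):
    """Return {resource: (requested, available)} for each request above capacity."""
    return {
        name: (amount, available.get(name, 0))
        for name, amount in required.items()
        if amount > available.get(name, 0)
    }

# Capacities as passed to ray.init() in the snippet above ("CPU" key is an assumption).
available = {
    "CPU": 2,
    "custom_resource": 1,
    "other_custom_resource": 1,
    "another_custom_resource": 1,
}

# Requests taken from the dask.annotate / compute calls above (labels are hypothetical).
requests = {
    "da.ones": {"CPU": 2, "custom_resource": 10},
    "2 * d_arr": {"other_custom_resource": 10},
    "compute default": {"another_custom_resource": 10},
}

for step, required in requests.items():
    # Each step reports the resource it asks for beyond what the node offers.
    print(step, unmet_requirements(available, required))
```

Every step asks for 10 units of a resource that has only 1 available, so if the annotations were actually being applied, none of these requests could be satisfied on this single node.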