Resource limits in ray.remote functions

In ray.remote(num_cpus=X), is X an upper bound on the number of CPUs this function can access?

@ray.remote(num_cpus=X)
def f(i):
    return i

So if I set X = 10 and run the following:

result_refs = [f.remote(i) for i in range(100)]
results = ray.get(result_refs)

Ray should technically not use more than 10 CPUs at a time for these tasks. Am I correct?

Note that the resources you specify for Ray tasks and actors don’t actually enforce resource isolation (so a task can still use more than 10 CPUs). Think of them as hints that help the scheduler make the right placement decisions. Real resource isolation is typically a job for VMs or containers.
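To make the distinction concrete, here is a toy model of that logical accounting (an illustration of the idea only, not Ray’s actual scheduler code):

```python
# Illustrative model of Ray's logical CPU accounting: num_cpus is
# bookkeeping for scheduling decisions, not OS-level enforcement.
class LogicalScheduler:
    def __init__(self, total_cpus):
        self.available = total_cpus

    def can_schedule(self, num_cpus):
        return self.available >= num_cpus

    def acquire(self, num_cpus):
        # Reserve logical CPUs for a task. Nothing stops the task's
        # process from actually spawning more OS threads than this.
        assert self.can_schedule(num_cpus)
        self.available -= num_cpus

    def release(self, num_cpus):
        self.available += num_cpus

sched = LogicalScheduler(total_cpus=48)
for _ in range(4):
    sched.acquire(10)          # four tasks, each reserving 10 CPUs
print(sched.can_schedule(10))  # False: only 8 logical CPUs remain
```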

If you really would like to have resource isolation, such as enforcing CPU affinity, you can use other libraries. For example, you can use psutil to achieve this: python - How to run ray correctly? - Stack Overflow
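As a sketch of what such enforcement looks like, the standard library’s os.sched_setaffinity (Linux-only; psutil exposes the same capability via Process().cpu_affinity()) pins a process to a fixed set of cores. Hypothetically, you could call this at the top of a remote function:

```python
import os

# Pin the current process to a subset of cores, so the OS enforces
# a hard cap no matter how many threads the process spawns.
# (Linux-only sketch; on other platforms, psutil offers the same.)
allowed = sorted(os.sched_getaffinity(0))  # cores we may currently use
subset = set(allowed[:2])                  # restrict to at most 2 of them
os.sched_setaffinity(0, subset)
print(sorted(os.sched_getaffinity(0)))
```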

I believe ray.wait was what I was looking for, but I am not sure.

I followed this advice:

[Ray Tips and Tricks, Part I — ray.wait | by Dean Wampler | Distributed Computing with Ray | Medium]

By controlling the num_returns argument of ray.wait, I was able to process only those tasks that were “batched”. I am not sure if this is the correct approach, though.

The problem I am having is that ray.init was already called with num_cpus=N beforehand, and inside each task I make calls to an API that is rate-limited to only 10 concurrent connections.

Ah, I see. I think I misunderstood your question.

So, in your example, Ray can only run total_num_cpus // 10 tasks at a time.

Using ray.wait is one way to achieve this. For example:

import ray

ray.init()

@ray.remote(num_cpus=1)
def f(i):
    return i

TOTAL_TASKS = 100
MAX_IN_FLIGHT = 10  # Rate limit: at most 10 tasks running at a time.

in_flight = [f.remote(i) for i in range(MAX_IN_FLIGHT)]
next_task = MAX_IN_FLIGHT
while in_flight:
    # Block until at least one task finishes.
    ready, in_flight = ray.wait(in_flight, num_returns=1)
    ray.get(ready)  # do something with the finished result
    if next_task < TOTAL_TASKS:
        in_flight.append(f.remote(next_task))
        next_task += 1
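For comparison, here is the same sliding-window idea in plain Python using the standard concurrent.futures module (no Ray required); call_api is a hypothetical stand-in for the rate-limited API call:

```python
import concurrent.futures

def call_api(i):
    # Hypothetical stand-in for a rate-limited API call.
    return i * 2

MAX_IN_FLIGHT = 10
work = list(range(100))
results = []

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_IN_FLIGHT) as pool:
    # Seed the window with 10 in-flight calls.
    in_flight = {pool.submit(call_api, work.pop()) for _ in range(MAX_IN_FLIGHT)}
    while in_flight:
        # Block until at least one call finishes, then refill the window.
        done, in_flight = concurrent.futures.wait(
            in_flight, return_when=concurrent.futures.FIRST_COMPLETED)
        results.extend(f.result() for f in done)
        while work and len(in_flight) < MAX_IN_FLIGHT:
            in_flight.add(pool.submit(call_api, work.pop()))

print(len(results))  # 100
```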

This is super interesting. Now I am totally confused about what setting num_cpus does. I have 48 total CPUs. So setting num_cpus=10 in the tasks allows only 48 // 10 tasks to run at a time?

Yes. You can think of each running task as consuming 10 of the 48 CPUs; they are returned when the task finishes.
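Spelled out, the arithmetic behind that answer is just integer division of the pool by the per-task reservation:

```python
# Each running task reserves num_cpus_per_task from the logical pool.
total_num_cpus = 48
num_cpus_per_task = 10
max_concurrent_tasks = total_num_cpus // num_cpus_per_task
print(max_concurrent_tasks)  # 4
```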

What use cases would there be for setting num_cpus > 1? This would allow, say, a library to use parallel computation inside a task, or allow vectorization with numpy?

Yeah, I have also seen people set num_cpus > 1 when they use OMP_NUM_THREADS > 1.
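For instance, a task that runs a 4-thread OpenMP/BLAS kernel could reserve a matching number of logical CPUs so the scheduler doesn’t oversubscribe the machine (a sketch; which environment variable your library reads is an assumption, and OMP_NUM_THREADS applies to OpenMP-based code such as many BLAS builds used by numpy):

```python
import os

# Match the logical reservation to the task's internal parallelism.
os.environ["OMP_NUM_THREADS"] = "4"

# Sketch of the matching Ray declaration (assumes ray is installed):
# @ray.remote(num_cpus=4)   # reserve 4 logical CPUs for 4 threads
# def matmul_task(a, b):
#     return a @ b
print(os.environ["OMP_NUM_THREADS"])  # 4
```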