Violation of number of reserved CPUs for remote function

I have some questions about the run time of my code. My code follows below.

# python 3.6.9
# ray 1.10.0
import numpy as np
import ray
import time


## Initialize Ray
num_workers = 12

# Check if ray.init has been called yet.
if not ray.is_initialized():
    # measure initialization time
    start_time = time.perf_counter()

    # Connect to an existing Ray cluster or start one and connect to it.
    ray.init(num_cpus=num_workers,
            include_dashboard=False)

    print("Ray initialized!\n")
    print("Duration: {:0.3f} seconds".format(time.perf_counter() - start_time))
    print("Ray has started {} worker(s)\n".format(num_workers))

else:
    print("Ray already initialized!\n")


## Define helper functions
def use_workers_for_first_time(num_workers):
    @ray.remote(num_returns=1, num_cpus=1, max_calls=0)
    def use_worker():
        time.sleep(1)
        
    call_list = []
    for _ in range(num_workers):
        call_list.append(use_worker.remote())
    result_list = ray.get(call_list)
    
    print("The worker(s) has/have been used for the first time.\n")
    
def count_number_of_active_workers(max_num=20, sleep_duration=1):
    @ray.remote(num_returns=1, num_cpus=1, max_calls=0)
    def get_start_time_worker(sleep_duration):
        start_time = time.time()
        time.sleep(sleep_duration)
        return start_time
    
    count_start_time = time.time()
    call_list = []
    for _ in range(max_num):
        call_list.append(get_start_time_worker.remote(sleep_duration))
    start_times = ray.get(call_list)
    
    time_differences = abs(np.array(start_times)-count_start_time)
    num_active = np.sum(time_differences<=0.5*sleep_duration)
    
    print("There is/are {} worker(s) currently active.\n".format(num_active))
    
    
## Define remote functions of nested for loop
@ray.remote(num_returns=1, num_cpus=1, max_calls=0)
def inner_func():
    time.sleep(2)

    return 1

@ray.remote(num_returns=1, num_cpus=6, max_calls=0) # vary num_cpus {1,6}
def outer_func(input_bool):
    print("Started at {}".format(time.time()))

    if input_bool:
        time.sleep(5)

    # do inner loop
    list_values = ray.get([inner_func.remote() for _ in range(3)])

    if not input_bool:
        time.sleep(5)

    return list_values


use_workers_for_first_time(num_workers=num_workers)
count_number_of_active_workers()


## Case I (True): do sleep then inner loop
start_time = time.perf_counter()
call_list = []
for _ in range(4):
    call_list.append(outer_func.remote(True))
result_list = ray.get(call_list)
print("Duration for True: {:0.3f}\n".format(time.perf_counter() - start_time))


count_number_of_active_workers()


## Case II (False): do inner loop then sleep
start_time = time.perf_counter()
call_list = []
for _ in range(4):
    call_list.append(outer_func.remote(False))
result_list = ray.get(call_list)
print("Duration for False: {:0.3f}\n".format(time.perf_counter() - start_time))


count_number_of_active_workers()


## Stop ray
ray.shutdown()
print("Ray has been shutdown!")

Code Description:
My code consists of seven parts: (1) Python library imports, (2) Ray initialization, (3) defining helper functions, (4) defining remote functions, (5) Case I (True), (6) Case II (False), and (7) stopping Ray. Two functions are defined for a nested for loop: outer_func() and inner_func(). In both Case I (True) and Case II (False), a nested for loop is executed remotely. The difference between the two cases is that in Case I (True) the sleep in outer_func() happens before the loop over inner_func(), while in Case II (False) the sleep happens after the loop over inner_func(). The function outer_func() has num_cpus=6 and inner_func() has num_cpus=1. The code outputs are given below. I also tested the two cases in reverse order and with num_cpus=1 for outer_func().

Code Output I:
ray.init: num_workers=12
outer_func: num_cpus=6
inner_func: num_cpus=1
order: Case I (True), Case II (False)

Ray initialized!

Duration: 2.991 seconds
Ray has started 12 worker(s)

The worker(s) has/have been used for the first time.

There is/are 12 worker(s) currently active.

(outer_func pid=55511) Started at 1646475127.0693052
(outer_func pid=55520) Started at 1646475127.076406
(outer_func pid=55514) Started at 1646475132.0831115
(outer_func pid=55515) Started at 1646475132.0792353
Duration for True: 13.993

There is/are 12 worker(s) currently active.

(outer_func pid=55516) Started at 1646475143.071734
(outer_func pid=55520) Started at 1646475143.075857
(outer_func pid=55518) Started at 1646475143.073674
(outer_func pid=56479) Started at 1646475143.0714066
Duration for False: 8.948

There is/are 12 worker(s) currently active.

Ray has been shutdown!

Code Output II:
ray.init: num_workers=12
outer_func: num_cpus=6
inner_func: num_cpus=1
order: Case II (False), Case I (True)

Ray initialized!

Duration: 3.018 seconds
Ray has started 12 worker(s)

The worker(s) has/have been used for the first time.

There is/are 12 worker(s) currently active.

(outer_func pid=61003) Started at 1646475256.1376336
(outer_func pid=60997) Started at 1646475256.1396778
(outer_func pid=60999) Started at 1646475256.139047
(outer_func pid=61002) Started at 1646475256.1357586
Duration for False: 8.094

There is/are 12 worker(s) currently active.

(outer_func pid=61003) Started at 1646475266.232393
(outer_func pid=61000) Started at 1646475266.232137
(outer_func pid=61006) Started at 1646475271.238262
(outer_func pid=61601) Started at 1646475271.2375622
Duration for True: 13.083

There is/are 12 worker(s) currently active.

Ray has been shutdown!

Code Output III:
ray.init: num_workers=12
outer_func: num_cpus=1
inner_func: num_cpus=1
order: Case I (True), Case II (False)

Ray initialized!

Duration: 3.050 seconds
Ray has started 12 worker(s)

The worker(s) has/have been used for the first time.

There is/are 12 worker(s) currently active.

(outer_func pid=64798) Started at 1646475340.065754
(outer_func pid=64795) Started at 1646475340.0575573
(outer_func pid=64794) Started at 1646475340.066311
(outer_func pid=64803) Started at 1646475340.063826
Duration for True: 8.947

There is/are 12 worker(s) currently active.

(outer_func pid=64794) Started at 1646475351.016567
(outer_func pid=64800) Started at 1646475351.0157266
(outer_func pid=64801) Started at 1646475351.01608
(outer_func pid=65514) Started at 1646475351.0154388
Duration for False: 8.102

There is/are 12 worker(s) currently active.

Ray has been shutdown!

Code Output IV:
ray.init: num_workers=12
outer_func: num_cpus=1
inner_func: num_cpus=1
order: Case II (False), Case I (True)

Ray initialized!

Duration: 2.983 seconds
Ray has started 12 worker(s)

The worker(s) has/have been used for the first time.

There is/are 12 worker(s) currently active.

(outer_func pid=68730) Started at 1646475428.5518966
(outer_func pid=68728) Started at 1646475428.552344
(outer_func pid=68729) Started at 1646475428.5521011
(outer_func pid=68726) Started at 1646475428.5528276
Duration for False: 8.210

There is/are 12 worker(s) currently active.

(outer_func pid=68734) Started at 1646475438.7653153
(outer_func pid=68726) Started at 1646475438.7649622
(outer_func pid=68732) Started at 1646475438.7646677
(outer_func pid=69336) Started at 1646475438.764401
Duration for True: 9.362

There is/are 12 worker(s) currently active.

Ray has been shutdown!

Observations:

  1. There are 12 workers started. The function outer_func() needs 6 CPUs and inner_func() needs 1 CPU per execution. So I expected that there would be at most 2 instances of outer_func() running at the same time, but for Case II (False) there are 4 instances running at the same time (see Code Output I and II).
  2. For both orders, Case I (True) takes about 13-14 seconds while Case II (False) takes about 8 seconds when num_cpus=6 for outer_func(). So Case II (False) is faster. When num_cpus=1 for outer_func(), Case I (True) takes about 9 seconds while Case II (False) takes about 8 seconds. So again Case II (False) is faster than Case I (True).
  3. For Case I (True) with num_cpus=6 for outer_func(), first two workers start with outer_func() and 5 seconds later (equal to the sleep) two more workers start with outer_func(). In Case II (False) with num_cpus=6, four workers each start an outer_func() immediately.
  4. The number of workers is always equal to 12.

Questions:

  1. How can the function outer_func() be started four times simultaneously when each instance needs 6 CPUs and there are only 12 workers?
  2. What is done with the 6 CPUs that are reserved for an outer_func()? Are five of them reserved, but just not used? Are these five available for every remote function or only for the inner_func() instances that are started from a particular outer_func()?

I would like to hear what causes this observed behavior, what the answers to my questions are, and whether there is any documentation available about it.


Ray has a mechanism called “borrowing CPUs” to avoid deadlock in some scenarios. It happens when you call ray.get. See the example below:

ray.init(num_cpus=1)
@ray.remote
def f():
    pass

@ray.remote
def g():
    ray.get(f.remote())

ray.get(g.remote())

The above program would be expected to hang (g requires 1 CPU, and f could never be scheduled because g already holds that CPU), but to avoid this issue, Ray releases g’s 1 CPU while g is blocked in “ray.get”. We say that g lends its CPU to f (so that the program is not blocked). This can happen whenever ray.get or ray.wait is called within a remote function.
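To make this concrete, here is a small probe (a sketch of my own, not an official example; the num_cpus=0 slow() task is only scaffolding to keep g blocked without holding a CPU). If the released CPU goes back to the general pool, the unrelated task h() runs while g is still blocked, even though there is only one CPU:

# python 3.6.9
# ray 1.10.0
import os
import time
import ray

ray.init(num_cpus=1, include_dashboard=False)

@ray.remote(num_cpus=0)
def slow():
    # Needs no CPU; it only exists to keep g() blocked in ray.get.
    time.sleep(3)

@ray.remote(num_cpus=1)
def h():
    # An unrelated 1-CPU task submitted while g() is blocked.
    return ("h", os.getpid(), round(time.time(), 4))

@ray.remote(num_cpus=1)
def g():
    t0 = round(time.time(), 4)
    ray.get(slow.remote())  # g's CPU should be released while this blocks
    return ("g blocked", t0, "until", round(time.time(), 4))

g_ref = g.remote()
time.sleep(0.5)             # give g() time to reach its ray.get
print(ray.get(h.remote()))  # returns inside g's blocked window if the CPU is pooled
print(ray.get(g_ref))
ray.shutdown()

If the timestamp printed by h() falls inside g()’s blocked window, the released CPU is evidently not reserved for g’s own children; I believe that is the case, but I have not found it spelled out in the docs.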

My guess is that your observations 1 & 2 could be answered by this, but let me know if this is wrong!


Hello @sangcho,

Many thanks for your reply.

  1. When a deadlock occurs, are all reserved CPUs then released or only the ones that are needed?
  2. I understand from your answer that the released CPUs can only be used for the inner_func() calls made by the specific outer_func() where the deadlock occurs, is that correct? So not for other tasks?
  3. Is there any documentation available on “borrowing CPUs”?

My Version Of Your Code:
I noticed with the code you shared that an extra worker is temporarily created to resolve the deadlock. It is unclear to me when this extra worker is killed. Below is a modified version of your code.

# python 3.6.9
# ray 1.10.0
import os
import numpy as np
import ray
import time


if not ray.is_initialized():
    ray.init(num_cpus=1,
            include_dashboard=False)
    print("Ray initialized!")
    print("Ray has started {} worker!\n".format(N))
else:
    print("Ray already initialized!\n")

    
@ray.remote(num_cpus=1)
def f1():
    return ("Inner f1", os.getpid(), np.round(time.time(), 4))

@ray.remote(num_cpus=1)
def f2():
    return ("Inner f2", os.getpid(), np.round(time.time(), 4))

@ray.remote(num_cpus=1)
def g(f=f1):
    info_outer = ("Outer g", os.getpid(), np.round(time.time(), 4))
    info_inner = ray.get(f.remote())
    return info_outer, info_inner


info = ray.get(g.remote(f1))
print("{}\n".format(info))

info = ray.get(g.remote(f2))
print("{}\n".format(info))

info = ray.get([f1.remote() for _ in range(2)])
print("{}\n".format(info))


ray.shutdown()
print("Ray has been shutdown!") 

Code Output:

Ray initialized!
Ray has started 1 worker!

(('Outer g', 48993, 1647011336.8826), ('Inner f1', 49174, 1647011337.8569))

(('Outer g', 48993, 1647011337.9342), ('Inner f2', 49174, 1647011337.9378))

[('Inner f1', 48993, 1647011337.9418), ('Inner f1', 48993, 1647011337.9425)]

Ray has been shutdown!

Observations:

  1. Ray starts an additional worker with pid=49174 to resolve the deadlock.
  2. The extra worker can be reused for another function to solve another deadlock.
  3. The extra worker is not used when there is no deadlock: the two plain f1() calls both ran on the worker with pid=48993.

Question:

  1. Can the extra worker only be used if there is a new deadlock soon enough after it was used for the last time?

Ray workers can get reused for any task as long as it’s part of the same job. If the number of workers exceeds the number of cores and a worker has been idle long enough (I think the default is a few seconds), then Ray will kill the worker.
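If you want to experiment with that idle window, the threshold appears to be controlled by an internal setting; treat this as a sketch only, since _system_config is not a stable, documented API and the exact key (idle_worker_killing_time_threshold_ms) is an assumption on my part:

# python 3.6.9
# ray 1.10.0
import ray

# Assumed internal knob: idle_worker_killing_time_threshold_ms controls how
# long a surplus worker may sit idle before Ray kills it. _system_config is
# internal and may change between Ray versions.
ray.init(
    num_cpus=1,
    include_dashboard=False,
    _system_config={"idle_worker_killing_time_threshold_ms": 10000},
)

With a larger threshold, the extra worker from the examples above should survive longer between deadlocks.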

Hello @Stephanie_Wang,

Thank you for your reply.

What exactly does “the same job” in your answer mean? Do you mean that the extra worker can only be used if g() is executed, because the extra worker was created to resolve a deadlock that occurred while executing g()?

In my code, the extra worker was created to execute f1() remotely, but after that, the worker could also be used for f2() by giving f2() as an argument to g(). So this indicates that the extra worker can be used for another function (as long as g() is involved).

In another topic of mine, @sangcho stated that Ray uses a soft limit for the number of workers and kills the surplus workers when they are idle.

Same job means that they share the same root driver (the program block that calls ray.init). It looks like your program is all under one driver, so yes the workers could get reused for other functions.
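For example (a minimal sketch of my own, not from the docs), two unrelated tasks submitted from the same driver will typically land on the same worker process:

# python 3.6.9
# ray 1.10.0
import os
import ray

ray.init(num_cpus=1, include_dashboard=False)

@ray.remote
def a():
    return os.getpid()

@ray.remote
def b():
    return os.getpid()

# Both tasks belong to the same job (same root driver), so the single
# worker is reused; the two pids are typically identical.
print(ray.get(a.remote()), ray.get(b.remote()))
ray.shutdown()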


I conducted a few more experiments.

My New Code:

# python 3.6.9
# ray 1.10.0
import os
import numpy as np
import ray
import time


if not ray.is_initialized():
    ray.init(num_cpus=1,
            include_dashboard=False)
    print("Ray initialized!")
    print("Ray has started {} worker!\n".format(1))
else:
    print("Ray already initialized!\n")

    
@ray.remote(num_cpus=1)
def f1():
    return ("Inner f1", os.getpid(), np.round(time.time(), 4))

@ray.remote(num_cpus=1)
def f2():
    return ("Inner f2", os.getpid(), np.round(time.time(), 4))

@ray.remote(num_cpus=1)
def g1(f):
    info_outer = ("Outer g1", os.getpid(), np.round(time.time(), 4))
    info_inner = ray.get(f.remote())
    return info_outer, info_inner

@ray.remote(num_cpus=1)
def g2(f):
    info_outer = ("Outer g2", os.getpid(), np.round(time.time(), 4))
    info_inner = ray.get(f.remote())
    return info_outer, info_inner


info = ray.get(g1.remote(f1))
print("{}\n".format(info))

info = ray.get(g2.remote(f2))
print("{}\n".format(info))

ref_list = [f1.remote(), f2.remote()]
info = ray.get(ref_list)
print("{}\n".format(info))

info = ray.get(g1.remote(f1))
print("{}\n".format(info))

time.sleep(5)
print("Sleep for 5 seconds\n")

info = ray.get(g2.remote(f2))
print("{}\n".format(info))


ray.shutdown()
print("Ray has been shutdown!") 

Code Output:

Ray initialized!
Ray has started 1 worker!

(('Outer g1', 81783, 1647347311.376), ('Inner f1', 81850, 1647347312.3727))

(('Outer g2', 81783, 1647347312.4727), ('Inner f2', 81850, 1647347312.4757))

[('Inner f1', 81783, 1647347312.4791), ('Inner f2', 81783, 1647347312.4804)]

(('Outer g1', 81783, 1647347312.4832), ('Inner f1', 81850, 1647347312.4847))

Sleep for 5 seconds

(('Outer g2', 81783, 1647347317.4918), ('Inner f2', 82120, 1647347318.4617))

Ray has been shutdown!

Observations:

  1. Ray starts an additional worker with pid=81850 to resolve the deadlock. The deadlock occurs because g1() makes a nested remote call while there is only one worker.
  2. The new worker can be used to resolve the deadlock in another nested call; the outer and inner functions may differ from those of the first deadlock.
  3. If two separate remote functions are executed and no deadlock occurs, the extra worker is not used.
  4. The extra worker is killed after it has been idle for too long.

My conclusion is that the extra worker is only intended for resolving deadlocks, and that it is killed after being idle for a certain amount of time.
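To bracket that idle window, one could extend the experiment along these lines (a rough probe of my own; which of the two surviving workers gets killed first is not guaranteed, so the result should be read as indicative only):

# python 3.6.9
# ray 1.10.0
import os
import time
import ray

ray.init(num_cpus=1, include_dashboard=False)

@ray.remote(num_cpus=1)
def f():
    return os.getpid()

@ray.remote(num_cpus=1)
def g():
    # Nested call forces the extra worker into existence, as above.
    return ray.get(f.remote())

for pause in (0.5, 1, 2, 5):
    pid_before = ray.get(g.remote())  # pid of the extra worker running f()
    time.sleep(pause)
    pid_after = ray.get(g.remote())
    print("pause {:>4.1f} s: inner pid reused: {}".format(
        pause, pid_before == pid_after))

ray.shutdown()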