I have some questions about the run time of my code, which follows below.
# python 3.6.9
# ray 1.10.0
import numpy as np
import ray
import time

## Initialize Ray
num_workers = 12
# Check if ray.init has been called yet.
if not ray.is_initialized():
    # measure initialization time
    start_time = time.perf_counter()
    # Connect to an existing Ray cluster or start one and connect to it.
    ray.init(num_cpus=num_workers,
             include_dashboard=False)
    print("Ray initialized!\n")
    print("Duration: {:0.3f} seconds".format(time.perf_counter() - start_time))
    print("Ray has started {} worker(s)\n".format(num_workers))
else:
    print("Ray already initialized!\n")

## Define helper functions
def use_workers_for_first_time(num_workers):
    @ray.remote(num_returns=1, num_cpus=1, max_calls=0)
    def use_worker():
        time.sleep(1)
    call_list = []
    for _ in range(num_workers):
        call_list.append(use_worker.remote())
    result_list = ray.get(call_list)
    print("The worker(s) has/have been used for the first time.\n")

def count_number_of_active_workers(max_num=20, sleep_duration=1):
    @ray.remote(num_returns=1, num_cpus=1, max_calls=0)
    def get_start_time_worker(sleep_duration):
        start_time = time.time()
        time.sleep(sleep_duration)
        return start_time
    count_start_time = time.time()
    call_list = []
    for _ in range(max_num):
        call_list.append(get_start_time_worker.remote(sleep_duration))
    start_times = ray.get(call_list)
    time_differences = abs(np.array(start_times)-count_start_time)
    num_active = np.sum(time_differences<=0.5*sleep_duration)
    print("There is/are {} worker(s) currently active.\n".format(num_active))

## Define remote functions of nested for loop
@ray.remote(num_returns=1, num_cpus=1, max_calls=0)
def inner_func():
    time.sleep(2)
    return 1

@ray.remote(num_returns=1, num_cpus=6, max_calls=0) # vary num_cpus {1,6}
def outer_func(input_bool):
    print("Started at {}".format(time.time()))
    if input_bool:
        time.sleep(5)
    # do inner loop
    list_values = ray.get([inner_func.remote() for _ in range(3)])
    if not input_bool:
        time.sleep(5)
    return list_values

use_workers_for_first_time(num_workers=num_workers)
count_number_of_active_workers()

## Case I (True): do sleep then inner loop
start_time = time.perf_counter()
call_list = []
for _ in range(4):
    call_list.append(outer_func.remote(True))
result_list = ray.get(call_list)
print("Duration for True: {:0.3f}\n".format(time.perf_counter() - start_time))
count_number_of_active_workers()

## Case II (False): do inner loop then sleep
start_time = time.perf_counter()
call_list = []
for _ in range(4):
    call_list.append(outer_func.remote(False))
result_list = ray.get(call_list)
print("Duration for False: {:0.3f}\n".format(time.perf_counter() - start_time))
count_number_of_active_workers()

## Stop ray
ray.shutdown()
print("Ray has been shutdown!")
Code Description:
My code consists of 7 parts: (1) Python library imports, (2) Ray initialization, (3) helper function definitions, (4) remote function definitions, (5) Case I (True), (6) Case II (False), and (7) stopping Ray. Two remote functions form a nested for loop: outer_func() and inner_func(). In both Case I (True) and Case II (False), the nested for loop is executed remotely. The only difference between the two cases is that in Case I (True) outer_func() sleeps before running the loop over inner_func(), while in Case II (False) it sleeps after that loop. The function outer_func() has num_cpus=6 and inner_func() has num_cpus=1. The code output is below. I also ran the two cases in reverse order, and with num_cpus=1 for outer_func().
Code Output I:
ray.init: num_workers=12
outer_func: num_cpus=6
inner_func: num_cpus=1
order: Case I (True), Case II (False)
Ray initialized!
Duration: 2.991 seconds
Ray has started 12 worker(s)
The worker(s) has/have been used for the first time.
There is/are 12 worker(s) currently active.
(outer_func pid=55511) Started at 1646475127.0693052
(outer_func pid=55520) Started at 1646475127.076406
(outer_func pid=55514) Started at 1646475132.0831115
(outer_func pid=55515) Started at 1646475132.0792353
Duration for True: 13.993
There is/are 12 worker(s) currently active.
(outer_func pid=55516) Started at 1646475143.071734
(outer_func pid=55520) Started at 1646475143.075857
(outer_func pid=55518) Started at 1646475143.073674
(outer_func pid=56479) Started at 1646475143.0714066
Duration for False: 8.948
There is/are 12 worker(s) currently active.
Ray has been shutdown!
Code Output II:
ray.init: num_workers=12
outer_func: num_cpus=6
inner_func: num_cpus=1
order: Case II (False), Case I (True)
Ray initialized!
Duration: 3.018 seconds
Ray has started 12 worker(s)
The worker(s) has/have been used for the first time.
There is/are 12 worker(s) currently active.
(outer_func pid=61003) Started at 1646475256.1376336
(outer_func pid=60997) Started at 1646475256.1396778
(outer_func pid=60999) Started at 1646475256.139047
(outer_func pid=61002) Started at 1646475256.1357586
Duration for False: 8.094
There is/are 12 worker(s) currently active.
(outer_func pid=61003) Started at 1646475266.232393
(outer_func pid=61000) Started at 1646475266.232137
(outer_func pid=61006) Started at 1646475271.238262
(outer_func pid=61601) Started at 1646475271.2375622
Duration for True: 13.083
There is/are 12 worker(s) currently active.
Ray has been shutdown!
Code Output III:
ray.init: num_workers=12
outer_func: num_cpus=1
inner_func: num_cpus=1
order: Case I (True), Case II (False)
Ray initialized!
Duration: 3.050 seconds
Ray has started 12 worker(s)
The worker(s) has/have been used for the first time.
There is/are 12 worker(s) currently active.
(outer_func pid=64798) Started at 1646475340.065754
(outer_func pid=64795) Started at 1646475340.0575573
(outer_func pid=64794) Started at 1646475340.066311
(outer_func pid=64803) Started at 1646475340.063826
Duration for True: 8.947
There is/are 12 worker(s) currently active.
(outer_func pid=64794) Started at 1646475351.016567
(outer_func pid=64800) Started at 1646475351.0157266
(outer_func pid=64801) Started at 1646475351.01608
(outer_func pid=65514) Started at 1646475351.0154388
Duration for False: 8.102
There is/are 12 worker(s) currently active.
Ray has been shutdown!
Code Output IV:
ray.init: num_workers=12
outer_func: num_cpus=1
inner_func: num_cpus=1
order: Case II (False), Case I (True)
Ray initialized!
Duration: 2.983 seconds
Ray has started 12 worker(s)
The worker(s) has/have been used for the first time.
There is/are 12 worker(s) currently active.
(outer_func pid=68730) Started at 1646475428.5518966
(outer_func pid=68728) Started at 1646475428.552344
(outer_func pid=68729) Started at 1646475428.5521011
(outer_func pid=68726) Started at 1646475428.5528276
Duration for False: 8.210
There is/are 12 worker(s) currently active.
(outer_func pid=68734) Started at 1646475438.7653153
(outer_func pid=68726) Started at 1646475438.7649622
(outer_func pid=68732) Started at 1646475438.7646677
(outer_func pid=69336) Started at 1646475438.764401
Duration for True: 9.362
There is/are 12 worker(s) currently active.
Ray has been shutdown!
Observations:
- 12 workers are started. Each call to outer_func() requests 6 CPUs and each call to inner_func() requests 1 CPU. I therefore expected at most 2 instances of outer_func() to run at the same time, but in Case II (False) 4 instances run at the same time (see Code Output I and II).
- With num_cpus=6 for outer_func(), Case I (True) takes about 13-14 seconds and Case II (False) about 8 seconds, regardless of the order in which the cases run. With num_cpus=1 for outer_func(), Case I (True) takes about 9 seconds and Case II (False) about 8 seconds. So Case II (False) is faster in both configurations.
- With num_cpus=6 for outer_func(), in Case I (True) two workers start outer_func() first, and two more start 5 seconds later (which equals the sleep duration). In Case II (False), all four workers start outer_func() immediately.
- The reported number of active workers is always 12.
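As a sanity check on the durations above, here is a rough timing model in plain Python (no Ray involved). It assumes that the four outer_func() calls run in equally sized waves, that the three inner_func() calls of one outer_func() run in parallel, and that scheduling overhead is zero. The constants are taken from my code; the function name modeled_duration and its parameter concurrent_outer are made up for this illustration. It only checks that the measured times are consistent with "2 at a time" versus "4 at a time"; it does not explain why Ray schedules them that way.

```python
import math

OUTER_SLEEP = 5.0    # seconds slept inside outer_func (from the code above)
INNER_SLEEP = 2.0    # seconds slept inside inner_func (from the code above)
NUM_OUTER_CALLS = 4  # outer_func is submitted four times per case


def modeled_duration(concurrent_outer: int) -> float:
    """Idealized wall-clock time if `concurrent_outer` calls run side by side.

    One outer_func call takes OUTER_SLEEP + INNER_SLEEP on its own, in either
    case, because its three inner_func calls are assumed to run in parallel.
    Calls are assumed to run in waves; overhead is ignored (an assumption).
    """
    waves = math.ceil(NUM_OUTER_CALLS / concurrent_outer)
    return waves * (OUTER_SLEEP + INNER_SLEEP)


print(modeled_duration(2))  # two waves of two calls -> 14.0
print(modeled_duration(4))  # one wave of four calls -> 7.0
```

The 14.0 seconds for two waves is close to the 13-14 seconds measured for Case I (True) with num_cpus=6, and the 7.0 seconds for a single wave is within about a second or two of the 8-9 seconds measured in all other runs, so the gap between the cases looks like a scheduling effect rather than a difference in the work done per call.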
Questions:
- How can outer_func() be started four times simultaneously when each call needs 6 CPUs and there are only 12 workers?
- What is done with the 6 CPUs that are reserved for an outer_func()? Are five of them reserved, but just not used? Are these five available for every remote function or only for the inner_func() instances that are started from a particular outer_func()?
I would like to know what causes the observed behavior, what the answers to my questions are, and whether there is any documentation about it.