Hi
My use case is as follows:
Load system - once
Prepare new session (happens every 10 minutes)
Multiple independent calculations (each takes around 500 ms) on the existing session.
This loop of preparing a new session and running calculations on it will repeat for weeks.
Explanation of "prepare new session" and "independent calculation":
- Prepare new session: a “prepare” function that reads many large parameters from a database and other locations outside of the Ray cluster, then does some preprocessing on these parameters (this takes around 30 seconds).
- Independent calculation: a “calculate” function that, given inputs, uses those preprocessed parameters read-only, runs an algorithm and outputs the answer. It doesn’t change any state in the class.
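For concreteness, the two functions above could look roughly like the following minimal sketch. The class name `Session`, the `raw_params` dictionary and the squaring step are hypothetical placeholders for the real loading and preprocessing. Ray is left out so the snippet runs on its own; in the real system the class would be a Ray actor.

```python
class Session:
    """Hypothetical stand-in for the class that holds the prepared parameters."""

    def __init__(self):
        self._params = None  # nothing is loaded until prepare() runs

    def prepare(self, raw_params):
        # Placeholder for the ~30 s load-and-preprocess step; here it only squares values.
        self._params = {name: value * value for name, value in raw_params.items()}

    def calculate(self, name, x):
        # Read-only use of the prepared parameters; no state is modified.
        return self._params[name] * x


session = Session()
session.prepare({"a": 2, "b": 3})
results = [session.calculate("a", 10), session.calculate("b", 10)]
print(results)
```

Because `calculate` never writes to `self`, calls to it could in principle run in any order between two `prepare` calls.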
I was thinking about using actors, because with remote functions I can’t have my “prepare” function.
However, an actor runs its methods in the order they were submitted from a single client, and if one call’s inputs are not ready it blocks all the other calls that could already run. Since my “calculate” functions are “const functions” (they only read state), I want them to run in order of submission (since I do want priority), but if a call’s inputs are not ready, I would like the next call whose inputs are ready to run instead.
Is there a way to do this with Actors or somehow with remote functions?
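To pin down the ordering being asked for, here is a small sketch that is independent of Ray: it picks the oldest pending call whose input is already available. The function `next_runnable` and the string ids are hypothetical and only illustrate the desired policy, not anything Ray provides.

```python
def next_runnable(pending, ready_inputs):
    """Remove and return the oldest pending call whose input is ready, else None.

    `pending` is a list of (call_id, input_id) pairs in submission order;
    `ready_inputs` is the set of input ids that have already been produced.
    """
    for index, (call_id, input_id) in enumerate(pending):
        if input_id in ready_inputs:
            pending.pop(index)  # safe: we return right after mutating the list
            return call_id
    return None


# Calls f4, f5, f6 are submitted in that order and wait on f1, f2, f3.
pending = [("f4", "f1"), ("f5", "f2"), ("f6", "f3")]
ready = set()
order = []
for finished in ["f2", "f3", "f1"]:  # f1 is the slow input and finishes last
    ready.add(finished)
    order.append(next_runnable(pending, ready))
print(order)
```

With this policy the calls run as f5, f6, f4: submission order is respected among ready calls, and the call waiting on the slow input no longer holds up the others.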
Example that shows the issue when calls are submitted to an actor and their inputs are not ready:
import time
import unittest

import ray


@ray.remote(num_cpus=0)
class CounterA(object):
    def __init__(self):
        pass

    def get_counter_long(self, input1: int):
        time.sleep(10)
        print(str(time.time()) + " in get long " + str(input1))
        return input1

    def get_counter_short(self, input1: int):
        time.sleep(1)
        print(str(time.time()) + " in get short " + str(input1))
        return input1


@ray.remote(num_cpus=0)
class CounterB(object):
    def __init__(self):
        pass

    def get_counter(self, input1: int):
        print(str(time.time()) + " in get counter " + str(input1))
        time.sleep(1)
        return input1


class MyTestCase(unittest.TestCase):
    def test_something(self):
        ray.init(num_cpus=0)
        counter_actor_a1 = CounterA.options().remote()
        counter_actor_a2 = CounterA.options().remote()
        counter_actor_b = CounterB.options().remote()
        # f1 is slow (10 s); f2 and f3 are fast (1 s each) and run on a separate actor.
        f1 = counter_actor_a1.get_counter_long.remote(1)
        f2 = counter_actor_a2.get_counter_short.remote(2)
        f3 = counter_actor_a2.get_counter_short.remote(3)
        # f4 depends on the slow f1 and is submitted first, so f5 and f6 wait behind it.
        f4 = counter_actor_b.get_counter.remote(f1)
        f5 = counter_actor_b.get_counter.remote(f2)
        f6 = counter_actor_b.get_counter.remote(f3)
        object_refs = [f4, f5, f6]
        remaining_refs = object_refs
        # Print each result as soon as it becomes available.
        while len(remaining_refs) != 0:
            ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)
            object_refs = remaining_refs
            print(ray.get(ready_refs))


if __name__ == "__main__":
    unittest.main()
Output
(CounterA pid=32596) 1638700409.844329 in get short 2
(CounterA pid=32596) 1638700410.8458147 in get short 3
(CounterA pid=32543) 1638700418.1666076 in get long 1
(CounterB pid=32705) 1638700418.169151 in get counter 1
[1]
(CounterB pid=32705) 1638700419.1709929 in get counter 2
[2]
(CounterB pid=32705) 1638700420.1728997 in get counter 3
[3]
I would expect to get output:
(CounterA pid=32596) 1638700409.844329 in get short 2
(CounterA pid=32596) 1638700410.8458147 in get short 3
(CounterA pid=32543) 1638700418.1666076 in get long 1
(CounterB pid=32705) 1638700409.845 in get counter 2
[2]
(CounterB pid=32705) 1638700410.846 in get counter 3
[3]
(CounterB pid=32705) 1638700418.167 in get counter 1
[1]