Actors with "const functions"?

Hi
My use case is as follows:
Load system - once

Prepare new session (happens every 10 minutes)
Multiple independent calculations (each takes around 500 ms) on the existing session.
This loop of preparing a new session and running calculations on it repeats for weeks.

Explanation of the prepare new session and independent calculations:

  1. Prepare new session: a “prepare” function which reads many large parameters from a database and other locations outside of the Ray cluster. The function does some preprocessing on these parameters (this takes around 30 seconds).

  2. Independent calculation: a “calculate” function which, given inputs, uses those preprocessed parameters read-only, runs some algorithm and outputs the answer. It doesn’t change any state in the class.

I was thinking about using actors because with remote functions I can’t have my “prepare” function.
However, an actor runs the calls from a single client in order of submission, and if one call’s inputs are not ready it blocks all the other calls that could already run. Since my “calculate” functions are “const functions”, I want them to run in order of submission (since I do want priority), but if the inputs of the next call are not ready, I would like to run a call whose inputs are ready instead.

Is there a way to do this with Actors or somehow with remote functions?

Example that shows the issue with submitting to an actor when inputs are not ready:

import time
import unittest

import ray


@ray.remote(num_cpus=0)
class CounterA(object):
    def __init__(self):
        pass

    def get_counter_long(self, input1: int):
        # Slow producer: its result is ready only after 10 seconds.
        time.sleep(10)
        print(str(time.time()) + " in get long " + str(input1))
        return input1

    def get_counter_short(self, input1: int):
        # Fast producer: its result is ready after 1 second.
        time.sleep(1)
        print(str(time.time()) + " in get short " + str(input1))
        return input1


@ray.remote(num_cpus=0)
class CounterB(object):
    def __init__(self):
        pass

    def get_counter(self, input1: int):
        print(str(time.time()) + " in get counter " + str(input1))
        time.sleep(1)
        return input1


class MyTestCase(unittest.TestCase):

    def test_something(self):

        ray.init(num_cpus=0)

        counter_actor_a1 = CounterA.options().remote()
        counter_actor_a2 = CounterA.options().remote()
        counter_actor_b = CounterB.options().remote()

        f1 = counter_actor_a1.get_counter_long.remote(1)
        f2 = counter_actor_a2.get_counter_short.remote(2)
        f3 = counter_actor_a2.get_counter_short.remote(3)
        # f4 is submitted first and depends on the slow result f1.
        f4 = counter_actor_b.get_counter.remote(f1)
        f5 = counter_actor_b.get_counter.remote(f2)
        f6 = counter_actor_b.get_counter.remote(f3)
        object_refs = [f4, f5, f6]
        remaining_refs = object_refs
        while len(remaining_refs) != 0:
            ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)
            object_refs = remaining_refs
            print(ray.get(ready_refs))


if __name__ == '__main__':
    unittest.main()

Output

(CounterA pid=32596) 1638700409.844329 in get short 2
(CounterA pid=32596) 1638700410.8458147 in get short 3
(CounterA pid=32543) 1638700418.1666076 in get long 1
(CounterB pid=32705) 1638700418.169151 in get counter 1
[1]
(CounterB pid=32705) 1638700419.1709929 in get counter 2
[2]
(CounterB pid=32705) 1638700420.1728997 in get counter 3
[3]

I would expect to get output:

(CounterA pid=32596) 1638700409.844329 in get short 2
(CounterA pid=32596) 1638700410.8458147 in get short 3
(CounterA pid=32543) 1638700418.1666076 in get long 1
(CounterB pid=32705) 1638700409.845 in get counter 2
[2]
(CounterB pid=32705) 1638700410.846 in get counter 3
[3]
(CounterB pid=32705) 1638700418.167 in get counter 1
[1]

Hi @shiranbi,

What version of Ray are you using?
With the latest version I consistently got the order you expected.

Here is a Colab with what I got:

I get exactly what you get
1
2
3

I expect to get
2
3
1

since input 2 is ready first, then input 3, and input 1 is only ready around 9 seconds later.

@shiranbi,

Ahh I see.

Check out the last cell in the Colab. I updated it with what you want.

I think the issue is that when you pass an object reference into a remote call, Ray implicitly resolves the promise and passes the value instead of the reference. So in your case the call that takes f1 cannot start until f1 has been resolved to its concrete value, and because counter_actor_b runs calls in the order they were submitted, the calls that take f2 and f3 wait behind it.

Hopefully someone from the Ray core group will correct this explanation if I am wrong.