Question about scheduling of remote calls when passing refs

In the following snippet:

import ray
import time
from datetime import datetime
ray.init()

@ray.remote
def slow():
    time.sleep(10)
    return "slow"

@ray.remote
def fast():
    time.sleep(1)
    return "fast"

@ray.remote
class Actor:

    def run(self, x):
        print(datetime.utcnow())
        print(x)

actor = Actor.options(max_concurrency=2).remote()
ref_slow = slow.remote()
actor.run.remote(ref_slow)

ref_fast = fast.remote()
actor.run.remote(ref_fast)

I was expecting the fast call to happen first in the actor, but it looks like it gets queued behind the slow call.

Version info

Name: ray
Version: 1.10.0

NAME="Ubuntu"
VERSION="18.04.6 LTS (Bionic Beaver)"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 18.04.6 LTS"

Hi there,
I think max_concurrency only takes effect when multi-threading or async operations are involved; see AsyncIO / Concurrency for Actors — Ray v1.10.0.
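
For example, here is a minimal sketch of an async actor where concurrent execution does apply (the AsyncActor name and the sleep time are just illustrative, not from your snippet):

import asyncio

import ray

ray.init()

@ray.remote
class AsyncActor:
    # defining the methods as async makes this an async actor;
    # up to max_concurrency calls can be in flight on its event loop
    async def run(self, x):
        await asyncio.sleep(1)
        return x

actor = AsyncActor.options(max_concurrency=2).remote()
# the two calls overlap instead of running strictly one after another
print(ray.get([actor.run.remote(i) for i in range(2)]))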

If my actor method is not async, then by default it becomes a threaded actor, right?

Anyhow, I find this behavior surprising. In this example, even if max_concurrency were 1, I would expect the run call for fast to execute first, since that ref becomes available sooner.

Ray will execute them one by one; they do not run in parallel. Since you submit the slow one first, it executes the slow one first. It's like a FIFO queue.

Ok, thanks for the clarification. Do you think this is something that could be improved? FIFO makes sense when calling with concrete inputs, but when you pass refs, I think it is more efficient in most cases to run the function as soon as its input dependencies become available.

My use case is the following: I have a function F that uses two actors, A and B. It calls A.method_1, B.method, and A.method_2 in order, without ray.get, by passing the refs along. A separate function W then waits for this pipeline to complete; meanwhile, F can receive another input and start the pipeline again.

The issue is that the second pipeline now cannot start until the first pipeline is entirely complete, which is inefficient. It should be possible to run A.method_1 of the second pipeline while B.method of the first pipeline is still running. I could get around this by splitting actor A into two actors, each with only one method, but if Ray implemented the smarter scheduling I could use one less actor and save some resources.
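
To make the shape concrete, here is a rough sketch of that pipeline (A, B, method_1, method_2, W and the trivial method bodies are placeholders, not my real code):

import ray

ray.init()

@ray.remote
class A:
    def method_1(self, x):
        return x + 1

    def method_2(self, x):
        return x * 2

@ray.remote
class B:
    def method(self, x):
        return x - 1

@ray.remote
def W(x):
    # depends on the final ref, so it only runs when the pipeline is done
    return x

a = A.remote()
b = B.remote()

def F(x):
    # chain the calls by passing refs along, without any ray.get
    r1 = a.method_1.remote(x)
    r2 = b.method.remote(r1)
    r3 = a.method_2.remote(r2)
    return W.remote(r3)

out1 = F(1)
out2 = F(2)
# with FIFO per-actor queues, the second pipeline's a.method_1 call is
# queued behind the first pipeline's a.method_2, which is still waiting
# on b.method, so the second pipeline cannot start yet
print(ray.get([out1, out2]))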

Executing one-by-one is a safety guarantee: it makes the code simpler and easier to write, especially when ordering matters, which it does in a lot of cases.

I think you can pass [obj_ref] to prevent the object ref from being dereferenced, and then use the obj_ref inside the actor by passing it to another remote function.

Some example code here:

from datetime import datetime

import ray

@ray.remote
def run(obj, t):
    # obj is the resolved value of the ref, t is when the actor accepted it
    print(obj, t)

@ray.remote
class Actor:
    def f(self, obj):
        # obj is a list wrapping the ref, so Ray does not dereference it here
        t = datetime.utcnow()
        print(t)
        run.remote(obj[0], t)

actor = Actor.remote()
# ref_slow comes from the earlier snippet
actor.f.remote([ref_slow])

So basically, defer the dereferencing of the object ref.

Thanks for the suggestion. This could indeed work, but ideally, I would like to keep the computation within the actor.

I was exploring this a bit further, and it seems the scheduling behavior is actually non-deterministic.

With the following script

import ray
import time
import logging

logger = logging.getLogger("ray")

ray.init()

logger.info("start")

@ray.remote
def slow():
    time.sleep(10)
    return "slow"

@ray.remote
def fast():
    time.sleep(1)
    return "fast"

@ray.remote
class Actor:

    def run(self, x):
        logger.info(x)

actor = Actor.options(max_concurrency=2).remote()
ref_slow = slow.remote()
actor.run.remote(ref_slow)

ref_fast = fast.remote()
actor.run.remote(ref_fast)

time.sleep(15)

Outputs for 4 runs:

container_user@ws-saswata:~/workspace$ python schedulingtestactor.py 
2022-02-16 12:53:38,881 INFO services.py:1374 -- View the Ray dashboard at http://127.0.0.1:8266
2022-02-16 12:53:40,149 INFO schedulingtestactor.py:9 -- start
(Actor pid=2317) 2022-02-16 12:53:41,968        INFO schedulingtestactor.py:25 -- fast
(Actor pid=2317) 2022-02-16 12:53:51,944        INFO schedulingtestactor.py:25 -- slow
container_user@ws-saswata:~/workspace$ python schedulingtestactor.py 
2022-02-16 12:54:04,087 INFO services.py:1374 -- View the Ray dashboard at http://127.0.0.1:8266
2022-02-16 12:54:05,367 INFO schedulingtestactor.py:9 -- start
(Actor pid=2517) 2022-02-16 12:54:07,188        INFO schedulingtestactor.py:25 -- fast
(Actor pid=2517) 2022-02-16 12:54:17,171        INFO schedulingtestactor.py:25 -- slow
container_user@ws-saswata:~/workspace$ python schedulingtestactor.py 
2022-02-16 12:54:26,231 INFO services.py:1374 -- View the Ray dashboard at http://127.0.0.1:8266
2022-02-16 12:54:27,500 INFO schedulingtestactor.py:9 -- start
(Actor pid=2717) 2022-02-16 12:54:38,339        INFO schedulingtestactor.py:25 -- slow
(Actor pid=2717) 2022-02-16 12:54:39,311        INFO schedulingtestactor.py:25 -- fast
container_user@ws-saswata:~/workspace$ python schedulingtestactor.py 
2022-02-16 12:54:50,914 INFO services.py:1374 -- View the Ray dashboard at http://127.0.0.1:8266
2022-02-16 12:54:52,236 INFO schedulingtestactor.py:9 -- start
(Actor pid=2917) 2022-02-16 12:55:03,087        INFO schedulingtestactor.py:25 -- slow
(Actor pid=2917) 2022-02-16 12:55:04,057        INFO schedulingtestactor.py:25 -- fast

Hmmm, interesting. I think we also have something like out-of-order execution in actors, but I'm not sure about that. @sangcho, do you have any ideas about this? Could it be related to remote task submission being done asynchronously?

cc @Chen_Shen

I think the ordering is not guaranteed when max_concurrency is set. I remember Chen made changes to remove the ordering property for threaded/async actors. (Have we documented this?)

Btw, if max_concurrency is set to 2, I think it becomes a threaded actor, meaning the actor methods can run in multiple threads.

The slow call being scheduled before the fast call seems a bit unexpected from my perspective; maybe we need some investigation. When I run this on my local laptop, the fast call always executes first.

Yes, for threaded and async actors, the tasks may be executed out of submission order.

https://docs.ray.io/en/latest/fault-tolerance.html

Thanks for the answers!

Based on your answers, it seems to me that when an actor method is called with a ref, the waiting for the ref to become available happens within the actor process/thread itself. So if the actor is not a threaded actor, or max_concurrency is 1, the actor is blocked at that point and cannot accept other inputs. Is my understanding correct?
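
To make this concrete, with the default max_concurrency of 1 I would expect a script like the following (same slow/fast functions as above) to print "slow" before "fast", because the actor is stuck waiting on the slow ref first:

import time

import ray

ray.init()

@ray.remote
def slow():
    time.sleep(10)
    return "slow"

@ray.remote
def fast():
    time.sleep(1)
    return "fast"

@ray.remote
class Actor:
    def run(self, x):
        print(x)

# no max_concurrency option, so the actor processes one call at a time
actor = Actor.remote()
actor.run.remote(slow.remote())
actor.run.remote(fast.remote())

# give both calls time to finish before the driver exits
time.sleep(15)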

Yes, you are right. If you don't have concurrency, the slow one can block the fast call because the slow call has to be processed first.

But I don't actually know if this is the behavior we intended. My expectation is that we should submit tasks "after dependencies are resolved", meaning the actor task that depends on the fast call should be submitted first under the hood. cc @ericl, is this the expected behavior? It looks like the task that depends on the slow call has the lower sequence number, so the fast one is blocked by that.


I’m not able to reproduce this issue on nightly. The fast call always runs first. Maybe there was a bug in an earlier Ray version prior to the recent concurrency queue improvements?