Ray.get returns delayed results from a deployment

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hey Ray team! Thanks for building this framework.
I have a problem retrieving data on time from an actor with ray.get (or ray.wait).
Here is the code (close to the serve-fastapi examples; I tried a lot of different environments, including Docker, with the same result):

simple_code.py
# ray==2.6.1
# fastapi==0.95.2

import time
import requests
from dataclasses import dataclass

import ray
from ray import serve
from fastapi import FastAPI
from fastapi.responses import Response

app = FastAPI()
JOB_TIME = 2 # seconds

@dataclass
class Data:
    idx: int
    ref: object = None


@serve.deployment()
class Model:
    def generate(self, idx):
        start = time.time()
        time.sleep(JOB_TIME)
        print(f'JOB {idx} IS DONE?')
        return start


@serve.deployment()
@serve.ingress(app)
class APIIngress:
    def __init__(self, handle):
        self.handle = handle
        self.q = []

    @app.get('/run_10_tasks')
    async def test(self):
        for _ in range(10):
            idx = len(self.q)
            ref = await self.handle.generate.remote(idx)
            self.q.append(Data(ref=ref, idx=idx))

    @app.get('/gg/{idx}')
    async def get(self, idx):
        idx = int(idx)
        data = self.q[idx]
        start = ray.get(data.ref) # block until job idx is available
        # start is local start time of each job
        walltime = time.time() - start
        msg = f' ray.get Job #{data.idx}! It took: {walltime} sec\n'
        msg += f'Retrieve time: {walltime - JOB_TIME}\n'

        # this should return a walltime of ~JOB_TIME (in seconds) and a retrieve time << JOB_TIME
        # from the logs, methods called on the actor execute serially: job0 -> job1 -> ...
        # but ray.get (or blocking/non-blocking ray.wait) does not receive anything for quite a while after
        return Response(content=msg, media_type="text/plain")

entrypoint = APIIngress.bind(Model.bind())
serve.run(entrypoint)
requests.get("http://localhost:8000/run_10_tasks")
time.sleep(1)
IDX = 3 # choose some job out of 10
print(requests.get(f"http://localhost:8000/gg/{IDX}").text)

When I run it, I expect the blocking call ray.get(ref) to return roughly as soon as the job is done in the remote actor. But what I get is a random delay after the job is done (I know the job is done from the logs).
So instead of
|===JOB1=====|======JOB2=====|
======ray.get++================
=============~~result==========

I get

|===JOB1=====|======JOB2======|=====JOB3====|=====JOB4=====|
=====ray.get+++++++++blocking+++++++++still blocking++++++++======
=============~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~result==

The total time always matches n_jobs × job_time, but the results are returned in groups:

R1
=========R2
=========R3
=========R4
===============R5
===============R6

instead of
R1
===R2
======R3
=========R4
============R5
===============R6

So, for example, I wait 80 seconds for R2 instead of 20.
The delay is not consistent from run to run; with a larger number of tasks (50), return groups can be as large as 30 tasks arriving at the same time. The delay is always a multiple of the job time.
ray.wait returns results that match the ray.get behavior.
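For completeness, the non-blocking probe I tried looks roughly like this inside the /gg/{idx} handler (a sketch replacing the plain ray.get; the behavior is the same):

# non-blocking check: returns immediately even if the job is still running
ready, not_ready = ray.wait([data.ref], num_returns=1, timeout=0)
if ready:
    start = ray.get(ready[0])  # fetch the result only once it is reported ready
else:
    # job not reported done yet -- the same multi-job delay shows up here too
    ...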

I want to confirm whether this behavior is intended or if it might be a bug. If it’s the former, how can I modify my code to retrieve results from actor jobs as soon as they become available? Thanks!

Typical log:
(ServeReplica:default_Model pid=41170) JOB 0 IS DONE?
(ServeReplica:default_Model pid=41170) INFO 2023-09-07 18:13:57,778 default_Model default_Model#mNkppv fHGYDfxDvy /run_10_tasks default replica.py:723 - GENERATE OK 1003.3ms
(ServeReplica:default_Model pid=41170) JOB 1 IS DONE?
(ServeReplica:default_Model pid=41170) INFO 2023-09-07 18:13:58,782 default_Model default_Model#mNkppv fHGYDfxDvy /run_10_tasks default replica.py:723 - GENERATE OK 1002.3ms
(ServeReplica:default_Model pid=41170) JOB 2 IS DONE?
(ServeReplica:default_Model pid=41170) INFO 2023-09-07 18:13:59,786 default_Model default_Model#mNkppv fHGYDfxDvy /run_10_tasks default replica.py:723 - GENERATE OK 1003.7ms
(ServeReplica:default_Model pid=41170) JOB 3 IS DONE?
(ServeReplica:default_Model pid=41170) INFO 2023-09-07 18:14:00,790 default_Model default_Model#mNkppv fHGYDfxDvy /run_10_tasks default replica.py:723 - GENERATE OK 1003.2ms
(ServeReplica:default_Model pid=41170) JOB 4 IS DONE?
(ServeReplica:default_Model pid=41170) INFO 2023-09-07 18:14:01,794 default_Model default_Model#mNkppv fHGYDfxDvy /run_10_tasks default replica.py:723 - GENERATE OK 1003.7ms
(ServeReplica:default_Model pid=41170) JOB 5 IS DONE?
(ServeReplica:default_Model pid=41170) INFO 2023-09-07 18:14:02,798 default_Model default_Model#mNkppv fHGYDfxDvy /run_10_tasks default replica.py:723 - GENERATE OK 1003.8ms
(ServeReplica:default_Model pid=41170) JOB 6 IS DONE?
(ServeReplica:default_Model pid=41170) INFO 2023-09-07 18:14:03,802 default_Model default_Model#mNkppv fHGYDfxDvy /run_10_tasks default replica.py:723 - GENERATE OK 1003.6ms
(ServeReplica:default_Model pid=41170) JOB 7 IS DONE?
(ServeReplica:default_Model pid=41170) INFO 2023-09-07 18:14:04,808 default_Model default_Model#mNkppv fHGYDfxDvy /run_10_tasks default replica.py:723 - GENERATE OK 1005.2ms
(ServeReplica:default_Model pid=41170) JOB 8 IS DONE?
(ServeReplica:default_Model pid=41170) INFO 2023-09-07 18:14:05,810 default_Model default_Model#mNkppv fHGYDfxDvy /run_10_tasks default replica.py:723 - GENERATE OK 1002.3ms
 ray.get Job #3! It took: 7.028521537780762 sec
Retrieve time: 6.028521537780762

You can fix this by making Model an async actor:

import asyncio

...

@serve.deployment()
class Model:
    def __init__(self):
        self.lock = asyncio.Lock()

    async def generate(self, idx):
        async with self.lock:
            start = time.time()
            await asyncio.sleep(JOB_TIME)
            print(f'JOB {idx} IS DONE?')
            return start
(ServeReplica:default_APIIngress pid=179290) INFO 2023-09-20 14:54:11,744 default_APIIngress default_APIIngress#vuzsEa FHxgnYcdfZ /run_10_tasks default replica.py:723 - __CALL__ OK 44.7ms
(ServeReplica:default_Model pid=179289) JOB 0 IS DONE?
(ServeReplica:default_Model pid=179289) INFO 2023-09-20 14:54:13,732 default_Model default_Model#PfSJbd FHxgnYcdfZ /run_10_tasks default replica.py:723 - GENERATE OK 2010.5ms
(ServeReplica:default_Model pid=179289) JOB 1 IS DONE?
(ServeReplica:default_Model pid=179289) INFO 2023-09-20 14:54:15,741 default_Model default_Model#PfSJbd FHxgnYcdfZ /run_10_tasks default replica.py:723 - GENERATE OK 4017.7ms
(ServeReplica:default_Model pid=179289) JOB 2 IS DONE?
(ServeReplica:default_Model pid=179289) INFO 2023-09-20 14:54:17,748 default_Model default_Model#PfSJbd FHxgnYcdfZ /run_10_tasks default replica.py:723 - GENERATE OK 6021.4ms
 ray.get Job #3! It took: 2.011533737182617 sec
Retrieve time: 0.011533737182617188

(ServeReplica:default_Model pid=179289) JOB 3 IS DONE?
(ServeReplica:default_Model pid=179289) INFO 2023-09-20 14:54:19,758 default_Model default_Model#PfSJbd FHxgnYcdfZ /run_10_tasks default replica.py:723 - GENERATE OK 8029.5ms
(ServeReplica:default_APIIngress pid=179290) INFO 2023-09-20 14:54:19,760 default_APIIngress default_APIIngress#vuzsEa TouJVyksWK /gg/3 default replica.py:723 - __CALL__ OK 7006.6ms

Hey @cade! Thanks for answering. I have some concerns about your solution, as it will probably block the whole async APIIngress / FastAPI side?
Perhaps my simple code was too simple. In reality I've got a /run_task endpoint that starts tasks and responds immediately with a UID, and some other endpoint(s) like /get/{UID} that check whether a task is done and retrieve it. I think with a job held under asyncio.Lock() my API will be blocked most of the time.
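To give a clearer picture, the real API is shaped roughly like this (a simplified sketch building on the imports and app from simple_code.py above; the endpoint names and UID bookkeeping are illustrative, not my exact code):

import uuid

...

@serve.deployment()
@serve.ingress(app)
class APIIngress:
    def __init__(self, handle):
        self.handle = handle
        self.jobs = {}  # UID -> ObjectRef

    @app.get('/run_task')
    async def run_task(self):
        uid = uuid.uuid4().hex
        # start the job and respond immediately with its UID
        self.jobs[uid] = await self.handle.generate.remote(uid)
        return Response(content=uid, media_type="text/plain")

    @app.get('/get/{uid}')
    async def get(self, uid: str):
        ref = self.jobs[uid]
        # non-blocking check; only fetch once the job has actually finished
        ready, _ = ray.wait([ref], num_returns=1, timeout=0)
        if not ready:
            return Response(content='not ready\n', media_type="text/plain")
        return Response(content=f'{ray.get(ready[0])}\n', media_type="text/plain")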

Can you share the model code you have? The lock is merely to simulate some kind of sequential execution.

Pretty sure it only works as I intended with the lock, i.e. with async with lock and await asyncio.sleep(TIME). My job is a heavy PyTorch model, so essentially this:

import numpy as np

...

@serve.deployment()
class Model:
    async def generate(self, idx):
        start = time.time()
        # deep learning goes brrrr
        EQTIME = 5000
        a = np.zeros((EQTIME, EQTIME))
        a @ a.T
        print(f'JOB {idx} IS DONE!')
        return start

(I only added async to def generate.) You can adjust EQTIME to your local CPU; I get a job of around 2 seconds for EQTIME = 5000 (the actual SD-based PyTorch model takes about 30 seconds).
With that load the code behaves like in my starter post, i.e. the response is delayed.

One way to fix this is to move the model execution logic to another thread, so that the main thread (the asyncio event loop) only handles I/O in the Model deployment. But this is non-trivial, and maybe Ray Serve has a better recommended way of doing things.
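For illustration, a rough sketch of what I mean, assuming Python 3.9+ for asyncio.to_thread and reusing your numpy stand-in for the model; it keeps the replica's event loop free while the heavy work runs, but it is not necessarily the recommended Serve pattern:

import asyncio
import time

import numpy as np
from ray import serve


@serve.deployment()
class Model:
    def _generate_blocking(self, idx):
        # CPU-bound work (stand-in for the pytorch model) runs off the event loop
        start = time.time()
        a = np.zeros((5000, 5000))
        a @ a.T
        print(f'JOB {idx} IS DONE!')
        return start

    async def generate(self, idx):
        # the async method only awaits; the heavy work happens in a worker thread,
        # so the replica can keep handling other calls and returning results
        return await asyncio.to_thread(self._generate_blocking, idx)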

I will move the issue to that category.


Thanks @cade! I still struggle to understand how to use Ray Serve. Here is the simplified code:

# ray==2.7.1, with the new API
import time
from ray import serve

@serve.deployment()
class Downstream:
    def generate(self, idx):
        time.sleep(2)
        print('generated', idx)
        return idx
    
@serve.deployment
class Ingress:
    def __init__(self, handle):
        self.handle = handle

    def start(self):
        self.refs = [self.handle.generate.remote(i) for i in range(10)]
            
    async def get_task(self, i):
        ref = self.refs[i]
        print(f'waiting on {i}', ref)
        return await ref


app = Ingress.bind(Downstream.bind())
handle = serve.run(app)
handle.start.remote()
response = handle.get_task.remote(3)
r = response.result()
print('Got #', r)  # does not return right after job #3 ends;
# instead it 'glues' to other results and comes back only after jobs #8-9-10 end

It is not using HTTP, but the problem stays. I cannot retrieve data asynchronously from the actor deployment. I tried to make it work through a streaming response, but was not successful (the docs look a bit raw for now). Also, moving the execution logic to another thread is indeed non-trivial (at least for me) and looks funky.

This sounds like a bug. It should return after task 3 is done without waiting for 8, 9, and 10 to finish. Could you file this as a bug on the GitHub repo, so we can track it and fix it?

Thanks for responding, here it is: [Serve] (random) Delay in Collecting Results from a Remote Actor Deployment · Issue #40328 · ray-project/ray · GitHub