How severe does this issue affect your experience of using Ray?
- High: It blocks me to complete my task.
Hey Ray team! Thanks for building this framework.
I have a problem retrieving data on time from an actor with `ray.get` (or `ray.wait`).
Here is the code. It is close to the Serve FastAPI examples. I tried many different environments, including Docker, and got the same result:
simple_code.py
```python
# ray==2.6.1
# fastapi==0.95.2
import time
import requests
from dataclasses import dataclass

import ray
from ray import serve
from fastapi import FastAPI
from fastapi.responses import Response

app = FastAPI()
JOB_TIME = 2  # seconds


@dataclass
class Data:
    idx: int
    ref: object = None


@serve.deployment()
class Model:
    def generate(self, idx):
        start = time.time()
        time.sleep(JOB_TIME)
        print(f'JOB {idx} IS DONE?')
        return start


@serve.deployment()
@serve.ingress(app)
class APIIngress:
    def __init__(self, handle):
        self.handle = handle
        self.q = []

    @app.get('/run_10_tasks')
    async def test(self):
        for idx in range(10):
            idx = len(self.q)
            ref = await self.handle.generate.remote(idx)
            self.q.append(Data(ref=ref, idx=idx))

    @app.get('/gg/{idx}')
    async def get(self, idx):
        idx = int(idx)
        data = self.q[idx]
        start = ray.get(data.ref)  # block until job idx is available
        # start is the local start time of each job
        walltime = time.time() - start
        msg = f' ray.get Job #{data.idx}! It took: {walltime} sec\n'
        msg += f'Retrieve time: {walltime - JOB_TIME}\n'
        # this should return a walltime of ~JOB_TIME and a retrieve time << JOB_TIME (both in seconds)
        # from the logs, methods called on the actor execute serially: job0->job1->...
        # but ray.get (or blocking/non-blocking ray.wait) doesn't receive anything for quite a while after the job finishes
        return Response(content=msg, media_type="text/plain")


entrypoint = APIIngress.bind(Model.bind())
serve.run(entrypoint)

requests.get("http://localhost:8000/run_10_tasks")
time.sleep(1)
IDX = 3  # choose some job out of 10
print(requests.get(f"http://localhost:8000/gg/{IDX}").text)
```
When I run it, I expect a blocking `ray.get(ref)` call to return the result almost as soon as the job finishes in the remote actor. Instead, I get a random delay after the job is done (I know the job is done from the logs).
So instead of
```
|===JOB1=====|======JOB2=====|
======ray.get++================
=============~~result==========
```
I got
```
|===JOB1=====|======JOB2======|=====JOB3====|=====JOB4=====|
=====ray.get+++++++++blocking+++++++++still blocking++++++++======
=============~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~result==
```
The total time always matches n_jobs * job_time, but the results are returned in groups:
```
R1
=========R2
=========R3
=========R4
===============R5
===============R6
```
instead of:
```
R1
===R2
======R3
=========R4
============R5
===============R6
```
So, for example, I wait 80 seconds for R2 instead of 20.
The delay is not consistent from run to run. With a larger number of tasks (50), a returned group can contain as many as 30 tasks arriving at the same time. The delay is always a multiple of the job time (delay % job_time == 0).
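The observations above (grouped returns, delays that are always a multiple of the job time) are what a simple model predicts if jobs run strictly one after another, as the logs show, and a result is only released once the last job in its group has finished. The sketch below computes per-job delays under that assumption; `grouped_delays` is a hypothetical helper, and the group boundaries are illustrative, chosen to match the shape of the R1 to R6 diagram rather than measured.

```python
from typing import List


def grouped_delays(groups: List[List[int]], job_time: float) -> List[float]:
    """Per-job extra wait, assuming serial execution and group-wise release.

    Assumption (not confirmed by Ray): job k finishes at (k + 1) * job_time,
    and every result in a group becomes visible only when the group's last
    job finishes.
    """
    delays = []
    for group in groups:
        release = (max(group) + 1) * job_time  # finish time of the group's last job
        for k in group:
            finish = (k + 1) * job_time  # serial execution: job k ends at (k+1)*T
            delays.append(release - finish)
    return delays


# R1 alone, R2..R4 together, R5..R6 together (0-based job indices)
print(grouped_delays([[0], [1, 2, 3], [4, 5]], job_time=10))
```

With `job_time=10` this prints `[0, 20, 10, 0, 10, 0]`; every delay is a multiple of the job time, and if each group held a single job (the expected behavior) all delays would be zero.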
`ray.wait` returns results with the same timing as `ray.get`.
I'd like to confirm whether this behavior is intended or a bug. If it is intended, how can I modify my code to retrieve results from actor jobs as soon as they become available? Thanks!
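For comparison, here is a minimal sketch of the "as soon as available" pattern using plain `asyncio`, so it runs without Ray. The `generate` coroutine is a hypothetical stand-in for `Model.generate`: an `asyncio.Lock` serializes the jobs the way the actor logs show, and `asyncio.as_completed` hands each result to the consumer the moment its job ends. In Ray, the rough analogue is looping over `ray.wait(refs, num_returns=1)`, or, inside an `async def` method such as `APIIngress.get`, writing `await data.ref` instead of `ray.get(data.ref)`, since Ray's async-actor guidance notes that `ray.get` blocks the actor's event loop. Whether that blocking explains the grouped returns reported here is an assumption this sketch does not test.

```python
import asyncio
import time
from typing import List, Tuple

JOB_TIME = 0.05  # seconds; shortened stand-in for the issue's JOB_TIME


async def generate(lock: asyncio.Lock, idx: int) -> int:
    # Hypothetical stand-in for Model.generate: the lock makes jobs run
    # one at a time, mirroring the serial execution seen in the logs.
    async with lock:
        await asyncio.sleep(JOB_TIME)  # non-blocking, unlike time.sleep
    return idx


async def collect(n_jobs: int) -> List[Tuple[int, float]]:
    lock = asyncio.Lock()
    t0 = time.monotonic()
    tasks = [asyncio.create_task(generate(lock, i)) for i in range(n_jobs)]
    arrivals = []
    # as_completed yields each job as soon as it finishes, so the
    # consumer sees results staggered by ~JOB_TIME instead of in batches.
    for fut in asyncio.as_completed(tasks):
        idx = await fut
        arrivals.append((idx, time.monotonic() - t0))
    return arrivals


if __name__ == "__main__":
    for idx, t in asyncio.run(collect(5)):
        print(f"job {idx} received after {t:.2f} s")
```

Each job should arrive roughly `JOB_TIME` after the previous one, which is the staggered R1, R2, R3... timeline the issue expects.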