Hi everyone,
My question is about concurrent queries in a Ray Serve instance.
Our setup can be simplified to the following example. I deployed two simple functions, where the second depends on the output of the first, plus a `composed_model` deployment that chains them:
```python
import asyncio
from typing import Coroutine

import ray
from ray import serve


@serve.deployment()
async def composed_model(_id: int):
    first_func_h = first_func.get_handle()
    second_func_h = second_func.get_handle()
    # Call the first function; this returns a reference to its eventual result.
    first_res_h = first_func_h.remote(_id=_id)
    # Pass that reference on to the second function without awaiting it here.
    second_func_h.remote(_id=first_res_h)


@serve.deployment
async def first_func(_id):
    # Query 0 simulates a very long-running call.
    if _id == 0:
        await asyncio.sleep(1000)
    print(f'First output: {_id}')
    return _id


@serve.deployment
async def second_func(_id):
    # Resolve the argument in case it arrives as a reference or coroutine.
    while isinstance(_id, ray.ObjectRef) or isinstance(_id, Coroutine):
        _id = await _id
    print(f'Second output: {_id}')
    return _id


client = serve.start(detached=True)
composed_model.deploy()
first_func.deploy()
second_func.deploy()

main_p = composed_model.get_handle()
main_p.remote(_id=0)
main_p.remote(_id=1)
```
Expected output
When running the script above, we expected both functions to process the second query (`_id=1`) while the first query (`_id=0`) is still sleeping, since all deployments are async:
```
First output: 1
Second output: 1
```
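To make the expectation concrete, the sketch below models it with plain `asyncio` and no Ray at all. `first_func`, `second_func` and `composed_model` here are ordinary coroutines that mimic the deployments above (the 1000 s sleep is shortened to 0.5 s, and output goes to a list instead of `print`). As in the original, the first result is handed to the second function unresolved and awaited there. In this model, query 1 completes both stages while query 0 is still sleeping; it only illustrates the semantics we expected, not how Serve actually schedules requests.

```python
import asyncio
import inspect


# Hypothetical stand-ins for the deployments: plain coroutines, not Ray Serve.
async def first_func(_id: int, log: list) -> int:
    if _id == 0:
        await asyncio.sleep(0.5)  # shortened "long-running" call
    log.append(f"First output: {_id}")
    return _id


async def second_func(_id, log: list) -> int:
    # Like the original, resolve the argument while it is still awaitable.
    while inspect.isawaitable(_id):
        _id = await _id
    log.append(f"Second output: {_id}")
    return _id


async def composed_model(_id: int, log: list) -> int:
    # Start the first stage and pass the pending task on without awaiting it.
    first_res = asyncio.create_task(first_func(_id, log))
    return await second_func(first_res, log)


async def main() -> list:
    log: list = []
    # Two independent queries running concurrently.
    slow = asyncio.create_task(composed_model(0, log))
    fast = asyncio.create_task(composed_model(1, log))
    await fast
    snapshot = list(log)  # everything logged by the time query 1 is done
    await slow
    return snapshot


if __name__ == "__main__":
    print(asyncio.run(main()))
```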
Actual output
However, the second function seems to be blocked by the first query (`_id=0`), so the second query is only processed by the first function:
```
First output: 1
```
Workaround
Currently we use a workaround in which we await the output of the first function before calling the second:
```python
@serve.deployment()
async def composed_model(_id: int):
    first_func_h = first_func.get_handle()
    second_func_h = second_func.get_handle()
    first_res_h = first_func_h.remote(_id=_id)
    # Await the first result here, so the second function receives a plain value.
    second_func_h.remote(_id=await first_res_h)
```
With this workaround we get the expected output above. However, it hurts our performance: `composed_model` has to wait for the first function to finish before it can even submit the call to the second, and in our case that takes quite some time. Our real setup has several of these long-running functions, and every downstream function would have to wait until all of them have finished.
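When a downstream step genuinely needs every upstream result, the waiting can at least overlap: `asyncio.gather` starts all upstream coroutines at once and awaits them together, so the total wait is roughly the longest stage rather than the sum of all stages. The sketch uses plain coroutines with hypothetical names (`slow_stage`, `downstream`, `composed`) and made-up delays. Inside a deployment, each `slow_stage` call would presumably be replaced by awaiting a handle's `.remote()` result; that wiring is an assumption and is not tested here.

```python
import asyncio
import time


# Hypothetical long-running upstream stage; the delay is illustrative.
async def slow_stage(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done"


# Hypothetical downstream step that needs all upstream results.
async def downstream(results: list) -> str:
    return " + ".join(results)


async def composed(delays: dict) -> tuple:
    start = time.perf_counter()
    # Run every upstream stage concurrently; gather keeps the input order.
    results = await asyncio.gather(*(slow_stage(n, d) for n, d in delays.items()))
    combined = await downstream(list(results))
    return combined, time.perf_counter() - start


if __name__ == "__main__":
    combined, elapsed = asyncio.run(composed({"a": 0.3, "b": 0.2, "c": 0.1}))
    print(combined)           # a done + b done + c done
    print(f"{elapsed:.1f}s")  # about the longest stage (0.3s), not the sum (0.6s)
```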
Another workaround would be to use `asyncio.wait()`, but we expected the example above to work as is.
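A rough sketch of the `asyncio.wait()` approach, again with plain coroutines and hypothetical names (`upstream`, `downstream`, `forward_as_ready`): all upstream calls are started as tasks, and `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)` wakes up whenever at least one has finished, so each result is forwarded downstream as soon as it is ready instead of after the slowest one. Whether this maps cleanly onto Serve handles is an assumption that has not been verified.

```python
import asyncio


# Hypothetical upstream call that finishes after `delay` seconds.
async def upstream(_id: int, delay: float) -> int:
    await asyncio.sleep(delay)
    return _id


# Hypothetical downstream step; records the order in which results arrive.
async def downstream(_id: int, log: list) -> None:
    log.append(f"Second output: {_id}")


async def forward_as_ready(delays: dict) -> list:
    log: list = []
    # asyncio.wait expects tasks/futures, so wrap each coroutine in a task.
    pending = {asyncio.create_task(upstream(i, d)) for i, d in delays.items()}
    while pending:
        # Return as soon as at least one upstream call has completed.
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            await downstream(task.result(), log)
    return log


if __name__ == "__main__":
    # Results are forwarded in completion order: 1, then 2, then 0.
    print(asyncio.run(forward_as_ready({0: 0.3, 1: 0.0, 2: 0.1})))
```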
Is there a reason for this behavior, or is this some kind of bug?