No request can complete until all requests are ready

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Issue

Hello!

I have a really simple Ray pipeline consisting of two applications, but the timing behaviour is very puzzling to me.

I have a service that takes 10 seconds to run:

@serve.deployment()
class SlowThing:

    def do_it(self, foo: Foo) -> str:
        logger.info(f"{foo.i} Start Request")
        duration = 0
        start = time.time()
        while duration < 10:
            duration = time.time() - start
            print("HI")
        # sleep(10)
        logger.info(f"{foo.i} Done Request")

        return foo.a.upper()

I am calling this from another service:

@serve.deployment()
@serve.ingress(app)
class HttpEntrypointService:

    def __init__(self):
        self.i = 0

    @app.get("/")
    async def root(self, http_request: HttpRequest):
        self.i += 1
        i = self.i
        start = time.time()

        logger.info(f"{i} Got request")
        foo_handle: DeploymentHandle = get_app_handle("slow_thing")
        foo = Foo(a=http_request.video_path, i=i)
        answer = await foo_handle.do_it.remote(foo)

        duration = time.time() - start

        logger.info(f"{i} Sending response {duration}")
        return answer

If I send a single request to the service, it completes in 10 seconds.

If I send 5 requests to the service, I would expect that it would behave like this:

Time 0s              Send 5 requests to service
Time 10s             1st request returns
Time 20s             2nd request returns
Time 30s             3rd request returns
Time 40s             4th request returns
Time 50s             5th request returns

But instead this happens:

Time 0s              Send 5 requests to service
Time 10s             1st request returns
Time 20s             
Time 30s             
Time 40s             
Time 50s             2nd, 3rd, 4th, 5th request returns

It seems like no request is able to complete while there is any pending work.

If I submit 10 requests, it takes 100 seconds before the remaining requests are returned:

Time 0s              Send 10 requests to service
Time 10s             1st request returns
...           
Time 100s             2nd, ..., 10th request returns

Logs:

Am I doing something obvious wrong?

(Note: Crossposted on Slack.)

@Jacob_Summers Can you include the entire code here?

cc: @Gene @Sihan_Wang

1 Like

SlowThing’s do_it() method is synchronous, so it’s probably blocking the event loop whenever it executes, which may be preventing finished requests from returning. If you make it async and add an await asyncio.sleep(0) call after each iteration of the while loop, this problem should go away.
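For reference, a minimal sketch of what that change could look like (same logic as the original snippet; asyncio.sleep(0) adds no real delay, it just yields control back to the event loop on every iteration):

import asyncio
import logging
import time

from ray import serve

logger = logging.getLogger("ray.serve")


@serve.deployment()
class SlowThing:

    async def do_it(self, foo) -> str:
        logger.info(f"{foo.i} Start Request")
        start = time.time()
        while time.time() - start < 10:
            # Yield to the event loop so other tasks (e.g. returning
            # already-finished requests) get a chance to run.
            await asyncio.sleep(0)
        logger.info(f"{foo.i} Done Request")
        return foo.a.upper()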

1 Like

Is there an event loop for each deployment? Or a shared event loop for SlowThing & HttpEntrypointService?

Let me try that to see if it fixes the issue. I probably cannot do that in practice, because the while loop is just a placeholder for a long-running task (i.e., my model running), but it’s interesting to know whether that will improve things.

Shreyas is totally right! If you modify the code a bit to something like this, it should work as expected:

# deployment.py
import asyncio
import logging
import time
from fastapi import FastAPI
from ray import serve
from ray.serve import get_app_handle
from ray.serve.handle import DeploymentHandle
from starlette.requests import Request

logger = logging.getLogger("ray.serve")


class Foo:
    def __init__(self, a, i):
        self.a = a
        self.i = i


@serve.deployment()
class SlowThing:

    async def do_it(self, foo: Foo) -> str:
        logger.info(f"{foo.i} Start Request")
        duration = 0
        start = time.time()
        while duration < 10:
            duration = time.time() - start
            print("HI")
            await asyncio.sleep(1)
        logger.info(f"{foo.i} Done Request")

        return foo.a.upper()


app = FastAPI()


@serve.deployment()
@serve.ingress(app)
class HttpEntrypointService:

    def __init__(self):
        self.i = 0

    @app.get("/")
    async def root(self, http_request: Request):
        self.i += 1
        i = self.i
        start = time.time()

        logger.info(f"{i} Got request")
        foo_handle: DeploymentHandle = get_app_handle("slow_thing")
        foo = Foo(a=http_request.url.path, i=i)
        answer = await foo_handle.do_it.remote(foo)

        duration = time.time() - start

        logger.info(f"{i} Sending response {duration}")
        return answer


slow_thing = SlowThing.bind()
http_entrypoint_service = HttpEntrypointService.bind()
# config.yaml
applications:
  - name: slow_thing
    route_prefix: /slow_thing
    import_path: deployment:slow_thing
    runtime_env: {}
  - name: http_entrypoint_service
    route_prefix: /http_entrypoint_service
    import_path: deployment:http_entrypoint_service
    runtime_env: {}
# client.py
import aiohttp
import asyncio


async def send_query(num):
    async with aiohttp.ClientSession() as session:
        async with session.get("http://localhost:8000/http_entrypoint_service") as response:
            response = await response.text()
            print(f"in fetch_http: \nnum: {num} \n response: {response}")


async def run(num_times):
    tasks = [send_query(num) for num in range(num_times)]
    await asyncio.gather(*tasks)

num_requests = 5
asyncio.run(run(num_requests))

Is there an event loop for each deployment? Or a shared event loop for SlowThing & HttpEntrypointService?

Each replica is an independent process and runs its own event loop.

1 Like

Is there an event loop for each deployment? Or a shared event loop for SlowThing & HttpEntrypointService?

Each replica is an independent process and runs its own event loop.

This raises a couple questions:

  1. Can I view the event queue?
  2. How is the event queue ordered? First-in-first-out?
  3. When the SlowThing.do_it function completes, what needs to be put on the event queue? Why can’t the result be immediately serialized and sent?

Generally the behaviour feels pretty weird to me. User-defined work (i.e., my code) should be scheduled after system-level work.

Would it be possible to:

  1. Order / prioritize / sort the event queue so that system tasks are scheduled before user (aka me) tasks?
  2. Have two separate queues - one for system tasks and one for user defined tasks? This could ensure that long running user jobs don’t block system calls.

TLDR: It feels like a bug to me that long running tasks are scheduled before returning completed tasks.

Can I view the event queue?

No, but I think this would be a pretty useful observability feature. Would you mind filing a feature request for it?

How is the event queue ordered? First-in-first-out?

The event loop on each Ray Serve actor is a standard Python asyncio event loop. In general, there’s no guarantee about the ordering of tasks on a single event loop. The mental model is that whenever the code reaches an await statement, it may stop executing the current task and continue executing another async task.
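As a small, self-contained illustration of that mental model (plain asyncio, not Serve-specific): a coroutine that busy-loops without awaiting keeps every other queued task from running until it finishes.

import asyncio
import time


async def blocking_task():
    # Busy-loops for 2 seconds without ever awaiting, so it holds on to
    # the event loop for the whole duration.
    start = time.time()
    while time.time() - start < 2:
        pass
    print("blocking_task done")


async def quick_task():
    print("quick_task done")


async def main():
    # quick_task has no real work to do, but it cannot print until
    # blocking_task gives the event loop back by finishing.
    await asyncio.gather(blocking_task(), quick_task())


asyncio.run(main())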

When the SlowThing.do_it function completes, what needs to be put on the event queue? Why can’t the result be immediately serialized and sent?

The Serve replica calls SlowThing.do_it inside an async function. The SlowThing.do_it method executes and returns asynchronously. After the method finishes executing, its result is stored by the event loop, and when the caller of SlowThing.do_it starts running again, it can use the result.

However, the event loop may not run the caller of SlowThing.do_it immediately after SlowThing.do_it finishes. For example, it may pick up a queued async task from another request and start executing that. This is why having a long, blocking section of code can impede performance. It can block other requests from finishing quickly.

If your use case requires blocking code, then you could run it in a separate thread using loop.run_in_executor. This lets the main event loop run smoothly without getting blocked.
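As a rough sketch of what that could look like for a deployment like SlowThing (the _blocking_work helper is a hypothetical stand-in for the real model call; note that a thread pool only helps if the blocking work releases the GIL, e.g. I/O or native model code):

import asyncio
import logging
import time

from ray import serve

logger = logging.getLogger("ray.serve")


@serve.deployment()
class SlowThing:

    async def do_it(self, foo) -> str:
        logger.info(f"{foo.i} Start Request")
        loop = asyncio.get_running_loop()
        # Run the blocking section in the default thread pool so the
        # replica's event loop stays free to return finished requests.
        result = await loop.run_in_executor(None, self._blocking_work, foo)
        logger.info(f"{foo.i} Done Request")
        return result

    def _blocking_work(self, foo) -> str:
        # Hypothetical stand-in for the long-running, blocking model call.
        time.sleep(10)
        return foo.a.upper()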

1 Like

Another solution here could be to set max_concurrent_queries to 1 for this deployment, since it seems like you want to allow only one request to be processed at a time. Just a caveat though: max_concurrent_queries may not always be strictly met (e.g., if 2 different callers try to send requests to the replica at exactly the same time).
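For example (a minimal sketch, assuming the SlowThing deployment from earlier in the thread; the only change is the option on the decorator):

from ray import serve


@serve.deployment(max_concurrent_queries=1)
class SlowThing:
    # Same do_it implementation as in the earlier snippets; with this
    # setting the replica is handed at most one request at a time and the
    # rest wait upstream.
    ...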

1 Like

Thanks for the reply @shrekris. This info really helps me understand what is going on.

This behaviour is pretty surprising. It would probably be a good idea to improve the developer experience here. I noticed that my code was wrong via testing, but maybe you could do some of the following:

  1. Improve the Ray-Serve doc to explain what the process is to run long-running CPU/GPU bound tasks.
  2. Maybe it is possible to notify developers that long-running tasks are not supported. Killing long-running tasks might be one option. Logging long-running tasks might be another. I haven’t used asyncio much, so I am not sure about the feasibility of either of these solutions.

It might also make sense to build this functionality directly into Ray. This could be done in a couple of ways:

  1. Run all user-written code in a loop.run_in_executor call. This way, user-written code can never block Ray’s internal mechanics.
  2. Alternatively, maybe you could add a flag to the deployment annotation, like this: @deployment(cpu_bound=True). When that is true, you could execute loop.run_in_executor on behalf of the user.

Thanks again for your insight. Have an awesome week.

If I do that, what happens to upstream requests? Are they queued somewhere (either in the caller, aka HttpEntrypointService, or the receiver, aka SlowThing)?

Or will the service return something like a 503 Service Unavailable or a 429 Too Many Requests and the client will need to retry later?

If I do that, what happens to upstream requests? Are they queued somewhere (either in the caller, aka HttpEntrypointService, or the receiver, aka SlowThing)?

They get queued in Serve’s HTTP Proxy.

This behaviour is pretty surprising. It would probably be a good idea to improve the developer experience here. I noticed that my code was wrong via testing, but maybe you could do some of the following:

Thanks for the detailed follow up and suggestion here @Jacob_Summers! I agree, we need to better document this workflow. I’ll also bring up the loop.run_in_executor suggestions with the team and see what they think.

Thanks again, and let us know if you have any further questions.

1 Like

The sending proxy or the receiving proxy?

:+1: Thanks for your help figuring this out. I really appreciate it. :rocket:

Looking at this further, I think this issue might be documented in this GitHub issue: [Serve] (random) Delay in Collecting Results from a Remote Actor Deployment · Issue #40328 · ray-project/ray · GitHub

The sending proxy. This is the same as the receiving proxy; every request enters and exits through the same proxy.