Cancelling requests during model composition results in unresolved async tasks

1. Severity of the issue: (select one)
None: I’m just curious or want clarification.
Low: Annoying but doesn’t hinder my work.
Medium: Significantly affects my productivity but can find a workaround.
High: Completely blocks me.

2. Environment:

  • Ray version: 2.44
  • Python version: 3.11
  • OS: MacOS

3. What happened vs. what you expected:

  • Expected: Ray resolves all async tasks upon cancellation by retrieving their exception.
  • Actual: Some async tasks exceptions are never retrieved, which is logged as an error.

I am using model composition to implement a somewhat complex task-chain including multiple deployments. However, i am running into problems whenever the request was dropped. In this case, the scheduled tasks are cancelled, which is the intended behavior, but also logged as errors. I am unsure why this happens. My expectation would be for Ray to propagate the cancellation through all composed DeploymentResponses, which doesn’t seem to happen. The following is a minimum example, where three Deployments are called in order. The request is dropped after 1 second, while the whole chain needs at least 3 seconds. This results in logged errors, where at least some async tasks were apparently not resolved before being garbage collected (my assumption, not sure).

import asyncio

import requests
from fastapi import FastAPI
from ray import serve
from ray.serve.handle import DeploymentResponse

# 1: Define a FastAPI app and wrap it in a deployment with a route handler.
app = FastAPI()


@serve.deployment
class A:
    def __init__(self):
        pass

    async def __call__(self, text):
        await asyncio.sleep(1)
        return f"{text} A"


@serve.deployment
class B:
    def __init__(self):
        pass

    async def __call__(self, text):
        await asyncio.sleep(1)
        return f"{text} B"


@serve.deployment
class C:
    def __init__(self):
        pass

    async def __call__(self, text):
        await asyncio.sleep(1)
        return f"{text} C"


@serve.deployment
@serve.ingress(app)
class FastAPIDeployment:
    def __init__(self, handles):
        self.handles = handles

    # FastAPI will automatically parse the HTTP request for us.
    @app.get("/foo")
    async def foo(self, txt: str) -> str:
        a: DeploymentResponse = self.handles["A"].remote(txt)
        b: DeploymentResponse = self.handles["B"].remote(a)
        c: DeploymentResponse = self.handles["C"].remote(b)
        return await c


# 2: Deploy the deployment.
serve.run(FastAPIDeployment.bind({
    "A": A.bind(),
    "B": B.bind(),
    "C": C.bind()
}), route_prefix="/")

# 3: Query the deployment and print the result.
print(requests.get("http://localhost:8000/foo", params={"txt": "foobar"}, timeout=1).json())

Output:

(ProxyActor pid=54058) INFO 2025-03-25 14:01:48,189 proxy 127.0.0.1 e2f0a5a5-aab3-4c34-afc3-7b4e6d2cb24f -- Client for request e2f0a5a5-aab3-4c34-afc3-7b4e6d2cb24f disconnected, cancelling request.
(ServeReplica:default:A pid=54053) INFO 2025-03-25 14:01:48,192 default_A z9plmhov e2f0a5a5-aab3-4c34-afc3-7b4e6d2cb24f -- CALL /foo CANCELLED 972.4ms
(ServeReplica:default:FastAPIDeployment pid=54055) INFO 2025-03-25 14:01:48,191 default_FastAPIDeployment dhcf6y0l e2f0a5a5-aab3-4c34-afc3-7b4e6d2cb24f -- GET /foo CANCELLED 999.9ms
(ServeReplica:default:FastAPIDeployment pid=54055) ERROR 2025-03-25 14:01:48,203 default_FastAPIDeployment dhcf6y0l -- LongPollHost errored
(ServeReplica:default:FastAPIDeployment pid=54055) Traceback (most recent call last):
(ServeReplica:default:FastAPIDeployment pid=54055)     raise CancelledError()
(ServeReplica:default:FastAPIDeployment pid=54055) concurrent.futures._base.CancelledError
(ServeReplica:default:FastAPIDeployment pid=54055) 
(ServeReplica:default:FastAPIDeployment pid=54055) During handling of the above exception, another exception occurred:
(ServeReplica:default:FastAPIDeployment pid=54055) 
(ServeReplica:default:FastAPIDeployment pid=54055) Traceback (most recent call last):
(ServeReplica:default:FastAPIDeployment pid=54055)   File "python/ray/_raylet.pyx", line 1888, in ray._raylet.execute_task
(ServeReplica:default:FastAPIDeployment pid=54055)   File "python/ray/_raylet.pyx", line 1996, in ray._raylet.execute_task
(ServeReplica:default:FastAPIDeployment pid=54055)   File "python/ray/_raylet.pyx", line 1895, in ray._raylet.execute_task
(ServeReplica:default:FastAPIDeployment pid=54055)   File "python/ray/_raylet.pyx", line 1829, in ray._raylet.execute_task.function_executor
(ServeReplica:default:FastAPIDeployment pid=54055)   File "python/ray/_raylet.pyx", line 4615, in ray._raylet.CoreWorker.run_async_func_or_coro_in_event_loop
(ServeReplica:default:FastAPIDeployment pid=54055) ray.exceptions.TaskCancelledError: Task: TaskID(ab6ba9967d954f192d73135284f0cb993724227201000000) was cancelled. 
(ServeReplica:default:FastAPIDeployment pid=54055) 
(ServeReplica:default:FastAPIDeployment pid=54055) Task exception was never retrieved
(ServeReplica:default:FastAPIDeployment pid=54055) future: <Task finished name='Task-18' coro=<UserCallableWrapper.call_user_method() done, defined at /Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/ray/serve/_private/replica.py:1558> exception=RequestCancelledError('e2f0a5a5-aab3-4c34-afc3-7b4e6d2cb24f')>
(ServeReplica:default:FastAPIDeployment pid=54055) Traceback (most recent call last):
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/ray/serve/_private/replica.py", line 1610, in call_user_method
(ServeReplica:default:FastAPIDeployment pid=54055)     result, sync_gen_consumed = await self._call_func_or_gen(
(ServeReplica:default:FastAPIDeployment pid=54055)                                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/ray/serve/_private/replica.py", line 1328, in _call_func_or_gen
(ServeReplica:default:FastAPIDeployment pid=54055)     result = await result
(ServeReplica:default:FastAPIDeployment pid=54055)              ^^^^^^^^^^^^
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/ray/serve/_private/http_util.py", line 502, in __call__
(ServeReplica:default:FastAPIDeployment pid=54055)     await self._asgi_app(
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
(ServeReplica:default:FastAPIDeployment pid=54055)     await super().__call__(scope, receive, send)
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
(ServeReplica:default:FastAPIDeployment pid=54055)     await self.middleware_stack(scope, receive, send)
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
(ServeReplica:default:FastAPIDeployment pid=54055)     raise exc
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
(ServeReplica:default:FastAPIDeployment pid=54055)     await self.app(scope, receive, _send)
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
(ServeReplica:default:FastAPIDeployment pid=54055)     await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
(ServeReplica:default:FastAPIDeployment pid=54055)     raise exc
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
(ServeReplica:default:FastAPIDeployment pid=54055)     await app(scope, receive, sender)
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
(ServeReplica:default:FastAPIDeployment pid=54055)     await self.middleware_stack(scope, receive, send)
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
(ServeReplica:default:FastAPIDeployment pid=54055)     await route.handle(scope, receive, send)
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
(ServeReplica:default:FastAPIDeployment pid=54055)     await self.app(scope, receive, send)
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
(ServeReplica:default:FastAPIDeployment pid=54055)     await wrap_app_handling_exceptions(app, request)(scope, receive, send)
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
(ServeReplica:default:FastAPIDeployment pid=54055)     raise exc
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
(ServeReplica:default:FastAPIDeployment pid=54055)     await app(scope, receive, sender)
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/starlette/routing.py", line 72, in app
(ServeReplica:default:FastAPIDeployment pid=54055)     response = await func(request)
(ServeReplica:default:FastAPIDeployment pid=54055)                ^^^^^^^^^^^^^^^^^^^
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/fastapi/routing.py", line 278, in app
(ServeReplica:default:FastAPIDeployment pid=54055)     raw_response = await run_endpoint_function(
(ServeReplica:default:FastAPIDeployment pid=54055)                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/fastapi/routing.py", line 191, in run_endpoint_function
(ServeReplica:default:FastAPIDeployment pid=54055)     return await dependant.call(**values)
(ServeReplica:default:FastAPIDeployment pid=54055)            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/PycharmProjects/ray_minimal/main.py", line 54, in foo
(ServeReplica:default:FastAPIDeployment pid=54055)     return await c
(ServeReplica:default:FastAPIDeployment pid=54055)            ^^^^^^^
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/ray/serve/handle.py", line 403, in __await__
(ServeReplica:default:FastAPIDeployment pid=54055)     replica_result = yield from self._fetch_future_result_async().__await__()
(ServeReplica:default:FastAPIDeployment pid=54055)                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(ServeReplica:default:FastAPIDeployment pid=54055)   File "/Users/aho/opt/anaconda3/envs/torchenv311/lib/python3.11/site-packages/ray/serve/handle.py", line 287, in _fetch_future_result_async
(ServeReplica:default:FastAPIDeployment pid=54055)     raise RequestCancelledError(self.request_id) from None
(ServeReplica:default:FastAPIDeployment pid=54055) ray.serve.exceptions.RequestCancelledError: Request e2f0a5a5-aab3-4c34-afc3-7b4e6d2cb24f was cancelled.

Hi @aho, welcome to the community and thanks for your post! To avoid the errors in your logs, I’d suggest handling the exceptions in a try/except block in your application code. Here’s a link to how this could be done. The one discrepancy from your repro and the example in the docs is that in your case a RequestCancelledError is raised instead of an asyncio.CancelledError. The exact exception behavior is being discussed here (so in the future the raised exception in your case may actually change to be aasyncio.CancelledError), but in the meantime you’d need to handle a RequestCancelledError.