Problem with FastAPI's Background Tasks

I would like to make a request that starts a process and returns immediately, without waiting for that process to finish. With FastAPI, this can be achieved with background tasks:

import time

import uvicorn
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def long_process(id: str):
    time.sleep(10)
    print(f"{id} has been processed.")

@app.get("/")
async def test_bgtask(background_tasks: BackgroundTasks):
    background_tasks.add_task(long_process, "1234")
    return {"response before finish processing"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

When I combine FastAPI with Ray Serve, this no longer works. Here the response waits for the background task to finish:

import time

import requests
from fastapi import BackgroundTasks, FastAPI
from ray import serve

app = FastAPI()

def long_process(id: str):
    time.sleep(10)
    print(f"{id} has been processed.")

@app.get("/")
async def test_bgtask(background_tasks: BackgroundTasks):
    background_tasks.add_task(long_process, "1234")
    return {"response before finish processing"}

@serve.deployment(route_prefix="/")
@serve.ingress(app)
class FastAPIWrapper:
    pass


serve.run(FastAPIWrapper.bind())
print("send request")
resp = requests.get("http://localhost:8000/")
print(resp)

Any idea how I can achieve this?

@Alpe6825 what was the behavior you observed when using the Ray Serve version?

resp is only returned after 10 s.
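One way to make this observation reproducible is to time the call explicitly. The helper below is only a minimal sketch; `timed` is a hypothetical name and not part of FastAPI, Ray, or requests.

```python
import time
from typing import Any, Callable, Tuple


def timed(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float]:
    """Call fn(*args, **kwargs) and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


# Hypothetical usage against the server from the question:
#   resp, elapsed = timed(requests.get, "http://localhost:8000/")
#   print(resp, f"{elapsed:.1f}s")
```

If the response really returns before the background task finishes, `elapsed` should be well under the 10 s that `long_process` sleeps; a value around 10 s would match the behavior reported here.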

I see, and what if you try:

async def long_process(id: str):
    await asyncio.sleep(10)
    print(f"{id} has been processed.")

?

(maybe it’s related to the event loop blocking)
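To illustrate the hypothesis above (it is a guess, not a confirmed diagnosis for Ray Serve), here is a small sketch using only the standard library. It runs a "heartbeat" task next to a job and counts how often the heartbeat gets to run. All function names are made up for this example.

```python
import asyncio
import time
from typing import Awaitable, Callable, List


async def heartbeat(ticks: List[float], interval: float = 0.05) -> None:
    """Append a timestamp every `interval` seconds while the loop is free."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_job(duration: float) -> None:
    # time.sleep never yields control, so no other task can run meanwhile.
    time.sleep(duration)


async def cooperative_job(duration: float) -> None:
    # asyncio.sleep yields to the event loop, so other tasks keep running.
    await asyncio.sleep(duration)


async def count_ticks(job: Callable[[float], Awaitable[None]], duration: float = 0.5) -> int:
    """Run `job` alongside a heartbeat task and return the number of heartbeats."""
    ticks: List[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    await job(duration)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return len(ticks)


if __name__ == "__main__":
    print("blocking job, heartbeats:", asyncio.run(count_ticks(blocking_job)))
    print("cooperative job, heartbeats:", asyncio.run(count_ticks(cooperative_job)))
```

The blocking variant should report far fewer heartbeats than the cooperative one, which is the effect a blocked event loop would have on other work sharing the same loop.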

Nothing changed. resp is again returned only after 10 s.

However, I found a small workaround that uses FastAPI with Ray Core instead of Ray Serve. Instead of a FastAPI background task, I use a Ray task like this:

@ray.remote
def long_process(id: str):
    time.sleep(10)
    print(f"{id} has been processed.")

@app.get("/")
async def test_bgtask():
    long_process.remote("1234")
    return {"response before finish processing"}

I found this workaround here.

@eoakes Do you think this workaround has any disadvantages compared to Ray Serve?

Hey guys! I tried to add background tasks to my Ray Serve endpoints by following the official FastAPI docs, but found that FastAPI's BackgroundTasks could not achieve my goal, as the add_task method is not an async function.

After trying a few of the workarounds in this topic, the answer is to use BackgroundTask or BackgroundTasks from the starlette.background module directly. The boilerplate looks roughly like this:

import uuid

from fastapi import FastAPI, HTTPException
from ray import serve
from ray.serve.handle import DeploymentHandle
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse

app = FastAPI()

@serve.deployment
@serve.ingress(app)
class YourAppIngress:
    def __init__(self, deployment_handler: DeploymentHandle):
        self.handler = deployment_handler
    
    @app.post("/schedule")
    def schedule(self, path_or_query_or_body_params_as_usual: str):
        try:
            # Optional session_id, or task_id, etc.
            session_id = str(uuid.uuid1())
            your_background_task = BackgroundTask(
                self.handler.handle_long_running_request.remote,
                session_id,
                path_or_query_or_body_params_as_usual,
                # ...plus any other arguments your task function needs
            )
        except Exception as e:
            raise HTTPException(status_code=404, detail=f"Failed to schedule due to: {e}") from None
        return JSONResponse(
            content={
                "session_id": session_id,
                "message": "Successfully scheduled.",
            },
            background=your_background_task,
        )

🙂 Hope this helps those who are struggling to find a solution.
