I would like to make a request that starts a process and returns before the started process has finished. With FastAPI it's possible to achieve this with background tasks:
import time

import uvicorn
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def long_process(id: str):
    time.sleep(10)
    print(f"{id} has been processed.")

@app.get("/")
async def test_bgtask(background_tasks: BackgroundTasks):
    background_tasks.add_task(long_process, "1234")
    return {"response before finish processing"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
If I combine FastAPI with Ray Serve, it doesn't work: the response waits for the background task to finish.
import time

import requests
from fastapi import BackgroundTasks, FastAPI
from ray import serve

app = FastAPI()

def long_process(id: str):
    time.sleep(10)
    print(f"{id} has been processed.")

@app.get("/")
async def test_bgtask(background_tasks: BackgroundTasks):
    background_tasks.add_task(long_process, "1234")
    return {"response before finish processing"}

@serve.deployment(route_prefix="/")
@serve.ingress(app)
class FastAPIWrapper:
    pass

serve.run(FastAPIWrapper.bind())

print("send request")
resp = requests.get("http://localhost:8000/")
print(resp)
Any idea how I can achieve it?
@Alpe6825 what was the behavior you observed when using the Ray Serve version?
resp will be returned after 10s.
I see, and what if you try:
async def long_process(id: str):
    await asyncio.sleep(10)
    print(f"{id} has been processed.")
?
(maybe it’s related to the event loop blocking)
Nothing changed; resp is still returned after 10 s.
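For comparison, here is what a fire-and-forget dispatch looks like in plain asyncio, outside of FastAPI and Ray (a minimal sketch with a shortened sleep; the `processed` list and `handler` function are illustrative names, not part of either framework's API):

```python
import asyncio

processed = []  # records completed background work so the ordering is visible

async def long_process(id: str):
    await asyncio.sleep(0.1)  # simulated work; asyncio.sleep does not block the loop
    processed.append(id)
    print(f"{id} has been processed.")

async def handler() -> dict:
    # Fire-and-forget: schedule the coroutine on the running loop and
    # return without awaiting it.
    asyncio.create_task(long_process("1234"))
    return {"message": "response before finish processing"}

async def main():
    resp = await handler()
    assert processed == []    # handler returned before the task ran
    print(resp)
    await asyncio.sleep(0.2)  # keep the loop alive until the task finishes

asyncio.run(main())
```

Note that in real code you should keep a reference to the task returned by `create_task`, since the event loop holds only a weak reference to running tasks.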
But I found a little workaround: instead of Ray Serve I use FastAPI with Ray Core directly, replacing the FastAPI background task with a Ray task like this:
import time

import ray
from fastapi import FastAPI

app = FastAPI()

@ray.remote
def long_process(id: str):
    time.sleep(10)
    print(f"{id} has been processed.")

@app.get("/")
async def test_bgtask():
    long_process.remote("1234")
    return {"response before finish processing"}
I found this workaround here.
@eoakes Do you think this workaround has any disadvantages in comparison to ray serve?
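One thing worth noting about the snippets above: `time.sleep(10)` inside an endpoint or task blocks the thread it runs on. If the suspicion about event-loop blocking is right, an alternative worth trying (independent of Ray) is offloading the blocking call to a worker thread with `asyncio.to_thread`, so the serving loop stays responsive. A minimal stdlib sketch, with `blocking_work` as a hypothetical stand-in for the long process:

```python
import asyncio
import time

def blocking_work(id: str) -> str:
    time.sleep(0.1)  # stands in for blocking CPU-/IO-heavy work
    return f"{id} has been processed."

async def main():
    # to_thread runs the blocking function in a worker thread; the event
    # loop is free to serve other requests while it runs.
    start = time.monotonic()
    task = asyncio.create_task(asyncio.to_thread(blocking_work, "1234"))
    elapsed = time.monotonic() - start
    assert elapsed < 0.05  # scheduling returned immediately, loop not blocked
    print(await task)

asyncio.run(main())
```

`asyncio.to_thread` requires Python 3.9+; on older versions, `loop.run_in_executor(None, blocking_work, "1234")` is the equivalent.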
Hey guys! I tried to add background tasks to my Ray Serve endpoints following the official FastAPI docs, but found that FastAPI's BackgroundTasks cannot achieve my goal, since the add_task method is not an async function.
After a few workarounds in this topic, the correct answer is to use BackgroundTask (or BackgroundTasks) from the starlette.background module directly. Generally, the boilerplate looks like this:
import uuid

from fastapi import FastAPI, HTTPException
from ray import serve
from ray.serve.handle import DeploymentHandle
from starlette.background import BackgroundTask
from starlette.responses import JSONResponse

app = FastAPI()

@serve.deployment
@serve.ingress(app)
class YourAppIngress:
    def __init__(self, deployment_handler: DeploymentHandle):
        self.handler = deployment_handler

    @app.post("/schedule")
    def schedule(self, path_or_query_or_body_params_as_usual: str):
        try:
            # Optional session_id, task_id, etc.
            session_id = str(uuid.uuid1())
            your_background_task = BackgroundTask(
                self.handler.handle_long_running_request.remote,
                session_id,
                path_or_query_or_body_params_as_usual,
                other_params_that_you_need_to_pass_to_your_task_function,
            )
        except Exception as e:
            raise HTTPException(status_code=404, detail=f"Failed to schedule due to: {e}") from None
        return JSONResponse(
            content={
                "session_id": session_id,
                "message": "Successfully scheduled.",
            },
            background=your_background_task,
        )
Hope it helps those who are struggling to find a solution.
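The reason this works is the contract of starlette's `background=` parameter: the server sends the response body to the client first and only then awaits the attached task. The following stdlib-only sketch imitates that contract (the `ResponseWithBackground` class and `send_response` function are hypothetical stand-ins for starlette's JSONResponse and the ASGI server, not real APIs):

```python
import asyncio

processed = []

async def long_process(session_id: str):
    await asyncio.sleep(0.05)  # simulated long-running work
    processed.append(session_id)

class ResponseWithBackground:
    # Hypothetical stand-in for starlette's JSONResponse: it carries the
    # body plus a background callable to run after the body is sent.
    def __init__(self, content, background):
        self.content = content
        self.background = background

async def send_response(resp: ResponseWithBackground):
    print("sent:", resp.content)  # the client already has the response here
    await resp.background()       # background work runs only afterwards

async def main():
    resp = ResponseWithBackground(
        {"message": "Successfully scheduled."},
        lambda: long_process("abc-123"),
    )
    assert processed == []  # nothing ran before the response was sent
    await send_response(resp)

asyncio.run(main())
```

In real starlette, the same sequencing happens inside `Response.__call__`, which is why the endpoint returns immediately while the task keeps running.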