[Nondeterministic] Ray Serve & Fast API Asyncio Pipeline

Hello,

I have this Ray Code:

import ray
import requests
import starlette.requests
import asyncio
import base64
import os
import asyncio
from datetime import datetime
import pytz

from fastapi import FastAPI, UploadFile, File
from ray import serve
from queue import Queue

import hashlib
import time
from pydantic import BaseModel

app = FastAPI()

class AudioWrapper(BaseModel):
    audio: str

@ray.remote
class Step1:
    async def run(self, value):
        sleep_value = 20
        print(f"[STEP 1 - BEFORE - VALUE: {value}] {datetime.now(pytz.utc)} sleep {sleep_value}")
        time.sleep(sleep_value)
        print(f"[STEP 1 - AFTER - VALUE: {value}] {datetime.now(pytz.utc)} sleep {sleep_value}")
        return 2

@ray.remote
class Step2:
    async def run(self, value):
        sleep_value = 10
        print(f"[STEP 2 - BEFORE - VALUE: {value}] {datetime.now(pytz.utc)} sleep {sleep_value}")
        time.sleep(sleep_value)
        print(f"[STEP 2 - AFTER - VALUE: {value}] {datetime.now(pytz.utc)} sleep {sleep_value}")
        return 3
    

@ray.remote
class Step3:
    async def run(self, value):
        sleep_value = 2
        print(f"[STEP 3 - BEFORE - VALUE: {value}] {datetime.now(pytz.utc)} sleep {sleep_value}")
        time.sleep(sleep_value)
        print(f"[STEP 3 - AFTER - VALUE: {value}] {datetime.now(pytz.utc)} sleep {sleep_value}")
        return 3
    

@serve.deployment
@serve.ingress(app)
class MyFastAPIDeployment:
    def __init__(self):
        self.num_values = None
        self.step1 = Step1.remote()
        self.step2 = Step2.remote()
        self.step3 = Step3.remote()

    @app.post("/upload")
    async def root(self, value:int):

        print(f'{datetime.now(pytz.utc)} before try: {value}')
        try:

            print(f'{datetime.now(pytz.utc)} before await x: {value}')
            x = await self.step1.run.remote(value)

            print(f'{datetime.now(pytz.utc)} before await y: {value}')
            y = await self.step2.run.remote(value)

            print(f'{datetime.now(pytz.utc)} before await z: {value}')
            z = await self.step3.run.remote(value)

            return x, y, z
        except Exception as e:
            print(f"Big Exception: {e}")
        finally:
            print(f"{datetime.now(pytz.utc)} Done with: {value}")



my_fast_api = MyFastAPIDeployment.bind()

which I deploy by running:

serve run --route-prefix / file_sha:my_fast_api

where file_sha is the name of the file (file_sha.py) holding the content listed first.

Given upload to endpoint of numbers: 44, 13 and 88, I get the following log messages (sorted after creation date):

('10:56:52.028727', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:56:52.028727+00:00 before try: 44')
('10:56:52.028774', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:56:52.028774+00:00 before await x: 44')
('10:56:52.350736', '(Step1 pid=1026021) [STEP 1 - BEFORE - VALUE: 44] 2025-01-03 10:56:52.350736+00:00 sleep 20')
('10:56:54.660197', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:56:54.660197+00:00 before try: 13')
('10:56:54.660281', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:56:54.660281+00:00 before await x: 13')
('10:56:56.145404', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:56:56.145404+00:00 before try: 88')
('10:56:56.145507', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:56:56.145507+00:00 before await x: 88')
('10:57:12.416653', '(Step1 pid=1026021) [STEP 1 - AFTER - VALUE: 44] 2025-01-03 10:57:12.416653+00:00 sleep 20')
('10:57:12.418429', '(Step1 pid=1026021) [STEP 1 - BEFORE - VALUE: 13] 2025-01-03 10:57:12.418429+00:00 sleep 20')
('10:57:12.419241', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:57:12.419241+00:00 just after step1 for value: 44')
('10:57:12.419404', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:57:12.419404+00:00 before await y: 44')
('10:57:12.492274', '(Step2 pid=1026022) [STEP 2 - BEFORE - VALUE: 44] 2025-01-03 10:57:12.492274+00:00 sleep 10')
('10:57:22.541178', '(Step2 pid=1026022) [STEP 2 - AFTER - VALUE: 44] 2025-01-03 10:57:22.541178+00:00 sleep 10')
('10:57:22.543138', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:57:22.543138+00:00 before await z: 44')
('10:57:22.623055', '(Step3 pid=1026023) [STEP 3 - BEFORE - VALUE: 44] 2025-01-03 10:57:22.623055+00:00 sleep 2')
('10:57:24.634918', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:57:24.634918+00:00 Done with: 44')
('10:57:32.518004', '(Step1 pid=1026021) [STEP 1 - AFTER - VALUE: 13] 2025-01-03 10:57:32.518004+00:00 sleep 20 [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#log-deduplication for more options.)')
('10:57:32.518244', '(Step1 pid=1026021) [STEP 1 - BEFORE - VALUE: 88] 2025-01-03 10:57:32.518244+00:00 sleep 20')
('10:57:52.617986', '(Step1 pid=1026021) [STEP 1 - AFTER - VALUE: 88] 2025-01-03 10:57:52.617986+00:00 sleep 20')
('10:57:52.619809', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:57:52.619809+00:00 before await y: 13')
('10:57:52.620633', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:57:52.620633+00:00 before await y: 88')
('10:57:52.622731', '(Step2 pid=1026022) [STEP 2 - BEFORE - VALUE: 13] 2025-01-03 10:57:52.622731+00:00 sleep 10')
('10:58:02.670690', '(Step2 pid=1026022) [STEP 2 - AFTER - VALUE: 13] 2025-01-03 10:58:02.670690+00:00 sleep 10')
('10:58:02.671625', '(Step2 pid=1026022) [STEP 2 - BEFORE - VALUE: 88] 2025-01-03 10:58:02.671625+00:00 sleep 10')
('10:58:02.672723', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:58:02.672723+00:00 before await z: 13')
('10:58:02.675348', '(Step3 pid=1026023) [STEP 3 - BEFORE - VALUE: 13] 2025-01-03 10:58:02.675348+00:00 sleep 2')
('10:58:04.682785', '(Step3 pid=1026023) [STEP 3 - AFTER - VALUE: 13] 2025-01-03 10:58:04.682785+00:00 sleep 2')
('10:58:04.684515', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:58:04.684515+00:00 Done with: 13')
('10:58:12.720846', '(Step2 pid=1026022) [STEP 2 - AFTER - VALUE: 88] 2025-01-03 10:58:12.720846+00:00 sleep 10')
('10:58:12.722649', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:58:12.722649+00:00 before await z: 88')
('10:58:12.725161', '(Step3 pid=1026023) [STEP 3 - BEFORE - VALUE: 88] 2025-01-03 10:58:12.725161+00:00 sleep 2')
('10:58:14.731471', '(ServeReplica:default:MyFastAPIDeployment pid=1025833) 2025-01-03 10:58:14.731471+00:00 Done with: 88')

For the beginning, the prints look ok.

When step1 finishes execution for 44, step1 starts for 13, and step2 starts for 44.

But for subsequent steps similar pipeline behavior doesn’t occur anymore, e.g. finish of step1 for 13 does not trigger execution for step2 of 13, but only for step1 of 88.

Can anyone find a reason for this?
I’ve been struggling with understanding this for 2 days now!

Hi @rares_petruc what Ray version are you using?

Hi Cindy!

Thanks a lot for reaching out!
My Ray version is 2.32.0.

Sorry for omitting that inside the issue’s description.