How to check the length of the queue for each replica of a deployment

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

Hey, I am trying to create an application with Ray Serve and FastAPI that serves a model with a few replicas to handle requests concurrently. The feature I am trying to implement is to respond quickly with a 503 HTTP error when the queue of every replica of the deployment is full. Below is minimal source code for the application, in which the deployment has only one replica and max_concurrent_queries is set to 1.

Currently, if I send 5 API queries in less than one second, it looks like all of them are queued and processed one by one. I would like the last four queries to receive an HTTPException(status_code=503, detail="Try later.") response, since the only existing replica spends 10 seconds processing the first query and the limit for concurrent queries is set to 1.

Is there a way to check the length of the queue for each replica?

import time
from fastapi import FastAPI, HTTPException
from ray import serve

app = FastAPI()

@serve.deployment(
    name="Translate_A2B",
    num_replicas=1,
    ray_actor_options={
        "num_cpus": 1.0,
        "num_gpus": 1.0,
    },
    max_concurrent_queries=1,
)
class Translate_A2B:

    def __init__(self):
        self.mock_text = "Mock translation"

    def translate(self, text):
        time.sleep(10)
        return self.mock_text


@serve.deployment(name="Ingress")
@serve.ingress(app)
class Ingress:
    def __init__(self, model):
        self.model = model.options(use_new_handle_api=True)

    @app.post("/translate")
    async def transcribe(self,
        text: str,
        lang: str = "A2B",
    ):
        if lang != "A2B":
            raise HTTPException(status_code=404, detail="Unknown lang.")
        translated_text = await self.model.translate.remote(text)
        return translated_text


ingress = Ingress.bind(Translate_A2B.bind())

@RetroCymber Not sure how much of the internal state we expose through APIs. Asking the Serve team.

cc: @Akshay_Malik @Gene Do we expose the current state of replica queues publicly?


Yes, there is already a metric called ray_serve_replica_pending_queries that you can use for this. See this doc: Monitor Your Application — Ray 2.7.1


Yes, the dashboard shows that, but I suspect @RetroCymber is asking whether it can be queried programmatically via the State API: State API — Ray 2.7.1


I suppose it can be queried using this API: Monitor Your Application — Ray 2.7.1

Here is one example of Ray Serve metrics: Monitor Your Application — Ray 2.7.1

Prometheus offers an HTTP API; you can read more at HTTP API | Prometheus
This is an example of querying the ray_serve_replica_pending_queries metric, assuming Prometheus is running on localhost with the default port 9090.

(ray) gene@geneanyscale2023 Downloads % curl 'http://localhost:9090/api/v1/query?query=ray_serve_replica_pending_queries'
{"status":"success","data":{"resultType":"vector","result":[{"metric":{"Component":"core_worker","NodeAddress":"127.0.0.1","SessionName":"session_2023-10-27_10-16-54_516574_13790","Version":"3.0.0.dev0","WorkerId":"585760360509f884165fbdf64ae9ccf076f127afb9177b48a3a25674","__name__":"ray_serve_replica_pending_queries","application":"app","deployment":"grpc-deployment","instance":"127.0.0.1:63773","job":"ray","replica":"app#grpc-deployment#GoBkks"},"value":[1698427448.652,"0"]},{"metric":{"Component":"core_worker","NodeAddress":"127.0.0.1","SessionName":"session_2023-10-27_10-16-54_516574_13790","Version":"3.0.0.dev0","WorkerId":"a39af3c7c22b893567f5a126cbdd7029c4bc7b7e5620bb5c477fa6e1","__name__":"ray_serve_replica_pending_queries","application":"app1","deployment":"http-deployment","instance":"127.0.0.1:63773","job":"ray","replica":"app1#http-deployment#JcJsfk"},"value":[1698427448.652,"0"]}]}}
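
The same query can be issued from Python with only the standard library. This is a minimal sketch, assuming Prometheus is reachable at http://localhost:9090 and already scrapes the Ray metrics endpoint; the function names fetch_pending_queries and parse_pending_queries are made up for illustration and are not part of Ray or Prometheus. Values are only as fresh as the last Prometheus scrape, so they may lag behind the real queue state.

```python
import json
import urllib.parse
import urllib.request

# Assumption: a local Prometheus on its default port, as in the curl example.
PROMETHEUS_URL = "http://localhost:9090"
METRIC = "ray_serve_replica_pending_queries"


def parse_pending_queries(payload: dict) -> dict:
    """Map each replica label to its pending-query count.

    `payload` is the decoded JSON body of a Prometheus instant query
    (/api/v1/query) whose result type is "vector".
    """
    if payload.get("status") != "success":
        raise RuntimeError(f"Prometheus query failed: {payload}")
    pending = {}
    for sample in payload["data"]["result"]:
        replica = sample["metric"].get("replica", "unknown")
        _timestamp, value = sample["value"]  # the value arrives as a string
        pending[replica] = float(value)
    return pending


def fetch_pending_queries(base_url: str = PROMETHEUS_URL,
                          timeout_s: float = 2.0) -> dict:
    """Run the instant query over HTTP and return {replica: pending count}."""
    query = urllib.parse.urlencode({"query": METRIC})
    url = f"{base_url}/api/v1/query?{query}"
    with urllib.request.urlopen(url, timeout=timeout_s) as resp:
        return parse_pending_queries(json.load(resp))
```

Calling fetch_pending_queries() against the setup above would return a dictionary keyed by the replica label, for example "app#grpc-deployment#GoBkks".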

Yeah, I wanted to query it via the Python API. Querying it over the Prometheus API seems like a bit of overkill to me, and I would need to set up Prometheus just for that. Anyway, thanks for the good advice. :smiley:

For now I have solved my problem by writing custom logic that tracks everything I need across the cluster; however, it probably duplicates work that Ray already does but does not make easily available through its Python API.

It would be great if one could access the current state of the deployment (i.e., number of available replicas, length of the queue for each replica, etc.) via an object of the DeploymentHandle type.
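
The custom logic itself is not shown in this thread. As a rough illustration only, one way such tracking could look is a small in-flight counter kept in the ingress; the names InFlightLimiter, OverloadedError, and guarded_call below are hypothetical and not part of Ray. The sketch assumes a single ingress replica running on one asyncio event loop; with several ingress replicas the counter would have to live in shared state.

```python
import asyncio


class OverloadedError(Exception):
    """Raised when every tracked slot is already busy."""


class InFlightLimiter:
    """Hypothetical helper that counts calls currently in flight."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.in_flight = 0

    def try_acquire(self) -> bool:
        # There is no await between the check and the increment, so this
        # is race-free on a single event loop.
        if self.in_flight >= self.capacity:
            return False
        self.in_flight += 1
        return True

    def release(self) -> None:
        self.in_flight -= 1


async def guarded_call(limiter: InFlightLimiter, make_call):
    """Await make_call() if a slot is free; otherwise fail fast."""
    if not limiter.try_acquire():
        # In the FastAPI endpoint this would be translated into
        # HTTPException(status_code=503, detail="Try later.").
        raise OverloadedError("all slots busy")
    try:
        return await make_call()
    finally:
        limiter.release()
```

In the Ingress class from the question, the endpoint could wrap the model call as guarded_call(limiter, lambda: self.model.translate.remote(text)), with capacity set to num_replicas * max_concurrent_queries, and convert OverloadedError into the 503 response.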