DAGDriver memory increasing for basic DAGs

Hello Team,

I am testing out the DAGDriver to create parallel processing branches for one of our use cases.
The use case is as follows:

  1. A base64-encoded image comes in as the request.
  2. The image is converted into the proper input for the model.
  3. An external API call (the Azure OCR API) is performed in one deployment.
  4. An object detector model (Detectron) is called in another deployment.

I have implemented the functionality using deployments and the DAGDriver. Although it has been working as expected functionally, I was seeing a constant memory increase under continuous requests.

In order to investigate properly, I converted my processing deployments (the Azure API call and the Detectron model inference) into simple functions that just return a dictionary or an array.

DetectronDeployment

@serve.deployment()
class DetectronDeployment:
    def __init__(self):
        self.doc_detector = ObjDetector(MODEL_CFG)
        self.doc_processor = DocDetector(640, 640)


    async def detect_doc(self, file_bytes):
        start_time = time.time()
        image_np = np.frombuffer(file_bytes, np.uint8)
        img = cv2.imdecode(image_np, flags=1)

        try:
            img_scaler = self.doc_processor.standardise_img(img)
            detections = self.doc_detector.get_pred(self.doc_processor.final_img)

        except Exception as ex:
            log.error(ex)
            log.error(traceback.format_exc())
            raise  # re-raise so the loop below never sees an undefined `detections`

        filter_detections = []
        for i in range(len(detections)):
            filter_detections.append(
                {
                    "doc_type": self.doc_detector.classes[
                        detections.pred_classes[i].item()
                    ],
                    "confidence": detections.scores[i].item(),
                    "bbox": self.doc_processor.process_bbox(detections.pred_boxes[i]),
                }
            )
        log.info("Time taken for detections :: {}".format(time.time() - start_time))

        scale_params = self.doc_processor.export_params()

        return json.dumps([scale_params, filter_detections])

    async def forward(self, uploaded_file):
        start_time = time.time()
        resp = {
            "detection values":[1,2,3,4,5]
        }
        # resp = await self.detect_doc(uploaded_file)
        log.info("Time taken to analyze image :: {}".format(time.time() - start_time))
        return resp


detectron_deployment = DetectronDeployment.bind()

AzureDeployment

@serve.deployment()
class AzureDeployment:
    def __init__(self):
        self.ocr_api = AzureAPI(os.environ.get("SUBKEY"), os.environ.get("AZURE_ENDPOINT"))

    def forward(self, uploaded_file):
        start_time = time.time()
        time.sleep(0.2)
        # resp = self.ocr_api.analyze_image(uploaded_file)
        resp = {
                "status": "success",
                "status_code": 200,
                "result": {
                    "extracted_data": {
                        "aadhaarNumber": "567420078076",
                        "dob": " 08/01/1991",
                        "gender": "Male",
                        "name": "Purushothama",
                        "fatherName": "NA",
                        "address": "NA",
                        "docType": "aadhar front half"
                    },
                    "image_quality": {
                        "blur": 0,
                        "res": 0.05
                    }
                },
                "version": "1.2"
            }
        log.info("Time taken to analyze image :: {}".format(time.time() - start_time))
        return resp

azure_deployment = AzureDeployment.bind()

As you can see, both deployments contain a forward function that does nothing more than return a hard-coded array and a JSON response, respectively.

My DAGDriver graph is defined in this way:

with InputNode() as input_file:
    print(f"input file is :: {input_file}")
    health_response = health_deployment.forward.bind()
    dag = aggregate.bind(
        azure_deployment.forward.bind(input_file),
        detectron_deployment.forward.bind(input_file),
    )

serve_dag = DAGDriver.options(num_replicas=1).bind({
    "/process":dag,
    "/health": health_response
})
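
The aggregate node referenced above is just a small function deployment that merges the two branch outputs, roughly along these lines (a sketch; the exact keys it returns are illustrative):

@serve.deployment()
def aggregate(azure_resp, detectron_resp):
    # Sketch only: combine the outputs of the two parallel branches
    # into a single response returned by the /process route.
    return {
        "ocr": azure_resp,
        "detections": detectron_resp,
    }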

Here is how health_deployment is defined →

@serve.deployment()
class HealthDeployment:
    async def forward(self) -> str:
        return "Server is healthy"

health_deployment = HealthDeployment.bind()

If you look at the screenshots below, you will find that though the object store memory and the memory utilised per deployment remain constant, the memory of the DAGDriver replica still increases linearly with the incoming requests.

Timestamp 1

Timestamp 2

Timestamp 3

Timestamp 4

Kindly have a look at this and let me know if I am doing something wrong with my deployments and request flow.

Hi @AbhishekBose, would you mind sharing which Ray version you are using? Additionally, what does the request traffic look like? Do you have a continuous stream of queries piling up to be processed? One potential cause of memory increase is requests being queued up in memory in the DAGDriver process. We would also appreciate a minimal reproduction script so we can help diagnose and debug as well!
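
If the queuing theory holds, one thing you could try in the meantime is capping in-flight requests on the driver replica, roughly like this (a sketch; the limit of 10 is arbitrary):

# Sketch: max_concurrent_queries bounds how many requests a single
# DAGDriver replica processes at once; excess requests wait at the
# caller (the HTTP proxy) instead of piling up inside the replica.
serve_dag = DAGDriver.options(
    num_replicas=1,
    max_concurrent_queries=10,  # arbitrary value, tune for your workload
).bind({
    "/process": dag,
    "/health": health_response,
})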

@simon-mo
Ray-version → 2.2.0
python version → 3.8.16

The test was done using Locust, so the program kept sending HTTP requests consisting of a base64 image string to the Ray server.

Here is the testing code as well:

from locust import HttpUser, task, constant
import json
import random
import os

URL = "http://0.0.0.0:8000/process"

class RayServeTest(HttpUser):
    # wait_time is compulsory, even if it's 0
    wait_time = constant(0)

    @task
    def hit1(self):
        self.client.post(url=URL, json={
            "image": "base_64_string"
        })
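
If it helps, a rough equivalent of the same load using plain requests looks like this (a sketch; it just posts the same payload in a loop):

import requests

URL = "http://0.0.0.0:8000/process"

# Sketch: hammer the /process route with the same JSON payload the
# Locust test sends, then watch the DAGDriver replica's memory.
for i in range(10_000):
    resp = requests.post(URL, json={"image": "base_64_string"})
    if i % 1000 == 0:
        print(i, resp.status_code)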

Here is a public git repo which you guys can try out.

docker compose up

should start the server.

@simon-mo Kindly have a look at the repo and suggest what could be done about this. Are we doing something wrong on our end, or is the issue related to the package itself?

I can reproduce this in Ray 2.2.0 (as well as in Ray 2.0.1 and 2.1.0).

Fix PR is here: [Serve] Address incremental memory leak due to _PyObjScanner by simon-mo · Pull Request #31317 · ray-project/ray · GitHub

Once the PR is merged, you can install the Ray nightly wheel to confirm. Installing Ray — Ray 3.0.0.dev0
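
As a quick sanity check after installing the nightly, something like this shows which build is actually running (a sketch):

import ray

# Sketch: nightly wheels report a 3.0.0.devX version string; the commit
# attribute (when present) tells you which commit the wheel was built from.
print("ray version:", ray.__version__)
print("built from commit:", getattr(ray, "__commit__", "unknown"))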

Edit: PR merged!