1. Severity of the issue: (select one)
Medium: Significantly affects my productivity but can find a workaround.
2. Environment:
- Ray version: 2.49.1
- Python version: 3.10.18
- OS: rayproject/ray:2.49.1-py310-cpu image
- Cloud/Infrastructure: infra
- Other libs/tools (if relevant):
- --extra-index-url https://download.pytorch.org/whl/cpu
- ray[serve]==2.49.1
- numpy==1.24.2
- torch==2.0.0
- matplotlib==3.7.1
- Pillow==10.0.0
- opencv-python==4.7.0.72
- PyYAML==6.0
- scikit-learn==1.2.2
- scipy==1.10.1
- pydicom==2.3.1
- scikit-image==0.20.0
- requests==2.28.2
- fastapi==0.115.1
- pydantic==1.10.12
- tritonclient[all]==2.31.0
- highdicom==0.21.0
- imagecodecs==2023.3.16
- pylibjpeg[libjpeg,openjpeg,rle]==1.4.0
- python-gdcm==3.0.22
- connected-components-3d===3.12.3
- loguru==0.7.2
- Env vars:
- RAY_metrics_report_interval_ms=1000
- RAY_worker_niceness=10
- RAY_LOG_TO_STDERR=0
- RAY_DISABLE_DOCKER_CPU_WARNING=1
- RAY_SERVE_REQUEST_PROCESSING_TIMEOUT_S=600
- RAY_SERVE_HTTP_REQUEST_MAX_RETRIES=0
- RAY_ROTATION_MAX_BYTES=52428800
- RAY_ROTATION_BACKUP_COUNT=5
- RAY_SERVE_ENABLE_PROXY_GC_OPTIMIZATIONS=0
- RAY_SERVE_PROXY_GC_THRESHOLD=1000
- Ray init params:
ray start \
--dashboard-host=0.0.0.0 \
--dashboard-port=8265 \
--min-worker-port 10002 \
--max-worker-port 19999 \
--metrics-export-port=8080 \
--object-store-memory=110453397504 \
--resources='{"actors": 1048576}' \
--system-config '{
"local_fs_capacity_threshold": 0.99,
"kill_idle_workers_interval_ms": 0,
"object_spilling_config":
"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/workspace/Runtime/object-spilling\"}}"
}'
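For a local, single-node reproduction, roughly the same settings can be passed through ray.init (a sketch only; the worker-port and metrics flags from the command above are omitted):

import ray

# Approximate single-node equivalent of the `ray start` command above
# (worker port range and metrics export are left at their defaults here).
ray.init(
    dashboard_host="0.0.0.0",
    dashboard_port=8265,
    object_store_memory=110_453_397_504,
    resources={"actors": 1048576},
    _system_config={
        "local_fs_capacity_threshold": 0.99,
        "kill_idle_workers_interval_ms": 0,
        "object_spilling_config": (
            '{"type": "filesystem", '
            '"params": {"directory_path": "/workspace/Runtime/object-spilling"}}'
        ),
    },
)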
3. What happened vs. what you expected:
- Expected: ray::IDLE processes release memory. It is acceptable for some ::IDLE processes to hold ~200 MB, but some of them allocate 12-20 GB after task execution.
- Actual: ray::IDLE processes keep that memory reserved.
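For reference, this is roughly how we check the resident memory of the ray::IDLE workers on a node (a sketch using psutil, which ships as a Ray dependency; process titles may differ on non-Linux systems):

# Sketch: per-process RSS of ray::IDLE workers on this node. Ray sets the
# process title via setproctitle, so "ray::IDLE" shows up in the command line.
import psutil

for proc in psutil.process_iter(["pid", "cmdline", "memory_info"]):
    cmdline = " ".join(proc.info["cmdline"] or [])
    mem = proc.info["memory_info"]
    if "ray::IDLE" in cmdline and mem is not None:
        print(f"pid={proc.info['pid']} rss={mem.rss / 1024 ** 2:.0f} MB")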
1. Increased Processing Time After Upgrade
   - Initially, our services ran on Ray Serve 2.5.0. After multiple releases addressed memory leak issues, we decided to upgrade (after testing). We noticed processing time increase by about 15-20% (sometimes 2x under heavy load). We could not reproduce this with our code example, but on the test environment (not prod) we see that processing time increased in all apps.
   - Hardware (server, cores, RAM) and workload remain unchanged.
   - We believe this is due to NumpyStore, or it may be something else: other Ray parameters/constants?
   - What else might be affecting performance?
2. Memory Leak: ray::IDLE Processes Not Released
   - Observations:
     - In Grafana and the Ray Dashboard, ray::IDLE processes persist in memory and are not terminated.
     - Potential causes we suspect:
       - High network load (inference via Triton)
       - Problems with specific library versions (fastapi, numpy, torch)
       - Ray parameters/constants
       - Our code
   - What We've Tried:
     - Processing speed (sequential requests, not load testing) is the same for 2.5.0 and 2.49.1 (tested with the sample script above). We tested only the NumpyStore versions.
     - Without NumpyStore in 2.49.1, processing is faster, likely due to fewer unnecessary ray.get/ray.put calls and passing ObjectRefs without awaiting results where not needed (see the sketch after this list).
     - RAY_SERVE_ENABLE_PROXY_GC_OPTIMIZATIONS=0 had no effect; memory is not cleared back to baseline. The setup was: run warmup requests, then run the heavy load (send a request every n seconds).
     - Adding gc.collect() in modules and remote functions did not help.
     - Setting "idle_worker_killing_time_threshold_ms": 10000 and "kill_idle_workers_interval_ms": 10000 (or 0) did not remove ray::IDLE processes.
     - Lowering RAY_memory_usage_threshold to clean up ray::IDLE is not feasible, as it may trigger during peak load and cause 500 errors.
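To illustrate the put/get overhead we suspect, here is a minimal timing sketch (NumpyStore is reduced here to the extra ray.get/ray.put round trip it adds on top of plain ObjectRef chaining; the shape and numbers are illustrative only):

import time
import numpy as np
import ray

ray.init()

@ray.remote
def make(shape):
    return np.random.random(shape)

@ray.remote
def double(arr):
    return arr * 2

shape = (128, 256, 256)

# (a) Plain ObjectRef chaining: the intermediate array stays in the object store
# and is only fetched once at the end.
start = time.time()
ref = double.remote(make.remote(shape))
ray.get(ref)
print("objectref chaining:", time.time() - start)

# (b) NumpyStore-style flow: every step is materialised with ray.get and re-put,
# adding a serialization/copy per hop.
start = time.time()
arr = ray.get(make.remote(shape))
stored = ray.put(arr)
arr2 = ray.get(double.remote(stored))
ray.put(arr2)
print("get/put per step:", time.time() - start)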
In production we see the following: our production deployments are also leaking, but not as much as the ray::IDLE processes (I was not able to reproduce that with the example script, as we do not have Triton inference there).
Request for Help
We seek advice from developers and contributors:
- Where should we look to diagnose the memory leaks? Or is this unfixable by design, so that these workers can only be killed once RAY_memory_usage_threshold is exceeded?
- Could network load (Triton requests/responses) or specific library versions be contributing?
- Is it only NumpyStore that makes the code slower, or could something else as well?
- Any recommendations for where to look?
Thank you for your help!
Example script
import ray
from ray import serve
from fastapi import FastAPI, Request
from ray.serve.handle import DeploymentHandle
from typing import Dict, Any, Tuple
import numpy as np
import logging
import cc3d
from time import time
from ray import ObjectRef
import torch
from PIL import Image, ImageDraw, ImageFont
import cv2
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
DEPLOYMENT_PARAMS = {
"max_ongoing_requests": 5,
"num_replicas": 1,
"ray_actor_options": {"num_cpus": 0, "resources": {"actors": 1}},
"health_check_period_s": 10,
"health_check_timeout_s": 30.0,
"graceful_shutdown_wait_loop_s": 2.0,
"graceful_shutdown_timeout_s": 600.0,
}
app = FastAPI()
# ray.2.49.1
ListOfTupleSlices = list[tuple[Tuple[int, int], tuple[int, int], tuple[int, int]]]
# ===========Plasma Store===========
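# NOTE: PlasmaStoreActor is a detached actor that exists only to be the `_owner`
# of the objects NumpyStore puts into the object store, so those objects outlive
# the worker process that created them.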
@ray.remote
class PlasmaStoreActor:
def __init__(self):
self.alive = True
def ensure_alive(self) -> bool:
return self.alive
class NumpyStore:
def __init__(self, data: np.ndarray):
if not isinstance(data, np.ndarray):
raise TypeError(type(data))
self.shape = data.shape
self.size = data.size
self.ndim = data.ndim
self.dtype = data.dtype
self.length = len(data)
actor = ray.get_actor(name="PlasmaStoreActor", namespace="PlasmaStore")
self.data_ref = [ray.put(data, _owner=actor)]
def retrieve(self) -> Any:
return ray.get(self.data_ref[0])
def __len__(self) -> int:
return self.length
def create_plasma_store_actor() -> None:
actor = PlasmaStoreActor.options(
name="PlasmaStoreActor",
namespace="PlasmaStore",
lifetime="detached",
get_if_exists=True,
num_cpus=0,
).remote()
ray.get(actor.ensure_alive.remote())
@ray.remote
def generate_array(size: Tuple[int, int, int], store: bool = False) -> np.ndarray | NumpyStore:
image = np.random.random(size) * 10
if store:
return NumpyStore(image)
return image
@ray.remote
def find_cc(arr: np.ndarray | NumpyStore, store: bool = False) -> np.ndarray | NumpyStore:
if store:
res = NumpyStore(cc3d.connected_components(arr.retrieve().astype("int")))
return res
res = cc3d.connected_components(arr.astype("int"))
return res
@ray.remote
def iter_through_cc(cc: np.ndarray | NumpyStore) -> list[int]:
if isinstance(cc, NumpyStore):
cc = cc.retrieve()
cc = cc.astype("int")
sums = []
for i in range(1, 11):
cc_i = cc3d.connected_components(cc == i)
sums.append(cc_i.sum())
return sums
@ray.remote
def do_argmax(arr: np.ndarray | NumpyStore) -> np.ndarray | NumpyStore:
is_store = isinstance(arr, NumpyStore)
if is_store:
arr = arr.retrieve()
result = arr.argmax(axis=0, keepdims=True).astype("uint8")
return NumpyStore(result) if is_store else result
@ray.remote
def do_sum(
arr1: np.ndarray | NumpyStore, arr2: np.ndarray | NumpyStore, argmax: np.ndarray | NumpyStore, resampled: np.ndarray | NumpyStore
) -> np.ndarray | NumpyStore:
is_store1 = isinstance(arr1, NumpyStore)
is_store2 = isinstance(arr2, NumpyStore)
is_store3 = isinstance(argmax, NumpyStore)
is_store4 = isinstance(resampled, NumpyStore)
if is_store1:
arr1 = arr1.retrieve()
if is_store2:
arr2 = arr2.retrieve()
if is_store3:
argmax = argmax.retrieve()
if is_store4:
resampled = resampled.retrieve()
result = arr1 + arr2 + argmax.max() + resampled.max()
return NumpyStore(result) if (is_store1 or is_store2) else result
@ray.remote
def do_resample(arr: np.ndarray | NumpyStore) -> np.ndarray | NumpyStore:
# return arr
is_store = isinstance(arr, NumpyStore)
if is_store:
arr = arr.retrieve()
if arr.ndim != 3:
raise ValueError(f"Array must be 3D, got {arr.ndim}D")
src_dtype = arr.dtype
arr = torch.from_numpy(arr.copy()).float()[None, None]
arr = torch.nn.functional.interpolate(arr, scale_factor=0.5, mode="trilinear").numpy().astype(src_dtype)[0, 0]
return NumpyStore(arr) if is_store else arr
@ray.remote
def do_draw(arr: np.ndarray | NumpyStore) -> np.ndarray | NumpyStore:
is_store = isinstance(arr, NumpyStore)
if is_store:
arr = arr.retrieve()
if arr.ndim != 3:
raise ValueError(f"Array must be 3D, got {arr.ndim}D")
binary = (arr > arr.mean()).astype(np.uint8)
result = np.zeros_like(arr, dtype=np.uint8)
for i in range(len(arr)):
arr_slice = binary[i]
image_slice = Image.fromarray(arr[i])
image_slice_draw = ImageDraw.Draw(image_slice)
contours, _ = cv2.findContours(arr_slice, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
for contour in contours:
if len(contour) < 3:
continue
contour = contour.reshape((-1, 2)).tolist()
contour = [tuple(x) for x in contour]
image_slice_draw.polygon(contour, outline=255, width=2)
result[i] = np.array(image_slice)
return NumpyStore(result) if is_store else result
# ===========Processing Modules===========
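# Each module below comes in two variants: the plain one passes ObjectRefs between
# tasks without awaiting them, while the *WithStore variant awaits every task and
# works with NumpyStore wrappers (an extra ray.put/ray.get per step).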
@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule1:
async def __call__(self, shape: Tuple[int, int, int]) -> tuple[ObjectRef, ObjectRef]:
array_ref: ObjectRef = generate_array.remote(shape)
cc_ref: ObjectRef = find_cc.remote(array_ref)
return array_ref, cc_ref
@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule1WithStore:
async def __call__(self, shape: Tuple[int, int, int]) -> tuple[NumpyStore, NumpyStore]:
# await because of NumpyStore
image: NumpyStore = await generate_array.remote(shape, store=True)
cc: NumpyStore = await find_cc.remote(image, store=True)
cc.retrieve()
image.retrieve()
return image, cc
@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule2:
async def __call__(self, image: np.ndarray) -> tuple[ObjectRef, ObjectRef]:
resampled: ObjectRef = do_resample.remote(image)
amax: ObjectRef = do_argmax.remote(resampled)
draw: ObjectRef = do_draw.remote(image)
return amax, resampled
@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule2WithStore:
async def __call__(self, image: NumpyStore) -> tuple[NumpyStore, NumpyStore]:
resampled: NumpyStore = await do_resample.remote(image)
amax: NumpyStore = await do_argmax.remote(resampled)
amax.retrieve()
resampled.retrieve()
return amax, resampled
@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule4:
async def __call__(self, core_output: dict) -> ObjectRef:
logger.info("ProcessorModule4: Combining images.")
output = do_sum.remote(core_output["image"], core_output["cc"], core_output["argmax"], core_output["resampled"])
draw = do_draw.remote(output)
return draw
@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule4WithStore:
async def __call__(self, core_output: dict) -> NumpyStore:
logger.info("ProcessorModule4WithStore: Combining images.")
image_store = core_output["image_store"]
cc_store = core_output["cc_store"]
resampled_store = core_output["resampled_store"]
argmax_store = core_output["argmax_store"]
output = await do_sum.remote(image_store, cc_store, argmax_store, resampled_store)
draw = await do_draw.remote(output)
return draw
# ===========Core Module===========
@serve.deployment(**DEPLOYMENT_PARAMS)
class CoreModule:
def __init__(
self,
processor_module1: DeploymentHandle,
processor_module2: DeploymentHandle,
):
self.processor_module1 = processor_module1
self.processor_module2 = processor_module2
async def __call__(self, shape: Tuple[int, int, int]) -> dict:
image, cc = await self.processor_module1.remote(shape)
argmax, resampled = await self.processor_module2.remote(image)
result = {
"image": image,
"argmax": argmax,
"resampled": resampled,
"cc": cc,
}
t = {k: type(v) for k, v in result.items()}
logger.error(f"CoreModule: Result: {t}")
return result
@serve.deployment(**DEPLOYMENT_PARAMS)
class CoreModuleWithStore:
def __init__(
self,
processor_module1: DeploymentHandle,
processor_module2: DeploymentHandle,
):
self.processor_module1 = processor_module1
self.processor_module2 = processor_module2
async def __call__(self, shape: Tuple[int, int, int]) -> dict:
image_store, cc_store = await self.processor_module1.remote(shape)
argmax_store, resampled_store = await self.processor_module2.remote(image_store)
result = {
"image_store": image_store,
"cc_store": cc_store,
"argmax_store": argmax_store,
"resampled_store": resampled_store,
}
t = {k: type(v) for k, v in result.items()}
logger.error(f"CoreModuleWithStore: Result: {t}")
return result
# ===========Ingress Module===========
@serve.deployment(**DEPLOYMENT_PARAMS)
@serve.ingress(app)
class IngressModule:
shape = (1024, 512, 512)
def __init__(
self,
core_module: DeploymentHandle,
processor_module4: DeploymentHandle,
core_module_with_store: DeploymentHandle,
processor_module4_with_store: DeploymentHandle,
):
self.core_module = core_module
self.processor_module4 = processor_module4
self.core_module_with_store = core_module_with_store
self.processor_module4_with_store = processor_module4_with_store
create_plasma_store_actor()
@app.post("/process")
async def process(self, request: Request) -> Dict[str, Any]:
logger.info("IngressModule: Received request.")
start = time()
request_body = await request.json()
core_output: ObjectRef = await self.core_module.remote(self.shape)
output: ObjectRef = await self.processor_module4.remote(core_output)
res: np.ndarray = ray.get(output)
res: float = float(res.max())
logger.info(f"ProcessorModule4: Output: {type(output)}")
logger.info("IngressModule: Processing complete.")
end = time()
return {"max": res, "duration": end - start}
@app.post("/process_with_store")
async def process_with_store(self, request: Request) -> Dict[str, Any]:
logger.info("IngressModule: Received request for processing with store.")
start = time()
request_body = await request.json()
core_output = await self.core_module_with_store.remote(self.shape)
output = await self.processor_module4_with_store.remote(core_output)
logger.info("IngressModule: Processing with store complete.")
res = float(output.retrieve().max())
end = time()
return {"max": res, "duration": end - start}
core_module = CoreModule.bind(
processor_module1=ProcessorModule1.bind(),
processor_module2=ProcessorModule2.bind(),
)
core_module_with_store = CoreModuleWithStore.bind(
processor_module1=ProcessorModule1WithStore.bind(),
processor_module2=ProcessorModule2WithStore.bind(),
)
app_graph = IngressModule.bind(
core_module=core_module,
processor_module4=ProcessorModule4.bind(),
core_module_with_store=core_module_with_store,
processor_module4_with_store=ProcessorModule4WithStore.bind(),
)
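# To deploy the graph we use the Serve CLI, e.g. (assuming this file is saved
# under a placeholder name such as leak_repro.py):
#   serve run leak_repro:app_graph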
Load test script
#!/usr/bin/env python3
# send_requests.py
"""Send multiple requests to the Ray Serve FastAPI ingress with async pattern."""
import argparse
import asyncio
import json
import sys
from time import time
from typing import Any, Dict, List, Tuple
import aiohttp
DEFAULT_URL = "http://localhost:8000/process"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Send test payloads to Ray Serve ingress.")
parser.add_argument("--test", default=0, type=int, choices=[0, 1, 2], help="Value for the 'test' field in request body")
parser.add_argument("--url", default=DEFAULT_URL, help=f"Ingress URL (default: {DEFAULT_URL})")
parser.add_argument("--timeout", type=float, default=600.0, help="HTTP request timeout in seconds")
parser.add_argument("-n", "--num-requests", type=int, default=1, help="Number of requests to send")
parser.add_argument("-s", "--interval", type=float, default=0.0, help="Sleep interval between request dispatches in seconds")
return parser.parse_args()
async def send_request(session: aiohttp.ClientSession, url: str, payload: Dict[str, Any], request_id: int) -> Tuple[int, Dict[str, Any], float]:
"""Send a single request and return the response data and timing."""
req_start = time()
try:
async with session.post(url, json=payload) as response:
response.raise_for_status()
data = await response.json()
req_end = time()
return (request_id, data, req_end - req_start)
except aiohttp.ClientError as exc:
print(f"Request {request_id} failed: {exc}", file=sys.stderr)
return (request_id, {"error": str(exc)}, time() - req_start)
except json.JSONDecodeError:
print(f"Request {request_id} returned invalid JSON", file=sys.stderr)
return (request_id, {"error": "Invalid JSON response"}, time() - req_start)
async def main_async(args: argparse.Namespace) -> None:
payload: Dict[str, Any] = {"test": args.test}
overall_start = time()
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=args.timeout)) as session:
tasks: List[asyncio.Task] = []
print(f"Dispatching {args.num_requests} request(s) with {args.interval}s interval...")
for i in range(args.num_requests):
task = asyncio.create_task(send_request(session, args.url, payload, i + 1))
tasks.append(task)
if i < args.num_requests - 1: # Don't sleep after the last task
await asyncio.sleep(args.interval)
print(f"All requests dispatched. Waiting for responses...")
results = await asyncio.gather(*tasks)
overall_end = time()
# Print results
print(f"\n{'='*60}")
print(f"Total time (dispatch + all responses): {overall_end - overall_start:.3f} seconds")
print(f"{'='*60}\n")
for request_id, data, duration in results:
print(f"Request {request_id} (took {duration:.3f}s):")
print(json.dumps(data, indent=2))
print()
def main() -> None:
args = parse_args()
asyncio.run(main_async(args))
if __name__ == "__main__":
main()
Warmup
python3 send_requests.py --url http://localhost:8000/process_with_store -n 30 -s 20
Heavy Load
python3 send_requests.py --url http://localhost:8000/process_with_store -n 10 -s 5
As you can see, memory usage after the heavy load gets higher and does not release back to the previous levels.
In the Ray Dashboard, after the requests (and processing) are done, we can still see some ray::IDLE processes consuming memory.
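The commands we use to inspect worker memory after the run (standard Ray CLI and Linux tooling; exact output will differ per setup):
ray status
ray memory --stats-only
ps -eo pid,rss,args | grep 'ray::IDLE'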
