Memory not released to default levels: `ray::IDLE` processes not released

1. Severity of the issue: (select one)
Medium: Significantly affects my productivity but can find a workaround.

2. Environment:

  • Ray version: 2.49.1
  • Python version: 3.10.18
  • OS: rayproject/ray:2.49.1-py310-cpu image
  • Cloud/Infrastructure: infra
  • Other libs/tools (if relevant):
- --extra-index-url https://download.pytorch.org/whl/cpu
- ray[serve]==2.49.1
- numpy==1.24.2
- torch==2.0.0
- matplotlib==3.7.1
- Pillow==10.0.0
- opencv-python==4.7.0.72
- PyYAML==6.0
- scikit-learn==1.2.2
- scipy==1.10.1
- pydicom==2.3.1
- scikit-image==0.20.0
- requests==2.28.2
- fastapi==0.115.1
- pydantic==1.10.12
- tritonclient[all]==2.31.0
- highdicom==0.21.0
- imagecodecs==2023.3.16
- pylibjpeg[libjpeg,openjpeg,rle]==1.4.0
- python-gdcm==3.0.22
- connected-components-3d==3.12.3
- loguru==0.7.2
  • Env vars:
- RAY_metrics_report_interval_ms=1000
- RAY_worker_niceness=10
- RAY_LOG_TO_STDERR=0
- RAY_DISABLE_DOCKER_CPU_WARNING=1
- RAY_SERVE_REQUEST_PROCESSING_TIMEOUT_S=600
- RAY_SERVE_HTTP_REQUEST_MAX_RETRIES=0
- RAY_ROTATION_MAX_BYTES=52428800
- RAY_ROTATION_BACKUP_COUNT=5
- RAY_SERVE_ENABLE_PROXY_GC_OPTIMIZATIONS=0
- RAY_SERVE_PROXY_GC_THRESHOLD=1000
  • Ray init params:
ray start \
  --dashboard-host=0.0.0.0 \
  --dashboard-port=8265 \
  --min-worker-port 10002 \
  --max-worker-port 19999 \
  --metrics-export-port=8080 \
  --object-store-memory=110453397504 \
  --resources='{"actors": 1048576}' \
  --system-config '{
      "local_fs_capacity_threshold": 0.99,
      "kill_idle_workers_interval_ms": 0,
      "object_spilling_config":
          "{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/workspace/Runtime/object-spilling\"}}"
      }'

3. What happened vs. what you expected:

  • Expected: ray::IDLE processes release memory after task execution. It is fine for some ray::IDLE processes to hold ~200 MB, but some of them hold 12-20 GB after a task finishes.
  • Actual: ray::IDLE processes keep that memory reserved.

1. Increased Processing Time After Upgrade

  • Initially, our services ran on Ray Serve 2.5.0. After several releases addressed memory-leak issues, we decided to upgrade (after testing). We then noticed processing time increased by roughly 15-20% (sometimes 2x under heavy load). We could not reproduce this with the example script below, but on our test environment (not prod) processing time increased across all apps.
    • Hardware (server, cores, RAM) and workload remain unchanged.
    • We suspect NumpyStore, but it may be something else: other Ray parameters/constants?
    • What else might be affecting performance?

2. Memory Leak: ray::IDLE Processes Not Released

  • Observations:
    • In Grafana and Ray Dashboard, ray::IDLE processes persist in memory and are not terminated.
    • Potential causes we suspect:
      • High network load (inference via Triton)
      • Problems with specific library versions (fastapi, numpy, torch)
      • Ray parameters/constants
      • Our code
  • What We’ve Tried:
    • Processing speed (sequential requests, not load testing) is the same on 2.5.0 and 2.49.1 (tested with the sample script below). We only compared the NumpyStore variants.
    • Without NumpyStore on 2.49.1, processing is faster, likely because of fewer unnecessary ray.get/ray.put calls and because ObjectRefs are passed along without awaiting results where that is not needed.
    • RAY_SERVE_ENABLE_PROXY_GC_OPTIMIZATIONS=0 had no effect; memory is not cleared back to baseline. The setup was: run warmup requests, then run the heavy load (sending a request every n seconds).
    • Adding gc.collect() in modules and remote functions did not help.
    • Setting "idle_worker_killing_time_threshold_ms": 10000 and "kill_idle_workers_interval_ms": 10000 (or 0) did not remove ray::IDLE processes.
    • Lowering RAY_memory_usage_threshold to clean up ray::IDLE is not feasible, as it may trigger during peak load and cause 500 errors.

In production we see the following:


Our production deployments are also leaking, but not as much as ray::IDLE (I wasn't able to reproduce that with the example script, as we don't have Triton inference there).


Request for Help

We seek advice from developers and contributors:

  • Where should we look to diagnose the memory leaks? Or is this behavior by design and unfixable, so that these workers can only be killed once memory usage exceeds RAY_memory_usage_threshold?
  • Could network load (Triton requests/responses) or specific library versions be contributing?
  • Is NumpyStore the only thing slowing the code down, or could something else be contributing as well?
  • Any recommendations for where to look?

Thank you for your help!

Example script

import ray
from ray import serve
from fastapi import FastAPI, Request
from ray.serve.handle import DeploymentHandle
from typing import Dict, Any, Tuple
import numpy as np
import logging
import cc3d
from time import time
from ray import ObjectRef
import torch
from PIL import Image, ImageDraw, ImageFont
import cv2


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


DEPLOYMENT_PARAMS = {
    "max_ongoing_requests": 5,
    "num_replicas": 1,
    "ray_actor_options": {"num_cpus": 0, "resources": {"actors": 1}},
    "health_check_period_s": 10,
    "health_check_timeout_s": 30.0,
    "graceful_shutdown_wait_loop_s": 2.0,
    "graceful_shutdown_timeout_s": 600.0,
}


app = FastAPI()

# ray.2.49.1

ListOfTupleSlices = list[tuple[Tuple[int, int], tuple[int, int], tuple[int, int]]]



# ===========Plasma Store===========


@ray.remote
class PlasmaStoreActor:
    def __init__(self):
        self.alive = True

    def ensure_alive(self) -> bool:
        return self.alive


class NumpyStore:
    def __init__(self, data: np.ndarray):
        if not isinstance(data, np.ndarray):
            raise TypeError(type(data))
        self.shape = data.shape
        self.size = data.size
        self.ndim = data.ndim
        self.dtype = data.dtype
        self.length = len(data)
        actor = ray.get_actor(name="PlasmaStoreActor", namespace="PlasmaStore")
        self.data_ref = [ray.put(data, _owner=actor)]

    def retrieve(self) -> Any:
        return ray.get(self.data_ref[0])

    def __len__(self) -> int:
        return self.length


def create_plasma_store_actor() -> None:
    actor = PlasmaStoreActor.options(
        name="PlasmaStoreActor",
        namespace="PlasmaStore",
        lifetime="detached",
        get_if_exists=True,
        num_cpus=0,
    ).remote()
    ray.get(actor.ensure_alive.remote())


@ray.remote
def generate_array(size: Tuple[int, int, int], store: bool = False) -> np.ndarray | NumpyStore:
    image = np.random.random(size) * 10
    if store:
        return NumpyStore(image)
    return image


@ray.remote
def find_cc(arr: np.ndarray | NumpyStore, store: bool = False) -> np.ndarray | NumpyStore:
    if store:
        res = NumpyStore(cc3d.connected_components(arr.retrieve().astype("int")))
        return res
    res = cc3d.connected_components(arr.astype("int"))
    return res


@ray.remote
def iter_through_cc(cc: np.ndarray | NumpyStore) -> list[int]:
    if isinstance(cc, NumpyStore):
        cc = cc.retrieve()
    cc = cc.astype("int")
    sums = []
    for i in range(1, 11):
        cc_i = cc3d.connected_components(cc == i)
        sums.append(cc_i.sum())
    return sums


@ray.remote
def do_argmax(arr: np.ndarray | NumpyStore) -> np.ndarray | NumpyStore:
    is_store = isinstance(arr, NumpyStore)
    if is_store:
        arr = arr.retrieve()
    result = arr.argmax(axis=0, keepdims=True).astype("uint8")
    return NumpyStore(result) if is_store else result

@ray.remote
def do_sum(
    arr1: np.ndarray | NumpyStore, arr2: np.ndarray | NumpyStore, argmax: np.ndarray | NumpyStore, resampled: np.ndarray | NumpyStore
) -> np.ndarray | NumpyStore:
    is_store1 = isinstance(arr1, NumpyStore)
    is_store2 = isinstance(arr2, NumpyStore)
    is_store3 = isinstance(argmax, NumpyStore)
    is_store4 = isinstance(resampled, NumpyStore)
    if is_store1:
        arr1 = arr1.retrieve()
    if is_store2:
        arr2 = arr2.retrieve()
    if is_store3:
        argmax = argmax.retrieve()
    if is_store4:
        resampled = resampled.retrieve()
    result = arr1 + arr2 + argmax.max() + resampled.max()
    return NumpyStore(result) if (is_store1 or is_store2) else result


@ray.remote
def do_resample(arr: np.ndarray | NumpyStore) -> np.ndarray | NumpyStore:
    # return arr
    is_store = isinstance(arr, NumpyStore)
    if is_store:
        arr = arr.retrieve()
    if arr.ndim != 3:
        raise ValueError(f"Array must be 3D, got {arr.ndim}D")
    src_dtype = arr.dtype
    arr = torch.from_numpy(arr.copy()).float()[None, None]
    arr = torch.nn.functional.interpolate(arr, scale_factor=0.5, mode="trilinear").numpy().astype(src_dtype)[0, 0]
    return NumpyStore(arr) if is_store else arr

@ray.remote
def do_draw(arr: np.ndarray | NumpyStore) -> np.ndarray | NumpyStore:
    is_store = isinstance(arr, NumpyStore)
    if is_store:
        arr = arr.retrieve()
    if arr.ndim != 3:
        raise ValueError(f"Array must be 3D, got {arr.ndim}D")
    binary = (arr > arr.mean()).astype(np.uint8)
    result = np.zeros_like(arr, dtype=np.uint8)
    for i in range(len(arr)):
        arr_slice = binary[i]
        image_slice = Image.fromarray(arr[i])
        image_slice_draw = ImageDraw.Draw(image_slice)
        contours, _ = cv2.findContours(arr_slice, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        for contour in contours:
            if len(contour) < 3:
                continue
            contour = contour.reshape((-1, 2)).tolist()
            contour = [tuple(x) for x in contour]
            image_slice_draw.polygon(contour, outline=255, width=2)
        result[i] = np.array(image_slice)
    return NumpyStore(result) if is_store else result



# ===========Processing Modules===========


@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule1:
    async def __call__(self, shape: Tuple[int, int, int]) -> tuple[ObjectRef, ObjectRef]:
        array_ref: ObjectRef = generate_array.remote(shape)
        cc_ref: ObjectRef = find_cc.remote(array_ref)
        return array_ref, cc_ref


@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule1WithStore:
    async def __call__(self, shape: Tuple[int, int, int]) -> tuple[NumpyStore, NumpyStore]:
        # await because of NumpyStore
        image: NumpyStore = await generate_array.remote(shape, store=True)
        cc: NumpyStore = await find_cc.remote(image, store=True)
        cc.retrieve()
        image.retrieve()
        return image, cc


@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule2:
    async def __call__(self, image: np.ndarray) -> tuple[ObjectRef, ObjectRef]:
        resampled: ObjectRef = do_resample.remote(image)
        amax: ObjectRef = do_argmax.remote(resampled)
        draw: ObjectRef = do_draw.remote(image)
        return amax, resampled


@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule2WithStore:
    async def __call__(self, image: NumpyStore) -> tuple[NumpyStore, NumpyStore]:
        resampled: NumpyStore = await do_resample.remote(image)
        amax: NumpyStore = await do_argmax.remote(resampled)
        amax.retrieve()
        resampled.retrieve()
        return amax, resampled


@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule4:
    async def __call__(self, core_output: dict) -> ObjectRef:
        logger.info("ProcessorModule4: Combining images.")
        output = do_sum.remote(core_output["image"], core_output["cc"], core_output["argmax"], core_output["resampled"])
        draw = do_draw.remote(output)
        return draw


@serve.deployment(**DEPLOYMENT_PARAMS)
class ProcessorModule4WithStore:
    async def __call__(self, core_output: dict) -> NumpyStore:
        logger.info("ProcessorModule4WithStore: Combining images.")
        image_store = core_output["image_store"]
        cc_store = core_output["cc_store"]
        resampled_store = core_output["resampled_store"]
        argmax_store = core_output["argmax_store"]
        output = await do_sum.remote(image_store, cc_store, argmax_store, resampled_store)
        draw = await do_draw.remote(output)
        return draw


# ===========Core Module===========


@serve.deployment(**DEPLOYMENT_PARAMS)
class CoreModule:
    def __init__(
        self,
        processor_module1: DeploymentHandle,
        processor_module2: DeploymentHandle,
    ):
        self.processor_module1 = processor_module1
        self.processor_module2 = processor_module2

    async def __call__(self, shape: Tuple[int, int, int]) -> dict:
        image, cc = await self.processor_module1.remote(shape)
        argmax, resampled = await self.processor_module2.remote(image)
        result = {
            "image": image,
            "argmax": argmax,
            "resampled": resampled,
            "cc": cc,
        }
        t = {k: type(v) for k, v in result.items()}
        logger.error(f"CoreModule: Result: {t}")
        return result


@serve.deployment(**DEPLOYMENT_PARAMS)
class CoreModuleWithStore:
    def __init__(
        self,
        processor_module1: DeploymentHandle,
        processor_module2: DeploymentHandle,
    ):
        self.processor_module1 = processor_module1
        self.processor_module2 = processor_module2

    async def __call__(self, shape: Tuple[int, int, int]) -> dict:
        image_store, cc_store = await self.processor_module1.remote(shape)
        argmax_store, resampled_store = await self.processor_module2.remote(image_store)
        result  = {
            "image_store": image_store,
            "cc_store": cc_store,
            "argmax_store": argmax_store,
            "resampled_store": resampled_store,
        }
        t = {k: type(v) for k, v in result.items()}
        logger.error(f"CoreModuleWithStore: Result: {t}")
        return result


# ===========Ingress Module===========


@serve.deployment(**DEPLOYMENT_PARAMS)
@serve.ingress(app)
class IngressModule:
    shape = (1024, 512, 512)

    def __init__(
        self,
        core_module: DeploymentHandle,
        processor_module4: DeploymentHandle,
        core_module_with_store: DeploymentHandle,
        processor_module4_with_store: DeploymentHandle,
    ):
        self.core_module = core_module
        self.processor_module4 = processor_module4
        self.core_module_with_store = core_module_with_store
        self.processor_module4_with_store = processor_module4_with_store
        create_plasma_store_actor()

    @app.post("/process")
    async def process(self, request: Request) -> Dict[str, Any]:
        logger.info("IngressModule: Received request.")
        start = time()
        request_body = await request.json()
        core_output: ObjectRef = await self.core_module.remote(self.shape)
        output: ObjectRef = await self.processor_module4.remote(core_output)
        res: np.ndarray = ray.get(output)
        res: float = float(res.max())
        logger.info(f"ProcessorModule4: Output: {type(output)}")
        logger.info("IngressModule: Processing complete.")
        end = time()
        return {"max": res, "duration": end - start}

    @app.post("/process_with_store")
    async def process_with_store(self, request: Request) -> Dict[str, Any]:
        logger.info("IngressModule: Received request for processing with store.")
        start = time()
        request_body = await request.json()
        core_output = await self.core_module_with_store.remote(self.shape)
        output = await self.processor_module4_with_store.remote(core_output)
        logger.info("IngressModule: Processing with store complete.")
        res = float(output.retrieve().max())
        end = time()
        return {"max": res, "duration": end - start}


core_module = CoreModule.bind(
    processor_module1=ProcessorModule1.bind(),
    processor_module2=ProcessorModule2.bind(),
)
core_module_with_store = CoreModuleWithStore.bind(
    processor_module1=ProcessorModule1WithStore.bind(),
    processor_module2=ProcessorModule2WithStore.bind(),
)
app_graph = IngressModule.bind(
    core_module=core_module,
    processor_module4=ProcessorModule4.bind(),
    core_module_with_store=core_module_with_store,
    processor_module4_with_store=ProcessorModule4WithStore.bind(),
)

Load test script

#!/usr/bin/env python3

# send_requests.py
"""Send multiple requests to the Ray Serve FastAPI ingress with async pattern."""

import argparse
import asyncio
import json
import sys
from time import time
from typing import Any, Dict, List, Tuple

import aiohttp

DEFAULT_URL = "http://localhost:8000/process"


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Send test payloads to Ray Serve ingress.")
    parser.add_argument("--test", default=0, type=int, choices=[0, 1, 2], help="Value for the 'test' field in request body")
    parser.add_argument("--url", default=DEFAULT_URL, help=f"Ingress URL (default: {DEFAULT_URL})")
    parser.add_argument("--timeout", type=float, default=600.0, help="HTTP request timeout in seconds")
    parser.add_argument("-n", "--num-requests", type=int, default=1, help="Number of requests to send")
    parser.add_argument("-s", "--interval", type=float, default=0.0, help="Sleep interval between request dispatches in seconds")
    return parser.parse_args()


async def send_request(session: aiohttp.ClientSession, url: str, payload: Dict[str, Any], request_id: int) -> Tuple[int, Dict[str, Any], float]:
    """Send a single request and return the response data and timing."""
    req_start = time()
    try:
        async with session.post(url, json=payload) as response:
            response.raise_for_status()
            data = await response.json()
            req_end = time()
            return (request_id, data, req_end - req_start)
    except aiohttp.ClientError as exc:
        print(f"Request {request_id} failed: {exc}", file=sys.stderr)
        return (request_id, {"error": str(exc)}, time() - req_start)
    except json.JSONDecodeError:
        print(f"Request {request_id} returned invalid JSON", file=sys.stderr)
        return (request_id, {"error": "Invalid JSON response"}, time() - req_start)


async def main_async(args: argparse.Namespace) -> None:
    payload: Dict[str, Any] = {"test": args.test}
    overall_start = time()
    
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=args.timeout)) as session:
        tasks: List[asyncio.Task] = []
        print(f"Dispatching {args.num_requests} request(s) with {args.interval}s interval...")
        
        for i in range(args.num_requests):
            task = asyncio.create_task(send_request(session, args.url, payload, i + 1))
            tasks.append(task)
            if i < args.num_requests - 1:  # Don't sleep after the last task
                await asyncio.sleep(args.interval)
        
        print(f"All requests dispatched. Waiting for responses...")
        results = await asyncio.gather(*tasks)
    
    overall_end = time()
    
    # Print results
    print(f"\n{'='*60}")
    print(f"Total time (dispatch + all responses): {overall_end - overall_start:.3f} seconds")
    print(f"{'='*60}\n")
    
    for request_id, data, duration in results:
        print(f"Request {request_id} (took {duration:.3f}s):")
        print(json.dumps(data, indent=2))
        print()


def main() -> None:
    args = parse_args()
    asyncio.run(main_async(args))


if __name__ == "__main__":
    main()

Warmup

python3 send_requests.py --url http://localhost:8000/process_with_store -n 30 -s 20

Heavy Load

python3 send_requests.py --url http://localhost:8000/process_with_store -n 10 -s 5

As you can see, memory usage climbs during the heavy load and does not return to previous levels.
In the Ray dashboard, after the requests (and processing) are done, we can still see ray::IDLE processes consuming memory.


Ray’s ray::IDLE processes not releasing memory after task completion is a known issue, especially under heavy or long-running workloads. This is due to Ray’s worker processes retaining memory allocations even when idle, and the memory is only reclaimed when the worker is killed—typically when the node approaches the RAY_memory_usage_threshold. Tuning parameters like kill_idle_workers_interval_ms or RAY_SERVE_ENABLE_PROXY_GC_OPTIMIZATIONS often has little effect, and manual gc.collect() calls do not guarantee memory release. This behavior is by design and is widely reported, especially when large objects or high concurrency are involved. The issue is exacerbated by object references lingering in idle workers, and Ray’s memory recycling is not aggressive to avoid performance penalties. There is no configuration to force immediate memory release from idle workers without risking premature worker termination and 500 errors under load. See Ray documentation on memory debugging, GitHub issue 52174, and discussions on idle memory.

Regarding performance regression after upgrading, NumpyStore and increased ray.get/put calls can slow down processing, but other factors include changes in Ray’s internal scheduling, memory management, or object store behavior. Network load (e.g., Triton inference) and library versions can also impact performance, but the main cause of persistent memory usage is Ray’s worker and object store management, not your code or most third-party libraries. Unless you can reduce the size or lifetime of objects, or lower parallelism, there is no general workaround except to accept higher baseline memory usage or tune the memory threshold with caution. For more, see Ray memory troubleshooting and related GitHub issues.

Would you like a step-by-step breakdown of how to further diagnose or mitigate this, or more detail on specific Ray parameters?


Can I get help from any of the contributors, please?

Let’s try this @RunLLM

Certainly! Here’s a step-by-step breakdown to diagnose and mitigate Ray memory leaks and high memory usage from ray::IDLE processes, as well as guidance on relevant Ray parameters:

Step 1: Diagnose Memory Usage

  • Use the Ray Dashboard to monitor per-process memory (look for ray::IDLE processes with high RSS).
  • Use htop or top on your node and check the RES (resident set size) and SHR (shared memory) columns. Calculate per-process heap usage as RSS - SHR (Ray memory debugging guide); a small psutil sketch follows this list.
  • Run ray memory to see which ObjectRefs are still in scope and may be pinning objects in the object store (Ray memory management).
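A minimal sketch of the per-process check, assuming psutil is installed on the node and a Linux host (the shared field of memory_info is Linux-specific); pair it with the ray memory CLI to see which ObjectRefs are pinning objects:

import psutil

# Estimate each ray::IDLE worker's private heap as RSS - SHR, as the Ray memory
# debugging guide suggests; SHR is mostly the shared object-store mmap.
for proc in psutil.process_iter(["pid", "cmdline", "memory_info"]):
    cmdline = " ".join(proc.info["cmdline"] or [])
    mem = proc.info["memory_info"]
    if "ray::IDLE" not in cmdline or mem is None:
        continue
    heap_mb = (mem.rss - mem.shared) / 1e6
    print(f"pid={proc.info['pid']} rss={mem.rss / 1e6:.0f} MB heap={heap_mb:.0f} MB")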

Step 2: Identify Causes

  • Check if large objects are being held in memory by lingering references or by idle workers.
  • Review your code for unnecessary retention of ObjectRefs or large objects.
  • If using NumpyStore or similar patterns, ensure objects are dereferenced as soon as possible.

Step 3: Mitigation Strategies

  • Lower parallelism: Reduce the number of concurrent tasks/actors to decrease total memory pressure (Ray memory debugging guide).
  • Increase num_cpus per task/actor to limit concurrency.
  • Use ray.remote(max_calls=1) or similar to force worker recycling after each task, which can help release memory (GitHub issue 49757); see the sketch after this list.
  • If possible, tune RAY_memory_usage_threshold to a lower value, but be aware this may cause premature worker killing and 500 errors under load.
  • For Ray Serve, experiment with RAY_SERVE_ENABLE_PROXY_GC_OPTIMIZATIONS=0 and RAY_SERVE_PROXY_GC_THRESHOLD, but note these may have limited effect (GitHub issue 52174).
  • If using detached actors or custom object stores, ensure they are properly cleaned up.
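A minimal sketch of the max_calls recycling pattern mentioned above, assuming a task-based workload (max_calls applies to remote functions only, not actors); heavy_task is an illustrative name:

import numpy as np
import ray

# The worker process exits after each call, so the OS reclaims any heap it grew.
# Trade-off: a fresh worker must start for every call, which adds latency.
@ray.remote(max_calls=1)
def heavy_task(shape):
    arr = np.random.random(shape)  # large temporary allocation
    return float(arr.sum())

ray.init()
print(ray.get(heavy_task.remote((512, 512, 64))))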

Step 4: Advanced Profiling

  • Use the Ray Dashboard’s memory profiling tools to analyze memory leaks at the task/actor level (Ray memory debugging guide).
  • Consider using memray or similar Python memory profilers for deeper inspection; a hedged sketch follows this list.
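A hedged sketch of in-process tracking with memray's Python API, assuming memray is installed in the replica's environment; the output path is illustrative, and memray refuses to overwrite an existing trace file:

import memray
import numpy as np

# Trace allocations for one block of work, then inspect the result on the same
# node with `memray flamegraph /tmp/replica_trace.bin`.
with memray.Tracker("/tmp/replica_trace.bin"):
    arr = np.random.random((512, 512, 64))
    _ = float(arr.sum())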

Step 5: Last Resort

  • If memory is not released and you cannot lower the threshold, consider periodically restarting the Ray cluster or relevant workers as a workaround.

Would you like more detail on any of these steps or specific Ray parameters?


@RunLLM How can I configure max_calls for the ray.remote decorator using the Ray Serve app config or an environment variable?

It seems likely that you cannot configure max_calls for a ray.remote function or actor via the Ray Serve app config file or any environment variable. The max_calls option can only be set directly in the @ray.remote decorator at function/class definition time, and not through the Serve deployment config or ray_actor_options (Ray Serve deployment config docs, Ray remote_function.py). There is no mention in the data sources of an environment variable or Serve config field that sets max_calls.

If you need to set max_calls, you must do so in the Python code using @ray.remote(max_calls=...) and not via config or env. Would you like more detail or code examples?
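If helpful, here is a minimal sketch of a workaround, assuming you control the task definitions: read an application-specific environment variable (APP_REMOTE_MAX_CALLS is a made-up name) at import time and pass it into the decorator, since Ray itself exposes no built-in env var or Serve config field for max_calls:

import os

import numpy as np
import ray

# 0 is Ray's default and means "no limit"; any positive value makes the worker
# exit and be replaced after that many invocations.
_MAX_CALLS = int(os.environ.get("APP_REMOTE_MAX_CALLS", "0"))

@ray.remote(max_calls=_MAX_CALLS)
def generate_array(shape):
    return np.random.random(shape)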


@RunLLM What are default_options in ray/python/ray/remote_function.py (master branch of ray-project/ray on GitHub)? Where are they initialized? Are any global constants used to re-initialize them?

default_options in ray.remote_function.py are the default task options for a remote function, such as num_cpus, num_gpus, max_calls, etc. They are initialized in the RemoteFunction.__init__ method, where the task_options argument is processed and defaults are set using values from ray_option_utils.task_options (source). Each option’s default value comes from the default_value attribute in ray_option_utils.task_options, which may reference global constants (e.g., environment variables or hardcoded defaults) (source).

No global constant is used to re-initialize default_options after the RemoteFunction is created; they are set once at initialization based on the current environment and code context. Would you like a step-by-step breakdown of how these defaults are set?
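For reference, a hedged inspection snippet; the private module path is an assumption and may move between Ray versions:

# Print the defaults Ray uses when filling in a RemoteFunction's task options.
from ray._private import ray_option_utils

for name, option in ray_option_utils.task_options.items():
    print(name, "->", option.default_value)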


@RunLLM what may be the cause of Ray object store memory leakage? I see in Grafana that MMAP_SHM is leaking.
Between serve.deployment replicas, numpy arrays and big lists are transferred like this:

module1

arr1: ObjectRef = foo.remote()
arr2: ObjectRef = foo2.remote(arr1)
return {"arr1": arr1, "arr2": arr2}

module2

arr1 = await module1_result["arr1"]
arr2 = await module1_result["arr2"]
....

The most common cause of Ray object store (MMAP_SHM) memory “leakage” is that ObjectRefs to large objects (like numpy arrays or big lists) remain in scope somewhere in your application, preventing Ray from evicting or freeing them. This can happen if references are held in Python variables, returned in dictionaries, or passed between Serve deployments without being dereferenced or deleted. In your pattern, returning ObjectRefs in a dict and then awaiting them in another module can easily lead to lingering references if not all are dereferenced or deleted promptly (Ray memory management docs, Ray memory debugging guide).

Additionally, if you pass large objects by value (not by reference) or repeatedly create new ObjectRefs without cleaning up old ones, the object store will fill up and may not release memory even after tasks complete. This is especially true in long-running Serve deployments, where idle workers or lingering references can pin objects in memory. Use ray memory to check for in-scope ObjectRefs, and ensure you delete or dereference objects as soon as they are no longer needed (Ray Discourse: object store memory issues, Ray memory debugging guide).

Would you like a step-by-step guide to track down which references are causing the leak?


@RunLLM
How can I properly release memory in my case?

To properly release Ray object store memory in your case, ensure that all ObjectRefs (such as those for numpy arrays or big lists) are deleted or go out of scope as soon as you no longer need them. This includes removing references from dictionaries, variables, and any Python data structures. After deleting the ObjectRefs, you can optionally call gc.collect() to force Python garbage collection, but the key is to remove all references so Ray’s distributed reference counting can evict the objects from the object store (Ray memory management, Ray memory debugging guide).

If you are using Ray Serve and passing ObjectRefs between deployments, make sure to dereference (i.e., await or ray.get) and then delete them as soon as possible. Avoid holding onto large objects or ObjectRefs in global variables or long-lived objects. Use the ray memory command to check for lingering references that may be pinning objects in the store. If you still see memory not being released, check for nested references or objects captured inside other objects (Ray memory debugging guide).
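A minimal sketch of that advice applied to the pattern from the earlier question; foo and foo2 are stand-ins for your remote functions:

import gc

import numpy as np
import ray

@ray.remote
def foo():
    return np.random.random((256, 256, 64))

@ray.remote
def foo2(arr):
    return arr * 2

ray.init()
refs = {"arr1": foo.remote()}
refs["arr2"] = foo2.remote(refs["arr1"])

arr2 = ray.get(refs["arr2"])  # materialize only what you actually need
result = float(arr2.max())

# Drop every ObjectRef (and the materialized array) so distributed reference
# counting can evict the objects from the store, then nudge the Python GC.
del refs, arr2
gc.collect()
print(result)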

Would you like a code example or more detail on using ray memory to debug?


@RunLLM What is the proper way to pass objects between deployments?
The situation may be: I use autoscaling, a replica returns an ObjectRef, but then it gets downscaled. Will the object be lost in that case because the owner died? How should objects be passed in that case? I think the issue will persist if I use a manual ray.put.

If a deployment returns an ObjectRef and the replica (owner) is downscaled or terminated, the object will be lost and you will get an OwnerDiedError when trying to access it. This is because Ray objects fate-share with their owner (the worker or actor that created them), and if the owner dies, the object is deleted from the object store (Ray fault tolerance docs, Ray Discourse). Using manual ray.put does not solve this, as the owner is still the process that called ray.put.

The proper way to pass objects between deployments in an autoscaling environment is to store the object with an owner that is guaranteed to persist, such as a detached actor. You can create a detached actor and use ray.put(data, _owner=actor_handle) to ensure the object fate-shares with the actor, not the ephemeral replica (Ray Discourse: object ownership, Ray Discourse: sharing objects with deployment). This way, as long as the detached actor is alive, the object will not be lost even if the original deployment is downscaled.

Would you like a code example of this pattern?


@RunLLM
Do that, please

Here’s a code example showing how to use a detached actor as the owner of objects you want to persist across deployment downscaling or replica termination. This ensures the object is not lost if the original deployment is removed (Ray Discourse: sharing objects with deployment, Ray Discourse: is it possible to share objects between different driver processes?):

import ray

# Create a detached actor to own shared objects
@ray.remote
class ObjectStore:
    def __init__(self):
        self.store = {}

    def put(self, key, value):
        self.store[key] = value

    def get(self, key):
        return self.store.get(key)

# Start Ray and create the detached actor
ray.init(address="auto", namespace="shared")
store = ObjectStore.options(name="shared_store", lifetime="detached", get_if_exists=True).remote()

# In your deployment or task, put the object with the actor as its owner.
# Wrap the ObjectRef in a list so Ray stores the reference itself instead of
# resolving it to its value when the actor method runs.
data = [1, 2, 3]
obj_ref = ray.put(data, _owner=store)
ray.get(store.put.remote("my_data", [obj_ref]))

# Later, from any deployment or process, retrieve the object
retrieved_ref = ray.get(store.get.remote("my_data"))[0]
print(ray.get(retrieved_ref))  # Output: [1, 2, 3]

This pattern ensures the object’s lifetime is tied to the detached actor, not the ephemeral deployment worker, so it survives autoscaling and downscaling events.


@RunLLM
Why is ray::IDLE leaking when using autoscaling? I set max_calls=2 for all remote functions in my code. Without autoscaling, IDLE processes do not leak, but they start leaking when autoscaling is enabled.

When using Ray autoscaling, ray::IDLE processes can “leak” (i.e., persist and retain memory) because autoscaler-created workers may not be killed immediately after becoming idle, especially if the cluster is not scaled down aggressively. Even with max_calls=2, autoscaler logic may keep idle workers alive to avoid frequent worker startup costs, and these idle workers can retain memory from previous tasks (Ray autoscaler docs, Ray Discourse: idle workers not releasing resources). Without autoscale, Ray is more likely to kill idle workers promptly, but with autoscale, the cluster manager may keep them around in anticipation of new work, leading to memory not being released.

This is expected Ray behavior and is not directly controlled by max_calls. Autoscaler policies and idle worker thresholds determine when idle workers are terminated, and these may be tuned, but aggressive downscaling can cause cold start latency for new tasks.

Would you like more detail on tuning autoscaler or worker cleanup parameters?


@RunLLM Do please…