Memory not released to default levels: `ray::IDLE` processes not released

To make the Ray autoscaler reclaim idle workers sooner and reduce memory held by `ray::IDLE` processes, focus on these key settings:

  1. `idle_timeout_minutes`: Controls how long a worker node must be idle before the autoscaler removes it. Lowering this value (e.g., to 1 or 2) makes downscaling more aggressive, so idle workers are terminated sooner and their memory is released (Ray autoscaler docs).
  2. `kill_idle_workers_interval_ms` and `idle_worker_killing_time_threshold_ms`: These Ray system configs (set via `--system-config` or environment variables) control how often Ray checks for idle workers and how long a worker must be idle before it is killed. Lowering these values cleans up idle workers more quickly (ray_config_def.h).

Example YAML config for aggressive cleanup:

idle_timeout_minutes: 1
upscaling_speed: 1.0

Example Ray start command with system config:

ray start --head ... --system-config='{"kill_idle_workers_interval_ms":10000,"idle_worker_killing_time_threshold_ms":10000}'
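Since `--system-config` takes a JSON string, a malformed payload (stray quotes, trailing commas) will make `ray start` fail at parse time. A minimal sketch that builds and validates the payload with the standard `json` module before launching (the flag and keys are from the command above; the values are illustrative):

```python
import json

# Illustrative values; tune for your workload.
system_config = {
    "kill_idle_workers_interval_ms": 10000,          # how often Ray scans for idle workers
    "idle_worker_killing_time_threshold_ms": 10000,  # idle time before a worker is killed
}

# json.dumps guarantees the flag receives syntactically valid JSON.
payload = json.dumps(system_config, separators=(",", ":"))
command = f"ray start --head --system-config='{payload}'"
print(command)
```

This avoids hand-editing the quoted JSON inside shell quoting, which is a common source of startup errors.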

Be cautious: setting these values too low can increase cold start latency, as new workers will need to be started more frequently.

Would you like a step-by-step example of how to apply these settings in your cluster config?

Sources:

Hint: Mention @RunLLM in the post for followups.

@RunLLM Yes, please…

Here’s a step-by-step example of how to apply aggressive idle worker cleanup and autoscaler tuning in your Ray cluster config:

Step 1: Edit your cluster YAML file

Add or adjust these parameters in your cluster config YAML:

# Example: ray-cluster.yaml
max_workers: 10
idle_timeout_minutes: 1        # Remove idle workers after 1 minute
upscaling_speed: 1.0           # Standard upscaling speed

# ... other config fields ...

(Ray autoscaler config docs)

Step 2: Set Ray system config for worker cleanup

In your `head_start_ray_commands` and `worker_start_ray_commands`, add the `--system-config` flag to set worker cleanup intervals:

head_start_ray_commands:
  - ray stop
  - ray start --head --port=6379 --system-config='{"kill_idle_workers_interval_ms":10000,"idle_worker_killing_time_threshold_ms":10000}'
worker_start_ray_commands:
  - ray stop
  - ray start --address=$RAY_HEAD_IP:6379 --system-config='{"kill_idle_workers_interval_ms":10000,"idle_worker_killing_time_threshold_ms":10000}'

(ray_config_def.h)

Step 3: Launch or update your cluster

Run `ray up ray-cluster.yaml` to launch or update your cluster with the new settings.

Step 4: Monitor

Use the Ray Dashboard or `ray status` to confirm that idle workers are being removed more quickly and memory is released.

Would you like more detail on any of these steps or troubleshooting tips?


@RunLLM What is the cause of
RayTaskError(ReferenceCountingAssertionError)(ReferenceCountingAssertionError('00f7ac668d3063a5ffffffffffffffffffffffff0200000002e1f505', ''))
?

RayTaskError(ReferenceCountingAssertionError) occurs when Ray’s distributed reference counting protocol detects that an object has been deleted while there is still a reference to it. This should not happen in normal operation, but a known edge case can trigger it: if an ObjectRef is passed through an intermediate worker that dies before it can notify its parent of the new reference, the reference count can be undercounted, leading to premature deletion and this error (Ray fault tolerance docs, GitHub issue 18456).

This is most likely to happen when ObjectRefs are passed through multiple tasks or actors and an intermediate process crashes or exits unexpectedly. The workaround is to avoid passing ObjectRefs through intermediate workers that may die, and instead pass them directly from the driver or a persistent actor (Ray fault tolerance docs, GitHub issue 18456).

Would you like a step-by-step example or more detail on how to avoid this?


@RunLLM

How to workaround this if we have remote functions, where we:

@ray.remote
class PlasmaStoreActor:
    def __init__(self):
        self.alive = True

    def ensure_alive(self) -> bool:
        return self.alive


class NumpyStore:
    def __init__(self, data: np.ndarray):
        if not isinstance(data, np.ndarray):
            raise TypeError(type(data))
        self.shape = data.shape
        self.size = data.size
        self.ndim = data.ndim
        self.dtype = data.dtype
        self.length = len(data)
        actor = ray.get_actor(name="PlasmaStoreActor", namespace="PlasmaStore")
        self.data_ref = [ray.put(data, _owner=actor)]

    def retrieve(self) -> Any:
        return ray.get(self.data_ref[0])

    def __len__(self) -> int:
        return self.length


@ray.remote(max_calls=2)
def generate_array(size: Tuple[int, int, int], store: bool = False) -> np.ndarray:
    image = np.random.random(size) * 10
    if store:
        return NumpyStore(image)
    return image

@serve.deployment()
class ProcessorModule1:
    async def __call__(self, shape: Tuple[int, int, int]) -> tuple[ObjectRef, ObjectRef]:
        # ReferenceCountingAssertionError is raised here
        array_ref: ObjectRef = generate_array.remote(shape)
        cc_ref: ObjectRef = find_cc.remote(array_ref)
        return array_ref, cc_ref

Note that we have set max_calls, those errors start to kick in after second function call. How to fix this?

The ReferenceCountingAssertionError in your case is caused by the use of max_calls=2 on your remote function. When a Ray worker reaches its max_calls limit, it is terminated and any objects it owns (including ObjectRefs it created) are also deleted—even if other tasks or actors still hold references to those objects. This leads to the ReferenceCountingAssertionError when you try to access an ObjectRef whose owner (the worker) has already exited (GitHub issue 18456, Ray Discourse: getting reference counting assertion error).

Workaround:
To avoid this, you should ensure that the owner of the ObjectRef is a long-lived actor (such as your PlasmaStoreActor) and that the ObjectRef is passed to and stored by this actor before the worker that created it exits. Specifically, after creating the ObjectRef with ray.put(data, _owner=actor), call a method on the actor to store the reference, and only then allow the worker to exit. This ensures the reference count is properly updated and the object is not prematurely deleted. Avoid using max_calls on remote functions that create and return ObjectRefs unless you are certain the references will not be needed after the worker exits.

Would you like a code example of this pattern?


@RunLLM
Yes, do please …

Here’s a code example that demonstrates how to safely manage ObjectRefs with a long-lived actor (PlasmaStoreActor) to avoid ReferenceCountingAssertionError when using max_calls or short-lived workers. The key is to store the ObjectRef in the actor before the worker exits, ensuring the object’s lifetime is tied to the actor, not the ephemeral worker (GitHub issue 18456, Ray Discourse):

import ray
import numpy as np

# Long-lived actor to own and store ObjectRefs
@ray.remote
class PlasmaStoreActor:
    def __init__(self):
        self.refs = {}

    def store(self, key, obj_ref):
        self.refs[key] = obj_ref

    def retrieve(self, key):
        return self.refs.get(key)

# Start Ray and the actor
ray.init()
store_actor = PlasmaStoreActor.options(name="PlasmaStoreActor", lifetime="detached", get_if_exists=True).remote()

# Remote function with max_calls that puts object and stores ref in actor
@ray.remote(max_calls=2)
def generate_and_store_array(key, size):
    arr = np.random.random(size)
    # Put the array in the object store, owned by the actor
    obj_ref = ray.put(arr, _owner=store_actor)
    # Store the ObjectRef in the actor before the worker exits
    ray.get(store_actor.store.remote(key, obj_ref))
    return key

# Usage
key = "my_array"
size = (100, 100)
ray.get(generate_and_store_array.remote(key, size))

# Retrieve the ObjectRef from the actor and get the data
obj_ref = ray.get(store_actor.retrieve.remote(key))
arr = ray.get(obj_ref)
print(arr.shape)  # (100, 100)

This pattern ensures the ObjectRef is not lost when the worker exits, preventing ReferenceCountingAssertionError.


@RunLLM my implementation of NumpyStore does the same i think. Or no? Even with this, objects being lost

Your NumpyStore implementation is close, but it does not guarantee the object will persist if the worker that created the ObjectRef exits before the reference is safely stored in the actor. The critical difference is that you must explicitly call a method on the long-lived actor (e.g., store_actor.store.remote(key, obj_ref)) to store the ObjectRef before the worker exits (Ray Discourse: reference counting assertion error). Simply assigning the ObjectRef to an instance attribute of NumpyStore is not enough—if the worker dies, the reference count may not be updated correctly, and the object can be lost.

To ensure safety, always:

  1. Use ray.put(data, _owner=actor) to create the ObjectRef.
  2. Immediately call a method on the actor to store the ObjectRef.
  3. Only then allow the worker to exit.

If you skip step 2, or if the worker exits before the actor receives the reference, the object may be lost due to reference counting protocol limitations (GitHub issue 18456).

Would you like a revised NumpyStore pattern that guarantees safety?


yes i would like revised pattern
@RunLLM

Here’s a revised NumpyStore pattern that guarantees safety by ensuring the ObjectRef is stored in a long-lived, detached actor before the worker exits. This prevents ReferenceCountingAssertionError and object loss, even with max_calls or autoscaling (Ray Discourse: reference counting assertion error, GitHub issue 18456):

import ray
import numpy as np

# Detached actor to persistently own and store ObjectRefs
@ray.remote
class PlasmaStoreActor:
    def __init__(self):
        self.refs = {}

    def store(self, key, obj_ref):
        self.refs[key] = obj_ref

    def retrieve(self, key):
        return self.refs.get(key)

# Start Ray and the actor (ensure it's detached and persistent)
ray.init()
store_actor = PlasmaStoreActor.options(
    name="PlasmaStoreActor", namespace="PlasmaStore", lifetime="detached", get_if_exists=True
).remote()

class NumpyStore:
    def __init__(self, data: np.ndarray, key: str):
        if not isinstance(data, np.ndarray):
            raise TypeError(type(data))
        self.shape = data.shape
        self.size = data.size
        self.ndim = data.ndim
        self.dtype = data.dtype
        self.length = len(data)
        # Put the array in the object store, owned by the actor
        self.data_ref = ray.put(data, _owner=store_actor)
        # Store the ObjectRef in the actor before the worker exits
        ray.get(store_actor.store.remote(key, self.data_ref))
        self.key = key

    def retrieve(self) -> np.ndarray:
        # Retrieve the ObjectRef from the actor and get the data
        obj_ref = ray.get(store_actor.retrieve.remote(self.key))
        return ray.get(obj_ref)

    def __len__(self) -> int:
        return self.length

# Example usage in a remote function
@ray.remote(max_calls=2)
def generate_and_store_array(key, size):
    arr = np.random.random(size)
    store = NumpyStore(arr, key)
    return key

# Usage
key = "my_array"
size = (100, 100)
ray.get(generate_and_store_array.remote(key, size))

# Retrieve the array later: go through the actor directly. (Constructing a
# dummy NumpyStore here would call ray.put again and overwrite the stored key.)
obj_ref = ray.get(store_actor.retrieve.remote(key))
arr = ray.get(obj_ref)
print(arr.shape)  # (100, 100)

Key points:

  • The ObjectRef is stored in the actor before the worker exits.
  • Retrieval always goes through the actor, ensuring the object is not lost if the original worker is gone.

Let me know if you want a breakdown of each step or further explanation!


@RunLLM Is there parameter in ray that tells the minimum and maximum number of IDLE workers that standstill, waiting for execution. Describe the killing policy of IDLE processes

Yes, Ray controls the number of idle workers using the soft limit parameter `num_workers_soft_limit`, which defaults to the number of available CPUs. Idle workers above this limit are eligible for termination. The killing policy is governed by two main parameters: `kill_idle_workers_interval_ms` (how often Ray checks for idle workers) and `idle_worker_killing_time_threshold_ms` (how long a worker must be idle before being killed). If the number of idle workers exceeds the soft limit and they have been idle longer than the threshold, Ray will kill them in FIFO order until the soft limit is met (ray_config_def.h, worker_pool.cc).

There is no explicit parameter for a minimum number of idle workers to keep, but the soft limit acts as the maximum. The killing policy ensures that workers are only killed if they are idle beyond the threshold and the number of idle workers exceeds the soft limit. IO workers are not counted toward this limit.
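The policy described above can be sketched as a small simulation. This is my own illustrative model of the behavior described in ray_config_def.h and worker_pool.cc, not Ray's actual implementation: idle workers are considered in FIFO order (longest idle first), and a worker is killed only if it has exceeded the idle threshold and the idle count is still above the soft limit:

```python
from dataclasses import dataclass
from typing import List

@dataclass
class IdleWorker:
    worker_id: str
    idle_ms: int  # how long this worker has been idle

def workers_to_kill(idle_workers: List[IdleWorker],
                    soft_limit: int,
                    threshold_ms: int) -> List[str]:
    """Illustrative model of the killing policy: walk idle workers in
    FIFO order (oldest idle first) and kill those past the threshold
    until the idle count drops to the soft limit."""
    queue = sorted(idle_workers, key=lambda w: w.idle_ms, reverse=True)
    killed = []
    remaining = len(queue)
    for w in queue:
        if remaining <= soft_limit:
            break  # soft limit met; stop killing
        if w.idle_ms >= threshold_ms:
            killed.append(w.worker_id)
            remaining -= 1
    return killed

# 4 idle workers, soft limit 2, threshold 10 s: the two longest-idle
# workers above the threshold are killed; w4 is under the threshold.
workers = [IdleWorker("w1", 30000), IdleWorker("w2", 25000),
           IdleWorker("w3", 12000), IdleWorker("w4", 5000)]
print(workers_to_kill(workers, soft_limit=2, threshold_ms=10000))  # ['w1', 'w2']
```

Note how both conditions must hold: with `soft_limit=4` nothing is killed regardless of idle time, and a worker under the threshold survives even when the count is over the limit.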


@RunLLM Search for issues where `num_workers_soft_limit` is mentioned

Here are relevant issues where `num_workers_soft_limit` is mentioned:

  • GitHub issue 12525: Discusses slow worker startup when the number of running workers is greater than or equal to `num_cpus`, and how `num_workers_soft_limit` affects this behavior.
  • GitHub issue 29162: Mentions that prestarting workers is limited by `maximum_startup_concurrency` and that `num_workers_soft_limit` is set to `num_cpus` by default, affecting how many workers are prestarted.

Would you like more detail or a summary of the discussions in these issues?


@RunLLM What may be the cause of ray idle consuming a lot of cpu?