To tune the Ray autoscaler and worker cleanup parameters so that idle workers are reclaimed promptly and memory leakage is reduced, focus on these key settings:
idle_timeout_minutes: Controls how long a worker node must be idle before the autoscaler removes it. Lowering this value (e.g., to 1 or 2) makes downscaling more aggressive, so idle workers are terminated sooner and their memory is released (Ray autoscaler docs).
kill_idle_workers_interval_ms and idle_worker_killing_time_threshold_ms: These Ray system configs (set via --system-config or environment variables) control how often Ray checks for idle workers and how long a worker must be idle before being killed. Lowering these values can help clean up idle workers more quickly (ray_config_def.h).
Example YAML config for aggressive cleanup:
idle_timeout_minutes: 1
upscaling_speed: 1.0
Example Ray start command with system config:
ray start --head ... --system-config='{"kill_idle_workers_interval_ms":10000,"idle_worker_killing_time_threshold_ms":10000}'
Be cautious: setting these values too low can increase cold start latency, as new workers will need to be started more frequently.
Would you like a step-by-step example of how to apply these settings in your cluster config?
In your head_start_ray_commands and worker_start_ray_commands, add the --system-config flag to set worker cleanup intervals:
head_start_ray_commands:
- ray stop
- ray start --head --port=6379 --system-config='{"kill_idle_workers_interval_ms":10000,"idle_worker_killing_time_threshold_ms":10000}'
worker_start_ray_commands:
- ray stop
- ray start --address=$RAY_HEAD_IP:6379 --system-config='{"kill_idle_workers_interval_ms":10000,"idle_worker_killing_time_threshold_ms":10000}'
@RunLLM What is the cause of RayTaskError(ReferenceCountingAssertionError)(ReferenceCountingAssertionError('00f7ac668d3063a5ffffffffffffffffffffffff0200000002e1f505', ''))?
RayTaskError(ReferenceCountingAssertionError) occurs when Ray’s distributed reference counting protocol detects that an object has been deleted while there is still a reference to it. This should not happen in normal operation, but a known edge case can trigger it: if an ObjectRef is passed through an intermediate worker that dies before it can notify its parent of the new reference, the reference count can be undercounted, leading to premature deletion and this error (Ray fault tolerance docs, GitHub issue 18456).
This is most likely to happen when ObjectRefs are passed through multiple tasks or actors and an intermediate process crashes or exits unexpectedly. The workaround is to avoid passing ObjectRefs through intermediate workers that may die, and instead pass them directly from the driver or a persistent actor (Ray fault tolerance docs, GitHub issue 18456).
Would you like a step-by-step example or more detail on how to avoid this?
The ReferenceCountingAssertionError in your case is caused by the use of max_calls=2 on your remote function. When a Ray worker reaches its max_calls limit, it is terminated and any objects it owns (including ObjectRefs it created) are also deleted—even if other tasks or actors still hold references to those objects. This leads to the ReferenceCountingAssertionError when you try to access an ObjectRef whose owner (the worker) has already exited (GitHub issue 18456, Ray Discourse: getting reference counting assertion error).
Workaround:
To avoid this, you should ensure that the owner of the ObjectRef is a long-lived actor (such as your PlasmaStoreActor) and that the ObjectRef is passed to and stored by this actor before the worker that created it exits. Specifically, after creating the ObjectRef with ray.put(data, _owner=actor), call a method on the actor to store the reference, and only then allow the worker to exit. This ensures the reference count is properly updated and the object is not prematurely deleted. Avoid using max_calls on remote functions that create and return ObjectRefs unless you are certain the references will not be needed after the worker exits.
Here’s a code example that demonstrates how to safely manage ObjectRefs with a long-lived actor (PlasmaStoreActor) to avoid ReferenceCountingAssertionError when using max_calls or short-lived workers. The key is to store the ObjectRef in the actor before the worker exits, ensuring the object’s lifetime is tied to the actor, not the ephemeral worker (GitHub issue 18456, Ray Discourse):
import ray
import numpy as np

# Long-lived actor to own and store ObjectRefs
@ray.remote
class PlasmaStoreActor:
    def __init__(self):
        self.refs = {}

    def store(self, key, ref_wrapper):
        # The ObjectRef arrives wrapped in a list: Ray resolves a bare
        # ObjectRef argument to its value before the method runs, but
        # leaves refs inside containers untouched.
        self.refs[key] = ref_wrapper[0]

    def retrieve(self, key):
        return self.refs.get(key)

# Start Ray and the actor
ray.init()
store_actor = PlasmaStoreActor.options(
    name="PlasmaStoreActor", lifetime="detached", get_if_exists=True
).remote()

# Remote function with max_calls that puts an object and stores its ref in the actor
@ray.remote(max_calls=2)
def generate_and_store_array(key, size):
    arr = np.random.random(size)
    # Put the array in the object store, owned by the actor
    obj_ref = ray.put(arr, _owner=store_actor)
    # Store the ObjectRef in the actor before the worker exits;
    # wrap it in a list so the reference itself is passed, not its value
    ray.get(store_actor.store.remote(key, [obj_ref]))
    return key

# Usage
key = "my_array"
size = (100, 100)
ray.get(generate_and_store_array.remote(key, size))

# Retrieve the ObjectRef from the actor and get the data
obj_ref = ray.get(store_actor.retrieve.remote(key))
arr = ray.get(obj_ref)
print(arr.shape)  # (100, 100)
This pattern ensures the ObjectRef is not lost when the worker exits, preventing ReferenceCountingAssertionError.
Your NumpyStore implementation is close, but it does not guarantee the object will persist if the worker that created the ObjectRef exits before the reference is safely stored in the actor. The critical difference is that you must explicitly call a method on the long-lived actor (e.g., store_actor.store.remote(key, [obj_ref])) to store the ObjectRef before the worker exits (Ray Discourse: reference counting assertion error). Simply assigning the ObjectRef to an attribute of NumpyStore is not enough: if the worker dies before the actor records the reference, the reference count may not be updated correctly, and the object can be lost.
To ensure safety, always:
Use ray.put(data, _owner=actor) to create the ObjectRef.
Immediately call a method on the actor to store the ObjectRef.
Only then allow the worker to exit.
If you skip step 2, or if the worker exits before the actor receives the reference, the object may be lost due to reference counting protocol limitations (GitHub issue 18456).
Would you like a revised NumpyStore pattern that guarantees safety?
Here’s a revised NumpyStore pattern that guarantees safety by ensuring the ObjectRef is stored in a long-lived, detached actor before the worker exits. This prevents ReferenceCountingAssertionError and object loss, even with max_calls or autoscaling (Ray Discourse: reference counting assertion error, GitHub issue 18456):
import ray
import numpy as np

# Detached actor to persistently own and store ObjectRefs
@ray.remote
class PlasmaStoreActor:
    def __init__(self):
        self.refs = {}

    def store(self, key, ref_wrapper):
        # The ref arrives wrapped in a list so Ray passes the reference
        # itself instead of resolving it to its value
        self.refs[key] = ref_wrapper[0]

    def retrieve(self, key):
        return self.refs.get(key)

# Start Ray and the actor (ensure it's detached and persistent)
ray.init()
store_actor = PlasmaStoreActor.options(
    name="PlasmaStoreActor", namespace="PlasmaStore", lifetime="detached", get_if_exists=True
).remote()

class NumpyStore:
    def __init__(self, data: np.ndarray, key: str):
        if not isinstance(data, np.ndarray):
            raise TypeError(type(data))
        self.shape = data.shape
        self.size = data.size
        self.ndim = data.ndim
        self.dtype = data.dtype
        self.length = len(data)
        self.key = key
        # Put the array in the object store, owned by the actor
        self.data_ref = ray.put(data, _owner=store_actor)
        # Store the ObjectRef in the actor before the worker exits
        ray.get(store_actor.store.remote(key, [self.data_ref]))

    @staticmethod
    def retrieve(key: str) -> np.ndarray:
        # Retrieve the ObjectRef from the actor and get the data.
        # This is a staticmethod: constructing a dummy NumpyStore just to
        # retrieve would overwrite the stored reference under the same key.
        obj_ref = ray.get(store_actor.retrieve.remote(key))
        return ray.get(obj_ref)

    def __len__(self) -> int:
        return self.length

# Example usage in a remote function
@ray.remote(max_calls=2)
def generate_and_store_array(key, size):
    arr = np.random.random(size)
    NumpyStore(arr, key)
    return key

# Usage
key = "my_array"
size = (100, 100)
ray.get(generate_and_store_array.remote(key, size))

# Retrieve the array later
arr = NumpyStore.retrieve(key)
print(arr.shape)  # (100, 100)
Key points:
The ObjectRef is stored in the actor before the worker exits.
Retrieval always goes through the actor, ensuring the object is not lost if the original worker is gone.
Let me know if you want a breakdown of each step or further explanation!
@RunLLM Is there a parameter in Ray that sets the minimum and maximum number of idle workers that sit waiting for execution? Describe the killing policy for idle processes.
Yes, Ray controls the number of idle workers using the soft limit parameter num_workers_soft_limit, which defaults to the number of available CPUs. Idle workers above this limit are eligible for termination. The killing policy is governed by two main parameters: kill_idle_workers_interval_ms (how often Ray checks for idle workers) and idle_worker_killing_time_threshold_ms (how long a worker must be idle before being killed). If the number of idle workers exceeds the soft limit and they have been idle longer than the threshold, Ray will kill them in FIFO order until the soft limit is met (ray_config_def.h, worker_pool.cc).
There is no explicit parameter for a minimum number of idle workers to keep, but the soft limit acts as the maximum. The killing policy ensures that workers are only killed if they are idle beyond the threshold and the number of idle workers exceeds the soft limit. IO workers are not counted toward this limit.
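As a sketch of that policy, here is a toy pure-Python simulation of the rules described above (workers killed oldest-first once they exceed the idle threshold, but only down to the soft limit). This illustrates the described behavior only; it is not Ray's actual implementation in worker_pool.cc:

```python
from collections import deque

def kill_idle_workers(idle_workers, soft_limit, idle_threshold_ms, now_ms):
    """Simulate the described idle-worker killing policy.

    idle_workers: deque of (worker_id, idle_since_ms), oldest first (FIFO).
    A worker is killed only if it has been idle longer than
    idle_threshold_ms AND the idle count still exceeds soft_limit.
    Returns the list of killed worker ids.
    """
    killed = []
    while len(idle_workers) > soft_limit:
        worker_id, idle_since_ms = idle_workers[0]
        if now_ms - idle_since_ms < idle_threshold_ms:
            break  # oldest idle worker not yet past the threshold; stop
        idle_workers.popleft()
        killed.append(worker_id)
    return killed

# Example: 4 idle workers, soft limit 2, 10s threshold, checked at t=20s
workers = deque([("w1", 0), ("w2", 5_000), ("w3", 12_000), ("w4", 19_000)])
print(kill_idle_workers(workers, soft_limit=2, idle_threshold_ms=10_000, now_ms=20_000))
# ['w1', 'w2'] -- w3/w4 survive because the soft limit is reached
```

Note how w2 (idle 15s) is killed but w3 (idle 8s) is not even checked: once the idle count drops to the soft limit, killing stops regardless of the threshold.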
Here are relevant issues where num_workers_soft_limit is mentioned:
GitHub issue 12525: Discusses slow worker startup when the number of running workers is greater than or equal to num_cpus, and how num_workers_soft_limit affects this behavior.
GitHub issue 29162: Mentions that prestarting workers is limited by maximum_startup_concurrency and that num_workers_soft_limit is set to num_cpus by default, affecting how many workers are prestarted.
Would you like more detail or a summary of the discussions in these issues?