Fire and Forget

Good day,

Apologies if this has been asked before; I couldn’t find a definitive answer in previous discussions. I want to know the best way to implement a true “fire_and_forget” call in Ray without creating memory pressure in Ray’s object store after a few thousand such calls. I have tried several approaches, but they all seem to eventually cause my sync Ray actor to crash with segmentation faults after a few hours of execution.

The methods I’ve tried thus far:

  1. A simple “actor.method.remote()” without awaiting the result or calling ray.get on it. This (I believe) eventually causes memory build-up, because the result is never retrieved and so the ref is never cleared from the object store.
  2. “_ = actor.method.remote()” followed by “del _”, which only deletes the object ref in the calling script but not from Ray’s object store.
  3. Using “ray._private.worker.global_worker.core_worker.free_objects([obj_ref], False)” and “ray.experimental.internal_kv._internal_kv_del(obj_ref.hex(), namespace=ray_namespace)”; both seem to cause internal raylet issues.
  4. Lastly, a custom ObjectRefWaiter, where a “fire_and_forget” method adds an ObjectRef to a “set” inside the waiter class, which then waits on those refs with “ray.wait” in a custom threading.Thread.

I’m using Ray 2.7.2. I believe there was previously a “fire-and-forget” implementation in ray.util, but it has since been removed.

Please let me know the correct way to implement “fire_and_forget”. The calls I want to fire through it normally behave like ray.util.Queue’s “put” and “put_nowait”, and the goal is to speed things up in my program. Any help will be appreciated.

Hi,

Could you provide minimal repros of your approach(es) so I can troubleshoot them?

Thanks!
Ibrahim

Option 1:

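import ray

def fire_and_forget(actor_name: str, namespace: str, method: str, *args, **kwargs):
    actor = ray.get_actor(actor_name, namespace=namespace)
    # Look the method up by name; "actor.method" would call a method literally named "method".
    getattr(actor, method).remote(*args, **kwargs)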

Option 2:
def fire_and_forget(actor_name: str, namespace: str, method: str, *args, **kwargs):
    actor = ray.get_actor(actor_name, namespace=namespace)
    # Look the method up by name, then drop the local reference right away.
    object_ref = getattr(actor, method).remote(*args, **kwargs)
    del object_ref

Option 3:
def fire_and_forget(actor_name: str, namespace: str, method: str, *args, **kwargs):
    actor = ray.get_actor(actor_name, namespace=namespace)
    object_ref = getattr(actor, method).remote(*args, **kwargs)
    # Private API: asks the core worker to free the object immediately.
    ray._private.worker.global_worker.core_worker.free_objects([object_ref], False)

or,
def fire_and_forget(actor_name: str, namespace: str, method: str, *args, **kwargs):
    actor = ray.get_actor(actor_name, namespace=namespace)
    object_ref = getattr(actor, method).remote(*args, **kwargs)
    # Private API: tries to delete an internal KV entry keyed by the ref's hex ID.
    ray.experimental.internal_kv._internal_kv_del(object_ref.hex(), namespace=namespace)

Option 4:
from threading import Event, Lock, Thread
from typing import Set

import ray
from ray.exceptions import RayError


class _ObjectRefWaiter:

    def __init__(self, wait_timeout: float = 10.0, poll_interval: float = 2):
        self._refs: Set[ray.ObjectRef] = set()
        self._lock = Lock()
        self._wait_timeout = wait_timeout
        self._poll_interval = poll_interval
        self._stop_event = Event()
        self._thread = Thread(target=self._wait_loop, daemon=True)
        self._thread.start()

    def add(self, obj_ref: ray.ObjectRef):
        """
        Add a Ray ObjectRef to the internal wait set.
        """
        if not isinstance(obj_ref, ray.ObjectRef):
            return
        with self._lock:
            self._refs.add(obj_ref)

    def _wait_loop(self):
        """
        Background loop to wait on refs. Fully ray.gets ready refs, and keeps pending ones.
        """
        while not self._stop_event.is_set():
            refs_copy = []
            with self._lock:
                refs_copy = list(self._refs)

            if refs_copy:
                try:
                    ready, not_ready = ray.wait(
                        refs_copy,
                        timeout=self._wait_timeout,
                        num_returns=len(refs_copy))
                    if ready:
                        try:
                            ray.get(ready)
                        except Exception:
                            pass  # Optional: Log exception or ignore
                    with self._lock:
                        for r in ready:
                            self._refs.discard(r)  # Only discard fully resolved ones
                except RayError:
                    pass  # Optional: Log internal Ray error

            self._stop_event.wait(self._poll_interval)

    def shutdown(self):
        """
        Clean shutdown of the internal thread.
        """
        self._stop_event.set()
        self._thread.join(timeout=5)

Then add object refs with:

_ray_waiter_instance = None  # lazily created module-level singleton

def _get_ray_waiter() -> _ObjectRefWaiter:
    global _ray_waiter_instance
    if _ray_waiter_instance is None:
        _ray_waiter_instance = _ObjectRefWaiter()
    return _ray_waiter_instance

def fire_and_forget(obj_ref: ray.ObjectRef):
    _get_ray_waiter().add(obj_ref)
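
For comparison, here is a minimal sketch of a bounded variant of Option 4 that needs no background thread: the caller keeps at most a fixed number of pending refs and waits for one to finish before submitting more. This is only an illustration of the backpressure idea, not a confirmed fix for the segmentation faults. The names BoundedInFlight, wait_fn, max_in_flight, and fake_wait are hypothetical and not part of Ray; the waiting function is injected so the sketch runs without a cluster. With Ray, one would presumably pass something like "lambda refs: ray.wait(refs, num_returns=1)", which blocks until one ref is ready and returns (ready, remaining).

```python
from typing import Any, Callable, List, Tuple

# Hypothetical type alias: a function that takes pending handles and returns
# (ready, remaining), the same shape that ray.wait returns.
WaitFn = Callable[[List[Any]], Tuple[List[Any], List[Any]]]


class BoundedInFlight:
    """Hypothetical helper (not a Ray API) that caps the number of pending refs."""

    def __init__(self, wait_fn: WaitFn, max_in_flight: int = 1000):
        self._wait_fn = wait_fn
        self._max = max_in_flight
        self._pending: List[Any] = []

    def submit(self, ref: Any) -> None:
        # When the cap is reached, wait until at least one pending ref is done
        # and keep only the ones that are still running.
        while len(self._pending) >= self._max:
            _ready, self._pending = self._wait_fn(self._pending)
        self._pending.append(ref)

    def __len__(self) -> int:
        return len(self._pending)


def fake_wait(refs: List[Any]) -> Tuple[List[Any], List[Any]]:
    # Stand-in for ray.wait(refs, num_returns=1): treats the oldest ref as done.
    return refs[:1], refs[1:]


tracker = BoundedInFlight(fake_wait, max_in_flight=3)
for i in range(10):
    # With Ray, this would be tracker.submit(getattr(actor, method).remote(...)).
    tracker.submit(i)
```

The trade-off is that submit can block briefly once the cap is reached, so this is “fire and mostly forget”; in exchange, the number of refs held by the caller stays bounded.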