Reactive State for Actors...i.e. Callbacks without using ray.tune

Hi! I created a custom PriorityQueue…based off of ray.util.queue.Queue. Like Ray’s Queue, PriorityQueue uses an Actor to wrap an asyncio.PriorityQueue.

I’m working on a realtime app that frequently polls the qsize. This essentially spams remote calls to the PriorityQueueActor…which eventually overloads the Ray cluster & maxes out my CPU usage.

I’d like a reactive signal for qsize that will notify the caller whenever qsize changes. This would significantly reduce the number of remote calls. Since a remote call would occur when qsize changes, instead of n clients polling every ~1ms.

Are there available examples or patterns that implement reactivity with Ray actors? Where updates to the state notify the remote listeners. And each remote listener caches the state in it’s local process.

Callbacks would work. However all of the Callback examples I found relate to ray.tune…which I’m not using.

I see rayvens, which could work. It seems a bit heavy for my needs but I’ll give it try. I hope there’s a lightweight way to do this without adding another dependency.

Thank you,
Brian


I see that a Ray Actor can be an AsyncIterator. I think this will work. Where the a change to the state yields the new state.

Hi btak! Welcome to the Ray community :slight_smile:

So essentially what I’m understanding is that we want to reduce the # of calls to check qsize, so instead of continuous polling you want to have a listener on the queue instead.

I think this would be a great case to use asyncio.Event. You can create a Ray actor to manage it and will signal changes to the queue size. Whenever the queue size changes in your PriorityQueue actor, trigger the SignalActor to notify listeners. Each listener waits for a signal and updates its local cache with the new queue size.

I think your listener task could look something like this:

# Listener task
async def listener_task(signal, pq_actor):
    while True:
        await signal.wait.remote()
        size = ray.get(pq_actor.qsize.remote())
        print(f"Queue size updated: {size}")

Here’s some docs that might be helpful:

And this code sample might be helpful: