Interruptible recursion with Ray Actor

I have a long running Actor that produces data that is then consumed by other Actors. For now, let’s mock this in a simple fashion:

import ray

@ray.remote
class Actor:
    def __init__(self):
        self.msg = "Hello, World!"
    def say(self):
        while True:
            print(self.msg)

However, sometimes I want to change how this actor produces data. Naively, I write:

import ray

@ray.remote
class Actor:
    def __init__(self):
        self.msg = "Hello, World!"

    def update(self):
        self.msg = "Goodbye, World!"

    def say(self):
        print(self.msg)
        # recur as a task so other tasks can run
        self.say.remote()

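This errors because, inside the actor, self.say is a plain Python method with no .remote attribute. So instead I pass the actor handle's say.remote method in as an argument, which lets say call it to schedule itself again: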

import ray, time

ray.init()

@ray.remote
class Actor:
    def __init__(self):
        self.msg = "Hello, World!"

    def update(self):
        self.msg = "Goodbye, World!"

    def say(self, say_remote):
        print(self.msg)
        # Queue the next call on this actor; it starts only after this one returns.
        say_remote(say_remote)

actor = Actor.remote()
actor.say.remote(actor.say.remote)
time.sleep(2)
actor.update.remote()
time.sleep(5)

And this works! The terminal prints “Hello, World!” repeatedly for 2 seconds, and then prints “Goodbye, World!” until the script exits.

Questions:

  1. Is this okay for long-running processes (i.e., no stack overflow or other issues)? In my real-world use case, each say call takes about 1 ms.
  2. Is there a better way to do this? I know I could do this with threads or async, but that seems messier, IMHO.

This is admittedly a bit of a weird pattern, so I would love any thoughts!

This will not cause a stack overflow. By default an actor executes one task at a time (max_concurrency=1), so when say() calls say_remote(), the new call is only queued; it cannot start until the current say() returns. Each call therefore runs on a fresh stack instead of nesting inside the previous one, and at most two say() calls exist at any moment: one running and one waiting in the actor's queue. Calls submitted from outside, such as update(), join the same queue and get their turn between iterations, which is why the message changes.
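To make the queueing argument concrete, here is a stdlib-only toy model of a single-threaded actor. It is not Ray's actual implementation, and MailboxActor, submit, and run are hypothetical names. say() "recurs" by appending its next call to a queue and returning, so the Python stack depth stays constant, and an update() submitted from outside runs between two iterations.

```python
import traceback
from collections import deque


class MailboxActor:
    """Toy model of a Ray actor with the default max_concurrency=1.

    Hypothetical illustration only; this is not how Ray is implemented.
    """

    def __init__(self):
        self.msg = "Hello, World!"
        self.mailbox = deque()  # pending method calls, executed one at a time
        self.output = []        # stands in for print()
        self.depths = set()     # stack depths observed inside say()

    def submit(self, name, *args):
        # Like handle.method.remote(): enqueue the call and return immediately.
        self.mailbox.append((name, args))

    def update(self):
        self.msg = "Goodbye, World!"

    def say(self):
        self.output.append(self.msg)
        self.depths.add(len(traceback.extract_stack()))
        # "Recur" by queueing the next call instead of calling say() directly.
        self.submit("say")

    def run(self, steps):
        # Scheduler: pop one call, run it to completion, then take the next.
        for _ in range(steps):
            if not self.mailbox:
                break
            name, args = self.mailbox.popleft()
            getattr(self, name)(*args)


actor = MailboxActor()
actor.submit("say")
actor.run(1000)         # 1000 iterations of say()
actor.submit("update")  # queued behind the pending say()
actor.run(1000)
print(actor.output[0], "->", actor.output[-1])  # Hello, World! -> Goodbye, World!
print(len(actor.depths), len(actor.mailbox))    # 1 1
```

Calling say() directly from inside say() would instead add a frame per iteration and eventually hit Python's recursion limit; the queue is what keeps the depth flat.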

Before I address your second question, could you please elaborate on why you want the function to execute in an infinite loop? My first thought is that there may be a different issue we want to fix instead.

Thanks for the clear explanation! That’s what I figured / hoped.

As my mock suggests, I have a Producer actor that produces data in real time from a sensor (e.g., a webcam). It reads data from the hardware as fast as it can and passes each reading to another actor for processing. The actor should keep producing data indefinitely.

Sometimes we want to change a sensor setting (e.g., exposure time). Another actor asks the Producer actor to change the setting, and the outgoing data stream should reflect the new setting as soon as possible.
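On the async option raised in the question: Ray supports async actors, whose methods are defined with async def and run on an event loop, so a loop that awaits between iterations lets other method calls in. The sketch below models that with plain asyncio rather than Ray (an assumption made so it runs standalone); read_frame, exposure_ms, set_exposure, and stop are hypothetical names, not part of any sensor or Ray API. The two points it shows are that the loop re-reads its setting on every frame and that it yields with await asyncio.sleep(0) after each one.

```python
import asyncio


class Producer:
    """Toy producer; read_frame() stands in for a real sensor read (hypothetical)."""

    def __init__(self):
        self.exposure_ms = 10  # hypothetical sensor setting
        self.frames = []       # stands in for handing frames to a consumer actor
        self.running = True

    def read_frame(self):
        # Hypothetical: a real producer would grab a frame from hardware here.
        return {"exposure_ms": self.exposure_ms}

    async def produce(self):
        while self.running:
            # The setting is read fresh on every iteration, so a change is
            # picked up by the very next frame.
            self.frames.append(self.read_frame())
            # Yield to the event loop so set_exposure()/stop() can run.
            await asyncio.sleep(0)

    async def set_exposure(self, ms):
        self.exposure_ms = ms

    async def stop(self):
        self.running = False


async def main():
    producer = Producer()
    loop_task = asyncio.create_task(producer.produce())
    await asyncio.sleep(0.01)  # let some frames through
    await producer.set_exposure(20)
    await asyncio.sleep(0.01)
    await producer.stop()
    await loop_task
    return producer.frames


frames = asyncio.run(main())
print(frames[0], frames[-1])  # {'exposure_ms': 10} {'exposure_ms': 20}
```

If the real sensor read blocks for about 1 ms, the loop still yields between frames, but the event loop is blocked during each read, so setting changes are only handled in those gaps.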

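Found a much cleaner approach! Creating the actor with max_concurrency=2 lets update() run on a second thread while say() loops forever on the first, so no recursion is needed: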

import ray, time

ray.init()


@ray.remote
class Actor:
    def __init__(self):
        self.msg = "Hello, World!"

    def update(self):
        self.msg = "Goodbye, World!"

    def say(self):
        while True:
            print(self.msg)


# Two threads: one stays in the endless say() loop, the other serves update().
actor = Actor.options(max_concurrency=2).remote()
actor.say.remote()
time.sleep(2)
actor.update.remote()
time.sleep(5)
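This works because Ray treats an actor created with max_concurrency greater than 1 as a threaded actor: its method calls run in a thread pool, so update() executes on a second thread while say() keeps looping on the first. One catch is that while True never ends, so the loop cannot be shut down cleanly. Below is a plain-threads model of the same two-thread arrangement (no Ray involved), with a hypothetical stop() method backed by threading.Event and a lock around the shared setting; everything beyond say and update is an assumption for illustration.

```python
import threading
import time


class Producer:
    """Models a Ray actor created with max_concurrency=2 (plain threads, no Ray)."""

    def __init__(self):
        self._lock = threading.Lock()      # guards settings shared between threads
        self._stopped = threading.Event()  # hypothetical stop signal
        self.msg = "Hello, World!"
        self.output = []                   # stands in for sending data downstream

    def update(self, msg="Goodbye, World!"):
        # A single assignment is already atomic in CPython; the lock matters
        # once several related settings must change together.
        with self._lock:
            self.msg = msg

    def stop(self):
        self._stopped.set()

    def say(self):
        # Loop until stop() is called, instead of `while True`.
        while not self._stopped.is_set():
            with self._lock:
                current = self.msg
            self.output.append(current)
            time.sleep(0.001)  # roughly the ~1 ms per iteration from the question


producer = Producer()
loop_thread = threading.Thread(target=producer.say)  # thread 1: the endless loop
loop_thread.start()
time.sleep(0.05)
producer.update()  # runs on the main thread, standing in for thread 2
time.sleep(0.05)
producer.stop()
loop_thread.join()
print(producer.output[0], "->", producer.output[-1])
```

In the Ray version, stop() would be one more method called via .remote(); with max_concurrency=2 it runs on the second thread while say() occupies the first.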