Ray actor with detached lifetime error, Job finishes as driver exits. Marking all non-terminal tasks as failed

I have create Detached actor and ran a function meanwhile exited driver.

mport ray


@ray.remote
class Greeter:
    def __init__(self, value):
        self.value = value
        import time
        for i in range(100):
            time.sleep(1)
            print('########', i)

    def say_hello(self):
        import time
        for i in range(100):
            time.sleep(1)
            print('########', i)
        return self.value


a = Greeter.options(lifetime="detached", name="g1").remote("Old Greeting")
a.say_hello.remote()
# Driver exits

Is it supported?

On the dashboard it give error,

Error Type: WORKER_DIED

Job finishes (01000000) as driver exits. Marking all non-terminal tasks as failed.

1 Like

same problem. Is it fixed?

So, I’ve caught to the same issue. I know why it happens, but not sure if it is actually the intention, so, I made a good reprex here. As a note, it has to connect to any cluster, not use the local automatically created by ray.

A failing case:

address = "ray://localhost:10001"
@ray.remote
class myactor:
    def __init__(self):
        self.state = 0

    def increment(self):
        self.state += 1
        sleep(10)
        with open("/tmp/simple_test.txt", "w") as f:
            f.write(f"done {self.state}\n")
        return self.state

conn = ray.init(address = address)

acthdl = myactor.options(name= "testact", lifetime="detached").remote()

### we are not waiting for this future, we just want it to do it's thing after we disconnect
fut = acthdl.increment.remote()

## sleep(6) ## if you uncomment, it works as intended

conn.disconnect()

Notice in the Dashboard that it fails, because we never waited for the future handle. Interestingly enough, if you add a sleep after the future and wait for it (not ray.wait), it actually concludes as intended, but defeats the purpose because then this call becomes blocking.

In a more complex case we can actually use an actor to initiate other actors in its __init__. All of the “downstream actors” will conclude their thing. But at the moment the starting actor has to return the “i am ready” to some internal, it gets the same fail as above, but at least kinda “works”. You will see that the file is written.

import ray
from time import sleep
address = "ray://localhost:10001"

@ray.remote
class myactor():
    def __init__(self, starter = False):
        self.state = 0
        self.starter = starter
        if starter:
            hdls = [myactor.options(num_cpus = 0 ).remote() for i in range(10)]
            states = [hdl.increment.remote() for hdl in hdls]
            st = sum(ray.get(states))
            with open("/tmp/test.txt", "w") as f:
                f.write(f"done {st}\n")
            allkilled= [ray.kill(act) for act in hdls]

    def increment(self):
        self.state += 1
        sleep(10)
        return self.state
    

cli = ray.init(address = address)

acthdl = myactor.options(name= "testact", lifetime="detached").remote(starter = True)

cli.disconnect()