I created a detached actor, called one of its methods, and then let the driver exit while that call was still running:
```python
import time

import ray


@ray.remote
class Greeter:
    def __init__(self, value):
        self.value = value
        # Simulate a slow constructor (~100 s).
        for i in range(100):
            time.sleep(1)
            print('########', i)

    def say_hello(self):
        # Simulate a long-running method (~100 s).
        for i in range(100):
            time.sleep(1)
            print('########', i)
        return self.value


a = Greeter.options(lifetime="detached", name="g1").remote("Old Greeting")
a.say_hello.remote()  # fire and forget: the returned ObjectRef is never awaited
# Driver exits here
```
Is this supported? The dashboard shows the following error for the task:

```
Error Type: WORKER_DIED
Job finishes (01000000) as driver exits. Marking all non-terminal tasks as failed.
```
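One workaround, sketched here under assumptions rather than as a pattern confirmed by the Ray docs: make the long-running method return immediately after starting the work on a background thread inside the actor, and have the driver `ray.get` that quick acknowledgment before exiting. Once `ray.get` returns, the call has completed from Ray's point of view, so nothing submitted by the driver is still pending when the job ends, while the actual work keeps running in the detached actor's process. The class is plain Python so it can be tried without a cluster; `BackgroundGreeter`, `start`, `status`, `launch`, `check` and the namespace `"greeters"` are hypothetical names, and the cluster address is taken from the reply below.

```python
import threading
import time


class BackgroundGreeter:
    """Plain class: the long work runs on a thread so start() returns at once.

    Wrap it with ray.remote(...) to use it as a detached actor (see launch()).
    __init__ is kept cheap so actor creation finishes quickly.
    """

    def __init__(self, value):
        self.value = value
        self.progress = 0
        self._thread = None

    def _work(self, steps, delay):
        # The long-running loop from the question, moved off the call path.
        for i in range(steps):
            time.sleep(delay)
            self.progress = i + 1

    def start(self, steps=100, delay=1.0):
        # Start the work only once; return immediately so the caller is not blocked.
        if self._thread is None:
            self._thread = threading.Thread(
                target=self._work, args=(steps, delay), daemon=True
            )
            self._thread.start()
        return "started"

    def status(self):
        # Report progress and whether the background work has finished.
        done = self._thread is not None and not self._thread.is_alive()
        return {"value": self.value, "progress": self.progress, "done": done}


def launch(address="ray://localhost:10001"):
    # Hypothetical driver; needs a running cluster, so Ray is imported lazily.
    import ray

    ctx = ray.init(address=address, namespace="greeters")
    Greeter = ray.remote(BackgroundGreeter)
    a = Greeter.options(name="g1", lifetime="detached").remote("Old Greeting")
    ray.get(a.start.remote())  # returns as soon as the thread has started
    ctx.disconnect()  # the start() call has already completed


def check(address="ray://localhost:10001"):
    # Hypothetical follow-up driver that reattaches to the detached actor by name.
    import ray

    ctx = ray.init(address=address, namespace="greeters")
    status = ray.get(ray.get_actor("g1").status.remote())
    ctx.disconnect()
    return status
```

Run `launch()` once, let that driver exit, then call `check()` from a new driver to watch `progress` advance. The same namespace is passed in both drivers so the named actor can be found again. As noted later in this thread, this needs a real cluster address; the local cluster that Ray starts automatically goes away with the driver.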
Same problem here. Has this been fixed?
I've run into the same issue. I know why it happens, but I'm not sure whether it is the intended behavior, so I put together a minimal reproducible example. Note that it has to connect to an existing cluster rather than the local one Ray starts automatically.
A failing case:
```python
import ray
from time import sleep

address = "ray://localhost:10001"


@ray.remote
class myactor:
    def __init__(self):
        self.state = 0

    def increment(self):
        self.state += 1
        sleep(10)
        with open("/tmp/simple_test.txt", "w") as f:
            f.write(f"done {self.state}\n")
        return self.state


conn = ray.init(address=address)
acthdl = myactor.options(name="testact", lifetime="detached").remote()
# We deliberately don't wait for this future; we want the call to finish after we disconnect.
fut = acthdl.increment.remote()
# sleep(6)  # if you uncomment this, it works as intended
conn.disconnect()
```
In the dashboard you will see that the task fails, because we never waited on the future. Interestingly, if you add a plain `sleep` after submitting the call (rather than `ray.wait`), it completes as intended, even though `sleep(6)` is shorter than the 10-second task. That defeats the purpose, though, because the call becomes blocking.
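A more deterministic replacement for the blind `sleep(6)` is a handshake, assuming (as the `sleep(6)` observation suggests, though this is not verified here) that the call only needs to reach the actor before the driver disconnects. A cheap method only enqueues the work and returns a ticket number, and a worker thread inside the actor drains the queue. The driver then blocks only until the job is accepted, not until the 10-second increment finishes. This is a sketch using the standard-library `queue.Queue`; `QueuedCounter`, `submit`, `wait_idle` and `main` are hypothetical names, and the Ray wiring assumes the same `ray://localhost:10001` cluster as the reprex.

```python
import os
import queue
import tempfile
import threading
import time


class QueuedCounter:
    """Plain class: submit() enqueues work and returns; a thread does the work."""

    def __init__(self, path=None, delay=10.0):
        self.state = 0
        self.submitted = 0
        # Same file as the reprex on Linux (/tmp/simple_test.txt).
        self.path = path or os.path.join(tempfile.gettempdir(), "simple_test.txt")
        self.delay = delay
        self._jobs = queue.Queue()
        threading.Thread(target=self._drain, daemon=True).start()

    def _drain(self):
        # Process submitted increments one at a time, in submission order.
        while True:
            self._jobs.get()
            self.state += 1
            time.sleep(self.delay)
            with open(self.path, "w") as f:
                f.write(f"done {self.state}\n")
            self._jobs.task_done()

    def submit(self):
        # Only enqueue; return a ticket number as the acknowledgment.
        self.submitted += 1
        self._jobs.put(None)
        return self.submitted

    def wait_idle(self):
        # Block until every submitted job has been processed, then report the state.
        self._jobs.join()
        return self.state


def main(address="ray://localhost:10001"):
    import ray  # imported lazily; needs a running cluster

    conn = ray.init(address=address)
    Counter = ray.remote(QueuedCounter)
    acthdl = Counter.options(name="testact", lifetime="detached").remote()
    ray.get(acthdl.submit.remote())  # returns once the job is queued
    conn.disconnect()
```

A single queue, rather than one thread per call, keeps the increments serialized, which mirrors how a default synchronous Ray actor runs one method call at a time.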
In a more complex case, we can use an actor that starts other actors in its `__init__`. All of the "downstream" actors finish their work, but at the moment the starting actor has to report "I am ready" to Ray internally, it fails the same way as above. At least it kind of works: you will see that the file is written.
```python
import ray
from time import sleep

address = "ray://localhost:10001"


@ray.remote
class myactor:
    def __init__(self, starter=False):
        self.state = 0
        self.starter = starter
        if starter:
            # The starter actor fans out to 10 child actors from inside __init__.
            hdls = [myactor.options(num_cpus=0).remote() for _ in range(10)]
            states = [hdl.increment.remote() for hdl in hdls]
            st = sum(ray.get(states))
            with open("/tmp/test.txt", "w") as f:
                f.write(f"done {st}\n")
            # Clean up the child actors once their results are collected.
            for act in hdls:
                ray.kill(act)

    def increment(self):
        self.state += 1
        sleep(10)
        return self.state


cli = ray.init(address=address)
acthdl = myactor.options(name="testact", lifetime="detached").remote(starter=True)
cli.disconnect()
```