Ray Actors cannot stop

I am writing a remote model-training class that trains the model inside a while loop. Here is a minimal dummy version of the code:

import time

import ray


@ray.remote(num_cpus=1)
class A(object):
    def __init__(self, b) -> None:
        self.b = b
    
    def run(self):
        return ray.get(self.b.get_data.remote())


@ray.remote(num_cpus=1)
class B(object):
    def __init__(self) -> None:
        self.data = []
        self.stop = False

    def append(self, d):
        self.data.append(d)
    
    def size(self):
        return len(self.data)
    
    def get_data(self):
        return self.data

    def set_stop(self):
        self.stop = True

    def get_stop(self):
        return self.stop

    def run(self):
        iter = 0
        while True:
            # print(f'B size: {self.data}, iter: {iter}')
            iter += 1
            if self.stop:
                print(f'Stop B size: {self.data}, iter: {iter}')
                break


if __name__ == '__main__':
    ray.init()

    b = B.remote()
    a = A.remote(b)
    ws = [b.run.remote()]

    for i in range(10):
        b.append.remote(i)
    
    print(f'start to stop!')
    print(f'start to stop!, now: {ray.get(b.get_stop.remote())}')
    b.set_stop.remote()
    print(f'stop!, now: {ray.get(b.get_stop.remote())}')
    a_data = ray.get(a.run.remote())
    b_data = ray.get(b.get_data.remote())
    print(f'show a data: {a_data}')
    print(f'show b data: {b_data}')
    ray.wait(ws)
    ray.shutdown()

However, the program hangs at print(f'start to stop!, now: {ray.get(b.get_stop.remote())}'). How can I fix this?

Can anyone help me solve this?

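@GoingMyWay The program hangs because b.run.remote() never returns: run() keeps looping until self.stop is True. By default, an actor has a single thread and executes its method calls one at a time, so the next call starts only after the current one has finished. Every later call to b (get_stop(), set_stop(), get_data()) is therefore queued behind run() forever. ray.get(b.get_stop.remote()) waits indefinitely, and the driver never even reaches set_stop() to flip the flag.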

In your case, you can use either an async actor or a threaded actor (see the Ray 1.12.1 docs page "AsyncIO / Concurrency for Actors").

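If you use an async actor, turn run into an async method that checks the stop flag on every iteration and then yields control with an async sleep (for example, await asyncio.sleep(...)), so that other calls such as set_stop() can run in between.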

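If you use a threaded actor (an actor created with max_concurrency greater than 1), protect the shared state (self.data and self.stop) with a lock to prevent race conditions between methods running concurrently in different threads.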
