Actors pool - process stuck / tasks lost on a long run

We are using Ray Core - actors in an actor pool - to run long calculations, on Ray 1.9 with an on-premise cluster.

The actor pool is created using this code:

# create all the actors that will execute the train and eval commands.
# since the actors live and serve throughout the entire process lifetime, we get pseudo data locality:
# the input files can be cached by the file system
ray.init(address=f'ray://{self.ray_head_node}:10001')
self.actors = [TrainAndEvaluateModelActor.remote(self.manager_actor, i) for i in range(0, self.batch_size)]
self.actors_pool = ActorPool(self.actors)

The TrainAndEvaluateModelActor gets a list of shell commands to execute (the real calculation is done by a binary). The actor holds a reference to a 'manager' actor and sends it notifications on process start / end.
The process iterates over X iterations, where each iteration is composed of Y model calculation tasks. The actors live throughout the entire process so that the calculations always run on the same hosts.
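A rough sketch of what that outer loop could look like (X_ITERATIONS, build_iteration and eval_metric are made-up placeholders), with evaluate() being the dispatch method shown next:

# sketch only: one long-lived pool, X iterations of Y tasks each
for iteration_idx in range(X_ITERATIONS):
    # build_iteration() stands in for whatever produces the Y task definitions
    iteration = build_iteration(iteration_idx)
    # every iteration is dispatched to the same actor pool, hence the same hosts
    self.evaluate(iteration, eval_metric)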

The code that sends tasks to the actors:

def evaluate(self, iteration: list, eval_metric):
    task_defs = [self.create_task_def(instance, eval_metric) for instance in iteration]
    results = self.actors_pool.map(lambda a, t: a.run.remote(cmd_data=t), task_defs)
    [self.update_in_pool(result) for result in results]

The problem:
After a while, the driver code is stuck waiting for tasks sent to the pool for execution.
The actors are alive (hence there is a connection between the cluster and the driver) but idle - the tasks are never executed.
We do not see any errors in the logs.
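For what it's worth, a sketch of how the same dispatch could surface a hang as a timeout instead of an indefinite wait, using ActorPool.submit / get_next(timeout=...); per_result_timeout_s and the handling below are placeholders, and the timeout-on-get_next behavior is assumed from the ActorPool docs:

def evaluate_with_timeout(self, iteration: list, eval_metric, per_result_timeout_s=6 * 3600):
    # same dispatch as evaluate(), but results are pulled one by one with a
    # timeout so the driver notices a hang instead of blocking forever
    task_defs = [self.create_task_def(instance, eval_metric) for instance in iteration]
    for task_def in task_defs:
        self.actors_pool.submit(lambda a, t: a.run.remote(cmd_data=t), task_def)
    while self.actors_pool.has_next():
        try:
            result = self.actors_pool.get_next(timeout=per_result_timeout_s)
            self.update_in_pool(result)
        except TimeoutError:
            # no result within the window: stop waiting silently and inspect
            # which actors are still busy before deciding what to do
            print(f"no result for {per_result_timeout_s}s - the pool may be stuck")
            break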

Hey @nir-laor thanks for the report! The symptoms do sound like a bug, but I am not aware of any similar issue.

Is it possible for you to create a GitHub issue (https://github.com/ray-project/ray/issues), ideally with a reproduction script? Adding a repro script will help us identify the issue as soon as possible.

If reproduction is not possible, I’d love to do pair debugging to find the problem together.

Hello,
This is a slice of code we've created with Nir that reproduces the state.
Please note that each task executes some command line, and such a command line might take a couple of hours to do its work. For privacy reasons, we can't provide the real data and our model executable, but if you put in any long-running command line on the machine, we hope it should be enough.

import ray
from ray.util import ActorPool
import socket
import subprocess 
import random

@ray.remote
class RaySearchManager:
    """
    tracks the sub processes (fw, vw, etc.) opened by the search tasks, running on all the cluster nodes.
    used in order to clean the cluster from processes in case of early termination.
    """
    def __init__(self):
        self.processes = {}

    def notify_process_start(self, host: str, pid: int):
        self.processes[host] = [pid] if not self.processes.get(host) else self.processes[host] + [pid]

    def notify_process_end(self, host: str, pid: int):
        try:
            self.processes.get(host, []).remove(pid)
        except ValueError:
            # todo: log it somehow
            pass


@ray.remote(num_cpus=1)
class TrainAndEvaluateModelActor:

    def __init__(self, manager, actor_id):
        self.hostname = socket.gethostname()
        self.actor_id = actor_id
        self.manager = manager

    def run(self, command):
        process = subprocess.Popen(command, shell=True, encoding="utf-8", stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                   executable='/bin/bash')

        self.manager.notify_process_start.remote(self.hostname, process.pid)
        # wait for the process to end.
        # capture stdout and return it
        all_output = []
        while True:
            output = process.stdout.readline()
            if output == '' and process.poll() is not None:
                break
            else:
                all_output.append(output)

        self.manager.notify_process_end.remote(self.hostname, process.pid)
        return self.actor_id, all_output


if __name__ == "__main__":
    ray_head_node = ""  # todo: insert actual node name
    num_actors = 100    # todo: set the default according to the cluster capacity. each actor requires 1 cpu
    # todo: replace with a bash cmd that imitates the training process.
    # our long running command runs between 1 and 10 hours (depending on the data size), utilizing 1 CPU.
    long_running_cmd = ""
    ray.init(address=f'ray://{ray_head_node}:10001')

    manager_actor = RaySearchManager.options(num_cpus=0, name="manager", lifetime="detached").remote()
    actors = [TrainAndEvaluateModelActor.remote(manager_actor, i) for i in range(0, num_actors)]
    actors_pool = ActorPool(actors)

    while True:
        # map() returns a generator, so consume it to actually submit the tasks
        # and make the driver wait for the whole batch
        results = list(actors_pool.map(lambda a, _: a.run.remote(command=long_running_cmd),
                                       list(range(0, random.randint(20, 300)))))
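One detail of the repro worth flagging: run() pipes both stdout and stderr but only ever reads stdout, so a command that writes more than a pipe buffer's worth to stderr will block on that write, and from the driver that can look exactly like an alive-but-idle actor. A sketch of a variant that side-steps this by merging stderr into stdout (whether that is acceptable for the real binary is an assumption):

    def run(self, command):
        # merge stderr into stdout so neither pipe can fill up and block the
        # child process while its output is read line by line
        process = subprocess.Popen(command, shell=True, encoding="utf-8",
                                   stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                   executable='/bin/bash')
        self.manager.notify_process_start.remote(self.hostname, process.pid)
        all_output = list(process.stdout)
        process.wait()
        self.manager.notify_process_end.remote(self.hostname, process.pid)
        return self.actor_id, all_output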

One possibility is that your actor task is stuck for some reason. Since the actor can run only 1 task at a time, if this happens you can have an issue.

Is there any way to debug whether the run function is stuck somehow?
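One low-tech way to check from the driver, sketched here with a hypothetical ping() method added to TrainAndEvaluateModelActor (it just returns immediately): since an actor executes one task at a time, a ping that times out means the actor is still inside a run() call (busy or wedged), while an instant reply means the actor is free and the task was probably never dispatched. Running `ray stack` on the worker node to dump the workers' Python stacks may also help.

import ray

# hypothetical addition to TrainAndEvaluateModelActor:
#     def ping(self):
#         return self.actor_id

def probe_actors(actors, timeout_s=10):
    for actor in actors:
        try:
            actor_id = ray.get(actor.ping.remote(), timeout=timeout_s)
            print(f"actor {actor_id} is responsive")
        except ray.exceptions.GetTimeoutError:
            # the actor is still executing (or stuck in) a run() call
            print(f"{actor} did not answer within {timeout_s}s")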

Hello
It doesn't seem stuck.
I'll try to add logs to the task, for start and end.
Although, if I change the code to a simpler version, without actors:

@ray.remote
def run_commands(cmd_data):
    # do work here

# ====== in main flow
ray_res_futures = [run_commands.remote(cmd_data) for cmd_data in mutations_for_search]
try:
    ray_res = ray.get(ray_res_futures)
    [self.update_res(r) for r in ray_res]
    logger.info(f'Finished Evaluation for {len(mutations_for_search)} mutations')
except Exception as e:
    print(f"Error in ray.get exception: {e}")
    raise e

It works and doesn't get stuck.