Ray job is stuck when the node a worker runs on is killed

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

I have a Ray cluster set up on an SGE cluster that I basically manage manually: each Ray worker runs on a separate node of our SGE cluster, with a cap on the memory the worker can use (which is never the same as the memory, or the cores for that matter, that are physically on the node).

I have a fairly complicated set of Ray tasks which take a lot of memory, and occasionally this seems to result in the SGE job that runs a Ray worker being killed.
Ray does seem to notice that, and informs me in the logs like this:

The node with node id: c8ecf1ba58c847c9d4a24de1d6001c28565e68d40c39559e9d4287bd and ip: 10.112.80.130 has been marked dead because the detector has missed too many heartbeats from it.
 This can happen when a raylet crashes unexpectedly or has lagging heartbeats.

But while the Ray task does seem to get marked as failed, the task that waits for it never finishes: it appears to be stuck and never times out…

Below is a toy example of the workflow I am running; each of the tasks in this list runs on a DIFFERENT node (I use placement groups)… I used the fault-tolerance example from the docs to test this, but a worker dying is NOT (I think) the same behavior as the raylet or the complete node dying. Still, it shows the workflow that I want to run…

import os
import time

import numpy as np
import ray

# In the real setup this connects to the existing cluster (e.g. ray.init(address="auto")).
ray.init()
failure_probability = 0.3
max_retries = 5
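# Each task sleeps briefly and then, with probability failure_probability,
# kills its own worker process with os._exit() to simulate an unexpected
# worker death.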
@ray.remote(max_retries=max_retries)
def third(file_list, c):
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        os._exit(0)
    return file_list[0] + '__' + file_list[1] + '__' + str(c)

@ray.remote(max_retries=max_retries)
def second(p, s):
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        os._exit(0)
    return p + str(s)

@ray.remote(max_retries=max_retries)
def first(p, snps):
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        os._exit(0)
    res = []
    for s in snps:
        res.append(second.remote(p,s))
    file_list = ray.get(res)
    
    # Second loop: fan out third() over the collected file list
    res = []
    for c in range(10):
        res.append(third.remote(file_list,c))
    return ray.get(res)

def main(phenos, snps):
    res = []
    for p in phenos:
        res.append(first.remote(p, snps))
    return ray.get(res)

print(main(phenos=['a','b','c'], snps=[1,10]))

Results in

A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: e494c7d3c0068ea4c7c1776720910b831040fbd911000000 Worker ID: 2bf54ae7c100622293c2c80281b050a1827074cd40a04d9c572e63ac Node ID: c33f2a707fa8394987e26b47e99efc3cb475d0ced41635f707fe6565 Worker IP address: 10.112.80.183 Worker port: 10079 Worker PID: 185886
A worker [ETC..]
.
.
A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: e6b9016859782817b3463310f9551941dec7b83f11000000 Worker ID: dfbe122b005e2e3ce632ef2a5bf666e400a319950a9afc04365f7474 Node ID: c33f2a707fa8394987e26b47e99efc3cb475d0ced41635f707fe6565 Worker IP address: 10.112.80.183 Worker port: 10086 Worker PID: 223938
[['a1__a10__0', 'a1__a10__1', 'a1__a10__2', 'a1__a10__3', 'a1__a10__4', 'a1__a10__5', 'a1__a10__6', 'a1__a10__7', 'a1__a10__8', 'a1__a10__9'], ['b1__b10__0', 'b1__b10__1', 'b1__b10__2', 'b1__b10__3', 'b1__b10__4', 'b1__b10__5', 'b1__b10__6', 'b1__b10__7', 'b1__b10__8', 'b1__b10__9'], ['c1__c10__0', 'c1__c10__1', 'c1__c10__2', 'c1__c10__3', 'c1__c10__4', 'c1__c10__5', 'c1__c10__6', 'c1__c10__7', 'c1__c10__8', 'c1__c10__9']]

This runs fine in most cases, and the failed tasks get restarted, but I don’t know how I can simulate a node dying other than just killing the SGE job…

Is there any timeout that can be set for the ray.get step, so that it restarts or gives up on getting the results of a stuck task?

You can try using the timeout parameter in ray.get, which will throw an exception if the results aren’t ready by the time the timeout passes. Here’s the documentation for that!
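For example, here is a minimal sketch of how the gather step in main() could use it (the timeout value and the choice to simply skip a stuck task are just placeholders):

import ray
from ray.exceptions import GetTimeoutError

def gather_with_timeout(refs, timeout_s=600):
    results = []
    for ref in refs:
        try:
            results.append(ray.get(ref, timeout=timeout_s))
        except GetTimeoutError:
            # The result was not ready in time; record the gap and move on
            # (or cancel / resubmit the task, depending on the workflow).
            results.append(None)
    return results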

However, it does also seem like there may be an issue with your cluster setup or possibly a bug in Ray core. Ray should always guarantee that you receive an error for tasks once the max_retries has been used up. Also, node failures are treated the same way as worker process failures.
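So the driver should see something like this instead of hanging (a hedged sketch; the concrete exception type depends on the failure mode and Ray version):

import ray
from ray.exceptions import RayError  # base class for Ray's task/worker errors

try:
    results = main(phenos=['a', 'b', 'c'], snps=[1, 10])
except RayError as e:
    # Raised once max_retries is exhausted, e.g. a WorkerCrashedError when the
    # worker process died, or an object-loss error when the whole node went away.
    print(f"Pipeline failed after retries were exhausted: {e}")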

So we can also try to figure out why your tasks are hanging, as this is unexpected. It may have something to do with placement groups or resource utilization - perhaps after the node failures, there aren’t enough resources left to place the tasks? Do you see any other errors in your driver output? It would also help if you could provide the logs in /tmp/ray/session_latest and describe more about your use case, particularly around placement groups.
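If it is resource-related, something along these lines run from the driver (or `ray status` on the head node) should show whether the remaining nodes can still satisfy the placement group bundles (a sketch; the exact output format varies by version):

import ray
from ray.util import placement_group_table

# Compare what the cluster thinks it has in total with what is currently free,
# and inspect the state of the placement groups after a node failure.
print("total resources:    ", ray.cluster_resources())
print("available resources:", ray.available_resources())
print("placement groups:   ", placement_group_table())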

I can confirm that when I kill a node, the job is indeed restarted, so whatever is happening with my stuck jobs is still unclear, but it is not due to an issue in Ray’s node-failure logic, which is good. I did a little test with my code where I killed the qsub job and the job still finished correctly.

One more observation… in the (classic) task monitor I can see that the number of hosts sometimes decreases, but the ray.nodes() call on the client still reports the original number of nodes, so there is still something not quite right…

For my actual work I can usually just restart my job, and since it continues where it left off I don’t need to rerun everything. Understanding why jobs get stuck is still good to know, although I will use the timeout, so hopefully that will at least solve the immediate issue…

Hmm it’s possible the ray.nodes() discrepancy is because the cluster state takes some time to converge.
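One thing worth checking: ray.nodes() returns every node the GCS has seen, including dead ones, each with an "Alive" flag, so counting only the alive entries may explain why the total never drops. A quick sketch:

import ray

nodes = ray.nodes()
# Keep only the nodes the GCS still considers alive; dead nodes stay in the
# list with Alive set to False.
alive = [n["NodeManagerAddress"] for n in nodes if n["Alive"]]
print(f"{len(alive)} alive of {len(nodes)} recorded nodes: {alive}")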

If you can reproduce the issue right now, you could try checking where your application is getting stuck. This docs page on debugging might be useful to look at. Here are some relevant tools you can try:

  • ray memory CLI will tell you which ObjectRefs are currently in scope and which are still pending execution.
  • Passing the OS environment variable RAY_record_ref_creation_sites=1 to Ray will provide more information in the above output about which tasks created which ObjectRefs (see the sketch after this list).
  • ray stack CLI will tell you where in Python the current processes are, including the application driver and any task workers.
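For the second item, a hedged sketch of setting the variable from the driver side; it likely also needs to be in the environment of the ray start commands that launch the workers on each node:

import os
import ray

# The variable must be set before the Ray processes it affects are started;
# here it is set for the driver before connecting to the running cluster.
os.environ["RAY_record_ref_creation_sites"] = "1"
ray.init(address="auto")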