How severe does this issue affect your experience of using Ray?
- Medium: It contributes to significant difficulty to complete my task, but I can work around it.
I have a Ray cluster set up on an SGE cluster that I basically manage manually: each Ray worker runs on a separate node in our SGE cluster, with a cap on the memory the worker can use (which is never the same as the memory, or the cores for that matter, that are physically on the node).
I have a fairly complicated set of Ray tasks that take a lot of memory and occasionally seem to cause the SGE job running the Ray worker to be killed.
Ray does seem to notice this and reports it in the logs like this:
The node with node id: c8ecf1ba58c847c9d4a24de1d6001c28565e68d40c39559e9d4287bd and ip: 10.112.80.130 has been marked dead because the detector has missed too many heartbeats from it.
This can happen when a raylet crashes unexpectedly or has lagging heartbeats.
But this does seem to result in the Ray task being marked as failed, and the task that waits for it to finish never completes; it seems to be stuck and never times out…
This is a toy example of the workflow I am running, and each of the tasks in this list runs on a DIFFERENT node (I use placement groups, see the sketch right below). I used the fault-tolerance example from the docs for testing, but a worker dying is NOT the same behavior (I think) as the raylet or the complete node dying. Still, it shows the workflow that I want to run…
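For context, in the real workflow I spread the tasks over the nodes with a placement group, roughly like this (a minimal sketch, assuming the first task from the toy example below, one 1-CPU bundle per node, and that connecting with address="auto" works in my setup; the real bundle sizes are different):

import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init(address="auto")  # connect to the already-running cluster

# One 1-CPU bundle per node; STRICT_SPREAD forces every bundle onto a different node.
pg = placement_group([{"CPU": 1}] * 3, strategy="STRICT_SPREAD")
ray.get(pg.ready())

# Schedule a task into the placement group so it lands on its own node.
ref = first.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote('a', [1, 10])

The toy example itself: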
import os
import time

import numpy as np  # needed for np.random.random()
import ray

failure_probability = 0.3
max_retries = 5

@ray.remote(max_retries=max_retries)
def third(file_list, c):
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        os._exit(0)  # simulate the worker process dying
    return file_list[0] + '__' + file_list[1] + '__' + str(c)

@ray.remote(max_retries=max_retries)
def second(p, s):
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        os._exit(0)
    return p + str(s)

@ray.remote(max_retries=max_retries)
def first(p, snps):
    time.sleep(0.2)
    if np.random.random() < failure_probability:
        os._exit(0)
    # First loop: one "second" task per snp
    res = []
    for s in snps:
        res.append(second.remote(p, s))
    file_list = ray.get(res)
    # Second loop: one "third" task per chunk
    res = []
    for c in range(10):
        res.append(third.remote(file_list, c))
    return ray.get(res)

def main(phenos, snps):
    res = []
    for p in phenos:
        res.append(first.remote(p, snps))
    return ray.get(res)

print(main(phenos=['a', 'b', 'c'], snps=[1, 10]))
Results in
A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: e494c7d3c0068ea4c7c1776720910b831040fbd911000000 Worker ID: 2bf54ae7c100622293c2c80281b050a1827074cd40a04d9c572e63ac Node ID: c33f2a707fa8394987e26b47e99efc3cb475d0ced41635f707fe6565 Worker IP address: 10.112.80.183 Worker port: 10079 Worker PID: 185886
A worker [ETC..]
.
.
A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: e6b9016859782817b3463310f9551941dec7b83f11000000 Worker ID: dfbe122b005e2e3ce632ef2a5bf666e400a319950a9afc04365f7474 Node ID: c33f2a707fa8394987e26b47e99efc3cb475d0ced41635f707fe6565 Worker IP address: 10.112.80.183 Worker port: 10086 Worker PID: 223938
[['a1__a10__0', 'a1__a10__1', 'a1__a10__2', 'a1__a10__3', 'a1__a10__4', 'a1__a10__5', 'a1__a10__6', 'a1__a10__7', 'a1__a10__8', 'a1__a10__9'], ['b1__b10__0', 'b1__b10__1', 'b1__b10__2', 'b1__b10__3', 'b1__b10__4', 'b1__b10__5', 'b1__b10__6', 'b1__b10__7', 'b1__b10__8', 'b1__b10__9'], ['c1__c10__0', 'c1__c10__1', 'c1__c10__2', 'c1__c10__3', 'c1__c10__4', 'c1__c10__5', 'c1__c10__6', 'c1__c10__7', 'c1__c10__8', 'c1__c10__9']]
This runs fine in most cases and is able to restart the failed tasks, but I don't know how I can simulate a node dying, other than just killing the SGE job…
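The only other idea I have come across is Ray's own test helper ray.cluster_utils.Cluster, which (as far as I understand, and this is an assumption on my part) starts extra raylets on a single machine and lets you remove them, so it is not exactly the same as an SGE node going away:

import ray
from ray.cluster_utils import Cluster

# A small local cluster: a head node plus two extra raylets on the same machine.
cluster = Cluster(initialize_head=True, head_node_args={"num_cpus": 1})
worker_node = cluster.add_node(num_cpus=1)
cluster.add_node(num_cpus=1)

ray.init(address=cluster.address)

# ... submit the workflow here ...

# Remove one raylet mid-run to emulate a node dying.
cluster.remove_node(worker_node)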
Is there any timeout that can be set for the ray.get step, so that it restarts or gives up on getting the results of a task?
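To make the question concrete: I know ray.get takes a timeout argument, so something along these lines is what I have in mind (just a sketch; whether catching GetTimeoutError and then resubmitting or cancelling is the right reaction to a dead node is exactly what I am unsure about):

import ray
from ray.exceptions import GetTimeoutError

refs = [first.remote(p, [1, 10]) for p in ['a', 'b', 'c']]
results = []
for ref in refs:
    try:
        # Stop waiting for a single task after 10 minutes instead of hanging forever.
        results.append(ray.get(ref, timeout=600))
    except GetTimeoutError:
        # Retry, ray.cancel(), or give up? This is the part I am unsure about.
        results.append(None)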