How does Ray handle workers being killed and revived?

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hi guys,

During execution of my application, I get multiple outputs like those shown below, but the application eventually finishes successfully.

2023-06-07 15:26:09,573 WARNING worker.py:1986 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 22179de6dc2c455333ceb9134808153a978130b701000000 Worker ID: 8f8a28f4577d835f654ca91d34a8be405673e391356328d674facf04 Node ID: 9bf0736de04e92d91400c935f3c91dcf59b2c7c79e380f52b23debeb Worker IP address: 127.0.0.1 Worker port: 34585 Worker PID: 3379557 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
2023-06-07 15:26:26,015 WARNING worker.py:1986 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 60e10791a48f1099ed6eac1cf709f4dbe7276c9001000000 Worker ID: 0d1550078f210ce4879f29f04a6d76bf732a493f11a01a684fa3ddeb Node ID: 9bf0736de04e92d91400c935f3c91dcf59b2c7c79e380f52b23debeb Worker IP address: 127.0.0.1 Worker port: 38979 Worker PID: 3379502 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned.
2023-06-07 15:26:26,016 WARNING worker.py:1986 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 76c957a24fd2eebd6fe21843acce094cc441b94101000000 Worker ID: 9f79d0418c44641c973da97c12b4c68097b66ec276c91f7a0d476885 Node ID: 9bf0736de04e92d91400c935f3c91dcf59b2c7c79e380f52b23debeb Worker IP address: 127.0.0.1 Worker port: 35705 Worker PID: 3605410 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned.
(raylet) [2023-06-07 15:26:26,125 E 3379363 3379363] (raylet) node_manager.cc:3071: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 9bf0736de04e92d91400c935f3c91dcf59b2c7c79e380f52b23debeb, IP: 127.0.0.1) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 127.0.0.1`

My questions are: How does Ray handle the case when a worker dies, for instance, because of OOM? What if that worker held some data? How is that data restored? Are there any docs describing this?

To avoid the outputs above, I should probably rewrite my application, but before doing that I would like to understand how Ray deals with workers being killed and revived.

Thanks in advance.

When a task fails, it is retried up to the configurable maximum number of retries [Task Fault Tolerance — Ray 2.4.0].
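
As a minimal sketch (the function name and values are illustrative), the retry limit is set per task via max_retries, and retry_exceptions controls whether application-level exceptions are retried as well:

```python
import ray

ray.init()

# max_retries raises the limit for retries after system failures (e.g., the
# worker process being killed); retry_exceptions=True additionally retries
# when the application code itself raises an exception.
@ray.remote(max_retries=5, retry_exceptions=True)
def unreliable_step(x):
    return x * 2

print(ray.get(unreliable_step.remote(21)))  # 42
```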

Moreover, regarding state,
" When a task returns a result in the Ray object store, it is possible for the resulting object to be lost after the original task has already finished. In these cases, Ray will also try to automatically recover the object by re-executing the tasks that created the object. This can be configured through the same max_retries option described here. See object fault tolerance for more information."

All the relevant state in the task should be kept as objects in the distributed object store [Objects — Ray 2.4.0]. If you need some additional global state, you should probably be using actors [Actors — Ray 2.4.0].
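
For the actor route, a minimal sketch (the Counter actor is illustrative): max_restarts lets Ray restart a crashed actor process and max_task_retries re-submits interrupted method calls, but the in-memory state is reset on restart unless you checkpoint it yourself.

```python
import ray

ray.init()

# max_restarts lets Ray restart the actor process if it crashes;
# max_task_retries re-submits method calls interrupted by the crash.
@ray.remote(max_restarts=2, max_task_retries=1)
class Counter:
    def __init__(self):
        # In-memory state: reset on restart unless you checkpoint it yourself
        # (e.g., to disk or external storage).
        self.value = 0

    def incr(self):
        self.value += 1
        return self.value

counter = Counter.remote()
print(ray.get(counter.incr.remote()))  # 1
```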

@vitsai, thanks for your response. I understand that when a task fails, it is retried up to the configurable maximum number of retries. My question is more about this clause.

"When a worker is executing a task, if the worker dies unexpectedly, either because the process crashed or because the machine failed, Ray will rerun the task until either the task succeeds or the maximum number of retries is exceeded."

That is, if a worker dies unexpectedly (the process completely exits, if I understand correctly), will Ray create a new worker and submit the failed task to that worker?

I see. Workers themselves are stateless; state is stored in the Global Control Service (GCS).

When a worker dies, it is not recreated immediately. The tasks owned by the worker are canceled, but may be retried. When they are retried, a new worker may or may not be requested depending on several factors, such as whether there are currently idle workers for that particular task queue (task queues are determined by a variety of factors listed in SchedulingKey in direct_task_transport.h). If there are idle workers, or if the number of workers has reached some maximum, a new worker is not requested.
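
From the application side this looks roughly like the sketch below; the marker file is just a trick to crash the first attempt's worker, and the retry lands on whichever worker Ray leases next, possibly a newly started one.

```python
import os
import tempfile

import ray

ray.init()

# Hypothetical marker file, used only to make the first attempt behave
# differently from the retry.
MARKER = os.path.join(tempfile.gettempdir(), "dies_once_marker")

@ray.remote(max_retries=1)
def dies_once():
    if not os.path.exists(MARKER):
        open(MARKER, "w").close()
        os._exit(1)  # simulate an unexpected worker death (SYSTEM_ERROR)
    # The retry runs on whichever worker Ray leased for it, possibly a
    # freshly started process.
    return os.getpid()

print(ray.get(dies_once.remote()))
os.remove(MARKER)
```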

If you prefer to look directly at the source, it is in CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded in direct_task_transport.cc. The reply callback in the RequestWorkerLease call inside that function details the exact logic when a worker lease ends. (This callback is ultimately triggered by the response sent from CancelAllTasksOwnedBy in cluster_task_manager after being triggered by HandleUnexpectedWorkerFailure in node_manager.cc, which is itself called based on a subscription to GCS.)

You may also be interested in our architecture whitepaper, which has some details about fault tolerance for both nodes and workers: https://docs.google.com/document/d/1tBw9A4j62ruI5omIJbMxly-la5w4q_TjyJgJL_jN2fI/preview#

In particular,

Ray worker nodes are designed to be homogeneous, so that any single node may be lost without bringing down the entire cluster. The current exception to this is the head node, since it hosts the GCS. In 2.0, we have added experimental support for GCS fault tolerance, which allows the GCS to be restarted while minimizing disturbance to the rest of the cluster.

All nodes are assigned a unique identifier and communicate with each other through heartbeats. The GCS is responsible for deciding the membership of a cluster, i.e. which nodes are currently alive. The GCS tombstones any node ID that times out, meaning that a new raylet must be started on that node with a different node ID in order to reuse the physical resources. A raylet that is still alive exits if it hears that it has been timed out. Failure detection of a node currently does not handle network partitions: if a worker node is partitioned from the GCS, it will be timed out and marked as dead.

Each raylet reports the death of any local worker process to the GCS. The GCS broadcasts these failure events and uses them to handle actor death. All worker processes fate-share with the raylet on their node.

The raylets are responsible for preventing leaks in cluster resources and system state after individual worker process failures. For a worker process (local or remote) that has failed, each raylet is responsible for:

  • Freeing cluster resources, such as CPUs, needed for task execution. This is done by killing any workers that were leased to the failed worker (see Resource Fulfillment). Any outstanding resource requests made by the failed worker are also canceled.
  • Freeing any distributed object store memory used for objects owned by that worker (see Memory Management). This also cleans up the associated entries in the object directory.
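
A hedged sketch of the second bullet from the Python side (the Owner actor and its methods are illustrative, and this assumes the documented behavior that an object whose owner process has died is failed rather than served from any surviving copy):

```python
import os
import time

import ray
from ray.exceptions import RayError

ray.init()

@ray.remote
class Owner:
    def create(self):
        # This actor process becomes the owner of the object behind the ref.
        return ray.put("some data")

    def die(self):
        os._exit(1)  # simulate an unexpected worker process death

owner = Owner.remote()
ref = ray.get(owner.create.remote())  # the driver borrows the actor-owned ref

owner.die.remote()
time.sleep(2)  # give the raylet/GCS time to observe the death and clean up

try:
    ray.get(ref)
except RayError as err:  # typically OwnerDiedError
    print("object was cleaned up along with its owner:", type(err).__name__)
```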

cc: @jjyao and @Chen_Shen for anything I missed


@vitsai, thank you for the detailed response. That makes sense to me.

@YarShev Now that you have all the information about how FT is handled, can we close this issue?

@Jules_Damji, sure, you can close it.