Tasks are completed, but ray.get() raises ray.exceptions.WorkerCrashedError

Hi there,

I built a Ray cluster on K8s using Helm and then ran hundreds of tasks on it.

example-values.yaml

image: rayproject/ray:1.12.0-py38
headPodType: rayHeadType
podTypes:
  rayHeadType:
    CPU: 2
    memory: 4Gi
    rayResources: {}
    nodeSelector: {}

  rayWorkerType:
    minWorkers: 1
    maxWorkers: 4
    memory: 16Gi
    CPU: 4
    GPU: 0
    rayResources: {}
    nodeSelector: {}

Each task reads a file from a Google Cloud Storage bucket, does some ETL, and writes the result back to another Google Cloud Storage bucket. The remote function simply returns a string indicating which file has been processed. The example code is shown below:

import logging

import pandas as pd  # reading/writing gs:// paths with pandas requires gcsfs on the workers
import ray


@ray.remote
def etl_function(file_path: str, destination_path: str):
    df_raw = pd.read_csv(
        file_path,
        lineterminator="\n",
        names=["data"],
    )
    ......some ETL......
    df.to_parquet(path=f"{destination_path}.parquet", index=False)
    return f"{file_path} has been processed."


if __name__ == "__main__":
    ray.init(address="ray://xxxxxx:10001")

    files = [
        ......,
        ......,
        ......,
    ]

    obj_ids = []
    for file in files:
        file_path = f"gs://{source_bucket_name}/{file}"
        destination_path = f"gs://{dest_bucket_name}/{dest_prefix}/{file.stem}"
        obj_ids.append(etl_function.remote(file_path, destination_path))

    logging.info(f"Number of tasks: {len(obj_ids)}")
    logging.info(ray.get(obj_ids))

After 100 tasks were done, I could see 100 new files uploaded to the GCS bucket. However, when I tried to get the results using ray.get(), it raised ray.exceptions.WorkerCrashedError.

Traceback (most recent call last):
  File "some_etl_process.py", line 146, in <module>
    logging.info(ray.get(obj_ids))
  File "/opt/conda/envs/ray/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/opt/conda/envs/ray/lib/python3.8/site-packages/ray/util/client/api.py", line 43, in get
    return self.worker.get(vals, timeout=timeout)
  File "/opt/conda/envs/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 433, in get
    res = self._get(to_get, op_timeout)
  File "/opt/conda/envs/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 461, in _get
    raise err
ray.exceptions.WorkerCrashedError: The worker died unexpectedly while executing this task. Check python-core-worker-*.log files for more information.

In addition, I noticed that if a task was assigned to the Head node, the worker running that task would often die or be killed. Maybe this is because the CPU/memory I set for the Head node is too small; I am not sure.

10.160.3.56 is the IP of the Head node.

...
A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: c8c474d3228ce27a1eb1c3821e228fa62647546410000000 Worker ID: ea959ae4525f127711a25e26cb1db3a516028827c2aeb452cdc55128 Node ID: 978149570dc118a35cc3eeaf143c9461dc43322e946b7acc90109760 Worker IP address: 10.160.3.56 Worker port: 10140 Worker PID: 33061
A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: c3c38091160dbe22f45d32f6821e5c862b558c3910000000 Worker ID: 2f35daa948ac49dd9c54fb0dee74cdc8d5e4c836c945fea406970d1f Node ID: 978149570dc118a35cc3eeaf143c9461dc43322e946b7acc90109760 Worker IP address: 10.160.3.56 Worker port: 10141 Worker PID: 33209
...

The log is pretty long…

logs/python-core-worker-2f35daa948ac49dd9c54fb0dee74cdc8d5e4c836c945fea406970d1f_33209.log

[2022-04-21 23:14:27,197 I 33209 33209] core_worker_process.cc:120: Constructing CoreWorkerProcess. pid: 33209
[2022-04-21 23:14:27,203 I 33209 33209] grpc_server.cc:105: worker server started, listening on port 10141.
[2022-04-21 23:14:27,208 I 33209 33209] core_worker.cc:175: Initializing worker at address: 10.160.3.56:10141, worker ID 2f35daa948ac49dd9c54fb0dee74cdc8d5e4c836c945fea406970d1f, raylet 978149570dc118a35cc3eeaf143c9461dc43322e946b7acc90109760
[2022-04-21 23:14:27,208 I 33209 33232] gcs_server_address_updater.cc:32: GCS Server updater thread id: 139732784764672
[2022-04-21 23:14:27,311 I 33209 33209] io_service_pool.cc:35: IOServicePool is running with 1 io_service.
[2022-04-21 23:14:27,312 I 33209 33236] core_worker.cc:494: Event stats:

Global stats: 17 total (10 active)
Queueing time: mean = 315.819 us, max = 1.320 ms, min = 249.762 us, total = 5.369 ms
Execution time:  mean = 32.074 us, total = 545.265 us
Event stats:
	PeriodicalRunner.RunFnPeriodically - 7 total (2 active, 1 running), CPU time: mean = 49.815 us, total = 348.707 us
	UNKNOWN - 3 total (3 active), CPU time: mean = 0.000 s, total = 0.000 s
	WorkerInfoGcsService.grpc_client.AddWorkerInfo - 1 total (0 active), CPU time: mean = 13.717 us, total = 13.717 us
	GcsClient.deadline_timer.check_gcs_service_address - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
	InternalPubSubGcsService.grpc_client.GcsSubscriberCommandBatch - 1 total (0 active), CPU time: mean = 182.841 us, total = 182.841 us
	NodeManagerService.grpc_client.ReportWorkerBacklog - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
	InternalPubSubGcsService.grpc_client.GcsSubscriberPoll - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
	NodeInfoGcsService.grpc_client.GetAllNodeInfo - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
	CoreWorker.deadline_timer.flush_profiling_events - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s

I simply want to check the results using ray.get(), expecting a list of strings. Can anyone share some hints?
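In the meantime, one workaround I am considering is to fetch results incrementally with ray.wait instead of a single ray.get over the whole list, so that one crashed worker does not block retrieving the other results. A rough sketch (obj_ids is the same list as in the script above; the per-object try/except is only illustrative):

remaining = list(obj_ids)
results, failures = [], []
while remaining:
    # Wait for any one task to finish, then fetch it individually so a single
    # WorkerCrashedError does not abort retrieval of everything else.
    ready, remaining = ray.wait(remaining, num_returns=1)
    try:
        results.append(ray.get(ready[0]))
    except ray.exceptions.RayError as err:
        failures.append((ready[0], err))

logging.info(f"{len(results)} succeeded, {len(failures)} failed")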

Update: some further trials.

After I scaled up the Head node type and added rayResources: { "CPU": 0 }, all tasks completed successfully and I could get the results with ray.get(obj_ids).
Even though I still saw A worker died or was killed while executing... twice while 733 tasks were running, I was still able to get the results with ray.get(obj_ids).

example-values-after-change.yaml

podTypes:
  rayHeadType:
    CPU: 4
    memory: 16Gi
    rayResources: { "CPU": 0 }
    nodeSelector: {}
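To double-check that the change took effect, something like the sketch below should show CPU 0 for the Head node's IP (10.160.3.56 in my case). It can be run from inside the cluster (e.g. after kubectl exec into the head pod), since I am not sure every state API is available over the Ray client:

import ray

ray.init(address="auto")  # attach to the running cluster from inside a pod

# Each entry describes one Ray node; after the change, the Head node should
# advertise CPU: 0, so no tasks get scheduled onto it.
for node in ray.nodes():
    if node.get("Alive"):
        print(node["NodeManagerAddress"], node["Resources"].get("CPU", 0))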

I wonder why I couldn't get results with ray.get(obj_ids) when a task was assigned to a worker on the Head node and that worker died or was killed.

Also, why could I get results with ray.get(obj_ids) when a worker died or was killed on a Worker node?

Is there a best practice for configuring the Ray Head node?

Although I can get my results now, I would still appreciate any suggestions for this scenario.

Referring here to @Dmitri

For heavy workloads, a large head node annotated with rayResources: { "CPU": 0 } is a good idea.

If you’re seeing workers crashing, it might be a good idea to reduce the number of tasks that run concurrently on the workers by increasing the resource annotations on the task.
For example, if a task is annotated @ray.remote(num_cpus=2), there will be at most 3 concurrent instances of that task on a CPU:6 worker node.
@ray.remote is equivalent to @ray.remote(num_cpus=1), which would result in up to 6 concurrent instances of the task.
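For concreteness, here is a minimal sketch of that kind of annotation (heavy_task and the sleep are just stand-ins for the real ETL work):

import time

import ray


# Sketch only: with num_cpus=2, a node that advertises CPU: 6 will run at most
# 6 / 2 = 3 copies of this task concurrently, reducing the peak memory used by
# simultaneous tasks compared to the default num_cpus=1.
@ray.remote(num_cpus=2)
def heavy_task(i: int) -> int:
    time.sleep(1)  # stand-in for the real work
    return i


if __name__ == "__main__":
    ray.init()  # or ray.init(address="ray://<head>:10001") against the cluster
    print(ray.get([heavy_task.remote(i) for i in range(8)]))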

In general, I recommend using a few large Ray pods vs many small ones – if possible, size the Ray pods to take up entire Kubernetes nodes.

Thanks for your suggestion; it's helpful.
