Hi there,
I built a Ray cluster on K8s using Helm and then ran hundreds of tasks on it.
example-values.yaml
image: rayproject/ray:1.12.0-py38
headPodType: rayHeadType
podTypes:
  rayHeadType:
    CPU: 2
    memory: 4Gi
    rayResources: {}
    nodeSelector: {}
  rayWorkerType:
    minWorkers: 1
    maxWorkers: 4
    memory: 16Gi
    CPU: 4
    GPU: 0
    rayResources: {}
    nodeSelector: {}
Each task reads a file from a Google Cloud Storage bucket, does some ETL, and writes the result back to a Google Cloud Storage bucket. The remote function simply returns a string indicating which file was processed. The example code is shown below:
import logging

import pandas as pd
import ray


@ray.remote
def etl_function(file_path: str, destination_path: str):
    df_raw = pd.read_csv(
        file_path,
        lineterminator="\n",
        names=["data"],
    )
    # ......some ETL that produces df......
    df.to_parquet(path=f"{destination_path}.parquet", index=False)
    return f"{file_path} has been processed."


if __name__ == "__main__":
    ray.init(address="ray://xxxxxx:10001")
    files = [
        ......,
        ......,
        ......,
    ]
    obj_ids = []
    for file in files:
        file_path = f"gs://{source_bucket_name}/{file}"
        destination_path = f"gs://{dest_bucket_name}/{dest_prefix}/{file.stem}"
        obj_ids.append(etl_function.remote(file_path, destination_path))
    logging.info(f"Number of tasks: {len(obj_ids)}")
    logging.info(ray.get(obj_ids))
After 100 tasks are done, I can see 100 new files uploaded to the GCS bucket. However, when I try to get the results using ray.get(), it raises ray.exceptions.WorkerCrashedError.
Traceback (most recent call last):
File "some_etl_process.py", line 146, in <module>
logging.info(ray.get(obj_ids))
File "/opt/conda/envs/ray/lib/python3.8/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
return getattr(ray, func.__name__)(*args, **kwargs)
File "/opt/conda/envs/ray/lib/python3.8/site-packages/ray/util/client/api.py", line 43, in get
return self.worker.get(vals, timeout=timeout)
File "/opt/conda/envs/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 433, in get
res = self._get(to_get, op_timeout)
File "/opt/conda/envs/ray/lib/python3.8/site-packages/ray/util/client/worker.py", line 461, in _get
raise err
ray.exceptions.WorkerCrashedError: The worker died unexpectedly while executing this task. Check python-core-worker-*.log files for more information.
In addition, I noticed that when a task was assigned to the head node, the worker running that task would easily die or be killed. Maybe this is because the CPU/memory I set for the head node is too small; I am not sure. 10.160.3.56 is the IP of the head node.
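One thing I am considering trying, based on my reading of the Helm chart values (so please treat it as an untested guess rather than a confirmed fix), is overriding the head pod's advertised Ray resources to zero CPUs so that tasks are only scheduled on worker pods:

# example-values.yaml (hypothetical, untested change): advertise 0 CPUs for Ray
# scheduling on the head pod so tasks land only on the worker pods
rayHeadType:
  CPU: 2
  memory: 4Gi
  rayResources: {"CPU": 0}
  nodeSelector: {}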
...
A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: c8c474d3228ce27a1eb1c3821e228fa62647546410000000 Worker ID: ea959ae4525f127711a25e26cb1db3a516028827c2aeb452cdc55128 Node ID: 978149570dc118a35cc3eeaf143c9461dc43322e946b7acc90109760 Worker IP address: 10.160.3.56 Worker port: 10140 Worker PID: 33061
A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: c3c38091160dbe22f45d32f6821e5c862b558c3910000000 Worker ID: 2f35daa948ac49dd9c54fb0dee74cdc8d5e4c836c945fea406970d1f Node ID: 978149570dc118a35cc3eeaf143c9461dc43322e946b7acc90109760 Worker IP address: 10.160.3.56 Worker port: 10141 Worker PID: 33209
...
The log is pretty long…
logs/python-core-worker-2f35daa948ac49dd9c54fb0dee74cdc8d5e4c836c945fea406970d1f_33209.log
[2022-04-21 23:14:27,197 I 33209 33209] core_worker_process.cc:120: Constructing CoreWorkerProcess. pid: 33209
[2022-04-21 23:14:27,203 I 33209 33209] grpc_server.cc:105: worker server started, listening on port 10141.
[2022-04-21 23:14:27,208 I 33209 33209] core_worker.cc:175: Initializing worker at address: 10.160.3.56:10141, worker ID 2f35daa948ac49dd9c54fb0dee74cdc8d5e4c836c945fea406970d1f, raylet 978149570dc118a35cc3eeaf143c9461dc43322e946b7acc90109760
[2022-04-21 23:14:27,208 I 33209 33232] gcs_server_address_updater.cc:32: GCS Server updater thread id: 139732784764672
[2022-04-21 23:14:27,311 I 33209 33209] io_service_pool.cc:35: IOServicePool is running with 1 io_service.
[2022-04-21 23:14:27,312 I 33209 33236] core_worker.cc:494: Event stats:

Global stats: 17 total (10 active)
Queueing time: mean = 315.819 us, max = 1.320 ms, min = 249.762 us, total = 5.369 ms
Execution time: mean = 32.074 us, total = 545.265 us
Event stats:
    PeriodicalRunner.RunFnPeriodically - 7 total (2 active, 1 running), CPU time: mean = 49.815 us, total = 348.707 us
    UNKNOWN - 3 total (3 active), CPU time: mean = 0.000 s, total = 0.000 s
    WorkerInfoGcsService.grpc_client.AddWorkerInfo - 1 total (0 active), CPU time: mean = 13.717 us, total = 13.717 us
    GcsClient.deadline_timer.check_gcs_service_address - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
    InternalPubSubGcsService.grpc_client.GcsSubscriberCommandBatch - 1 total (0 active), CPU time: mean = 182.841 us, total = 182.841 us
    NodeManagerService.grpc_client.ReportWorkerBacklog - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
    InternalPubSubGcsService.grpc_client.GcsSubscriberPoll - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
    NodeInfoGcsService.grpc_client.GetAllNodeInfo - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
    CoreWorker.deadline_timer.flush_profiling_events - 1 total (1 active), CPU time: mean = 0.000 s, total = 0.000 s
I simply want to check the results using ray.get(), expecting a list of strings. Can anyone share hints?
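For now, the workaround I have in mind is to collect the results incrementally with ray.wait() instead of one big ray.get(), so that a single crashed worker does not abort the whole call. This is only a rough sketch (collect_results is my own hypothetical helper, not part of the script above), not something I have verified on the cluster:

import logging

import ray
from ray.exceptions import RayError


def collect_results(obj_ids):
    # Fetch finished task results one at a time; log and skip tasks whose
    # workers crashed instead of letting one failure abort everything.
    results, pending = [], list(obj_ids)
    while pending:
        done, pending = ray.wait(pending, num_returns=1)
        try:
            results.extend(ray.get(done))
        except RayError as exc:  # e.g. WorkerCrashedError for a single task
            logging.warning("Task %s failed: %s", done[0], exc)
    return results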