Ray worker dies when reading multiple parquet files

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hi all, this follows my previous post [Dataset] Ray Dataset reading multiple parquet files with different columns crashes due to TProtocolException: Exceeded size limit. I resolved that problem by aggregating all features into a single tensor column.

However, I am now experiencing another error that I do not quite understand.

The initial parquet files each contain 1 row and two columns (a value tensor and an id). I aggregate batches of 1000 files with ray.data.read_parquet_bulk(list_of_files) and write each batch to a temporary folder. This process is repeated until fewer than 1000 files remain in the temporary folder, at which point the aggregated parquet dataset is written to its official directory.
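For reference, a minimal sketch of that batching loop; the directory names, the helper function, and the exact batch size are placeholders rather than my actual production code:

```python
import os
import ray

ray.init()

BATCH_SIZE = 1000  # assumed batch size from the description above

def list_parquet_files(directory):
    # Collect all parquet file paths in a directory, sorted for determinism.
    return sorted(
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.endswith(".parquet")
    )

files = list_parquet_files("initial_dir")  # placeholder input directory
round_idx = 0

# Collapse batches of single-row files into larger parquet files until
# fewer than BATCH_SIZE files remain.
while len(files) >= BATCH_SIZE:
    out_dir = os.path.join("tmp_dir", f"round_{round_idx}")  # placeholder temp folder
    os.makedirs(out_dir, exist_ok=True)
    for i in range(0, len(files), BATCH_SIZE):
        ds = ray.data.read_parquet_bulk(files[i : i + BATCH_SIZE])
        ds.write_parquet(out_dir)  # Ray writes one parquet file per block
    files = list_parquet_files(out_dir)
    round_idx += 1

# Fewer than BATCH_SIZE files left: write the aggregated dataset to its
# official directory.
ray.data.read_parquet_bulk(files).write_parquet("official_dir")
```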

This worked well (and fast) for datasets of 3 000 and 60 000 rows, but when scaling to a dataset of 600 000 rows I get the following error during the file reads:

Read progress:  48%|████▊     | 61/128 [00:11<00:28,  2.38it/s][2022-11-16 02:21:18,511 E 3303412 3310082] core_worker.cc:528: :info_message: Attempting to recover 14943 lost objects by resubmitting their tasks. To disable object reconstruction, set @ray.remote(max_retries=0).
2022-11-16 02:21:25,730 WARNING worker.py:1839 -- The node with node id: 3ac9adb35b49cf63385cc372619491d1d4504926f781c2ea0f50fc16 and address: 10.80.46.16 and node name: 10.80.46.16 has been marked dead because the detector has missed too many heartbeats from it. This can happen when a       (1) raylet crashes unexpectedly (OOM, preempted node, etc.) 
        (2) raylet has lagging heartbeats due to slow network or busy workload.
2022-11-16 02:21:27,092 WARNING worker.py:1839 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 9bb9a38887e223f2b92fe652f420eedb7d2f6ff301000000 Worker ID: 1bf8d49808270947c93bb98969383ef15c3e01ccdcc669146e4de1d3 Node ID: 3ac9adb35b49cf63385cc372619491d1d4504926f781c2ea0f50fc16 Worker IP address: 10.80.46.16 Worker port: 40321 Worker PID: 3310084 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
2022-11-16 02:21:27,094 WARNING worker.py:1839 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 0e1936ac6bfb37d3024a1161c15fa2852271df3e01000000 Worker ID: ef9c9e55b0a662bf728dc7bd5874bc493f0e439ce9cee89aea1e37d6 Node ID: 3ac9adb35b49cf63385cc372619491d1d4504926f781c2ea0f50fc16 Worker IP address: 10.80.46.16 Worker port: 44791 Worker PID: 3310163 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
2022-11-16 02:21:27,094 WARNING worker.py:1839 -- A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 20cd39be7fccf774e272d3538932c7ddfa5b1a6301000000 Worker ID: ced6257b58225cfa57e7f285cb7343d60d73f6efe3563e945da88492 Node ID: 3ac9adb35b49cf63385cc372619491d1d4504926f781c2ea0f50fc16 Worker IP address: 10.80.46.16 Worker port: 44161 Worker PID: 3310166 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
(raylet) [2022-11-16 02:21:27,161 C 3310002 3310064] (raylet) node_manager.cc:173: This node has beem marked as dead.
(raylet) *** StackTrace Information ***
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x49beda) [0x5621a585deda] ray::operator<<()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x49d9b2) [0x5621a585f9b2] ray::SpdLogMessage::Flush()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x49dcc7) [0x5621a585fcc7] ray::RayLog::~RayLog()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x241d74) [0x5621a5603d74] std::_Function_handler<>::_M_invoke()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x375bd4) [0x5621a5737bd4] std::_Function_handler<>::_M_invoke()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x3cabe0) [0x5621a578cbe0] ray::rpc::GcsRpcClient::ReportHeartbeat()::{lambda()#2}::operator()()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x373a42) [0x5621a5735a42] ray::rpc::ClientCallImpl<>::OnReplyReceived()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x228285) [0x5621a55ea285] std::_Function_handler<>::_M_invoke()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x47fcf6) [0x5621a5841cf6] EventTracker::RecordExecution()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x42031e) [0x5621a57e231e] std::_Function_handler<>::_M_invoke()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x420796) [0x5621a57e2796] boost::asio::detail::completion_handler<>::do_complete()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x9adbcb) [0x5621a5d6fbcb] boost::asio::detail::scheduler::do_run_one()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x9af391) [0x5621a5d71391] boost::asio::detail::scheduler::run()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x9af5c0) [0x5621a5d715c0] boost::asio::io_context::run()
(raylet) /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x9fe2d0) [0x5621a5dc02d0] execute_native_thread_routine
(raylet) /lib/x86_64-linux-gnu/libpthread.so.0(+0x8609) [0x14df3a05a609] start_thread
(raylet) /lib/x86_64-linux-gnu/libc.so.6(clone+0x43) [0x14df39c29163] __clone
(raylet) 
2022-11-16 02:21:28,339 WARNING worker.py:1839 -- Raylet is terminated: ip=10.80.46.16, id=3ac9adb35b49cf63385cc372619491d1d4504926f781c2ea0f50fc16. Termination is unexpected. Possible reasons include: (1) SIGKILL by the user or system OOM killer, (2) Invalid memory access from Raylet causing SIGSEGV or SIGBUS, (3) Other termination signals. Last 20 lines of the Raylet logs:
    [2022-11-16 02:21:27,161 C 3310002 3310064] (raylet) node_manager.cc:173: This node has beem marked as dead.
    *** StackTrace Information ***
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x49beda) [0x5621a585deda] ray::operator<<()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x49d9b2) [0x5621a585f9b2] ray::SpdLogMessage::Flush()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x49dcc7) [0x5621a585fcc7] ray::RayLog::~RayLog()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x241d74) [0x5621a5603d74] std::_Function_handler<>::_M_invoke()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x375bd4) [0x5621a5737bd4] std::_Function_handler<>::_M_invoke()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x3cabe0) [0x5621a578cbe0] ray::rpc::GcsRpcClient::ReportHeartbeat()::{lambda()#2}::operator()()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x373a42) [0x5621a5735a42] ray::rpc::ClientCallImpl<>::OnReplyReceived()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x228285) [0x5621a55ea285] std::_Function_handler<>::_M_invoke()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x47fcf6) [0x5621a5841cf6] EventTracker::RecordExecution()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x42031e) [0x5621a57e231e] std::_Function_handler<>::_M_invoke()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x420796) [0x5621a57e2796] boost::asio::detail::completion_handler<>::do_complete()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x9adbcb) [0x5621a5d6fbcb] boost::asio::detail::scheduler::do_run_one()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x9af391) [0x5621a5d71391] boost::asio::detail::scheduler::run()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x9af5c0) [0x5621a5d715c0] boost::asio::io_context::run()
    /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet(+0x9fe2d0) [0x5621a5dc02d0] execute_native_thread_routine
    /lib/x86_64-linux-gnu/libpthread.so.0(+0x8609) [0x14df3a05a609] start_thread
    /lib/x86_64-linux-gnu/libc.so.6(clone+0x43) [0x14df39c29163] __clone

This feels like a worker OOM kill event. However, I am running this code on a cluster node with 64 CPU cores and 249 GB of RAM. Looking up the overall memory usage of the compute job, only 27.76 GB (11.15% of the cluster’s capacity) was used.

My Python 3.8.10 environment lives in a Singularity container.
It uses Ray 2.1.0 and pyarrow 6.0.1, along with many other Python packages.

Does anyone have any idea what this error means and how to resolve it?

When I query Ray itself for the memory available to it on this cluster, I get a value of 7312362702, which I do not know how to interpret on a GB scale…
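Here is a sketch of how one could inspect that value; I am assuming it comes from ray.cluster_resources() and is reported in bytes (in which case 7312362702 would be roughly 7.3 GB), but that unit assumption may be wrong:

```python
import ray

ray.init(address="auto")  # connect to the running cluster

resources = ray.cluster_resources()
# Assumption: these values are reported in bytes.
mem = resources.get("memory", 0)
obj_store = resources.get("object_store_memory", 0)
print(f"memory: {mem / 1e9:.2f} GB")
print(f"object_store_memory: {obj_store / 1e9:.2f} GB")
# Under the bytes assumption, 7312362702 ~= 7.31 GB.
```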

Also, I am currently running a test that aggregates all files at once with ray.data.read_parquet_bulk() to see if the error is still produced.

Aggregating all 600 000 files at once with ray.data.read_parquet_bulk() did work for building the Ray dataset, so maybe splitting the read into batches saturated the worker’s memory at some point?
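For completeness, the single-pass variant that worked is essentially just the following (paths are placeholders):

```python
import glob
import ray

ray.init()

# Read every single-row parquet file in one call instead of batching.
all_files = sorted(glob.glob("initial_dir/*.parquet"))  # placeholder path
ds = ray.data.read_parquet_bulk(all_files)
ds.write_parquet("official_dir")  # placeholder destination
```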

Hi @nicdemon, thanks for pushing on this! To ensure that I understand your use case:

  1. you have one file per row, totaling ~6 million samples/files, and we’re working with a 600k subset here,
  2. each row can contain some subset of ~1 trillion potential columns,
  3. you want to read each of these single-sample files, concatenate them so that the full span of columns across the ~6 million samples is represented in the table, and then perform some parallel transformations on them.

First, let me know if that’s right. Second, I have some questions:

  1. Do you know how many columns are contained in those 6 million samples? I’m sure you’re going to run into a bunch of performance, memory, and API issues with anything beyond 100k-1M columns at the Parquet, Arrow, and Datasets levels. Once you go past 2^31 columns, you’re going to be running into issues with e.g. Arrow representing the number of columns in a 32-bit integer, and misc. things like that.
  2. Can you give any more details on the transformation that requires a global view of all columns?

Hi @Clark_Zinzow, the first part of your reply describes the problem I encountered perfectly.

For the second part of your message: the datasets I have processed so far have around 1 million feature columns. The transformations I apply are from the ray.data.preprocessors module: a Chain of SimpleImputer, MinMaxScaler and Concatenator on the features, and a Chain of LabelEncoder and OneHotEncoder on the classes, in a classification context.
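Roughly, the pipeline looks like this sketch; the column names and the tiny in-memory dataset are placeholders just to illustrate the chain, and exact parameter names may differ slightly between Ray versions:

```python
import ray
from ray.data.preprocessors import (
    Chain, Concatenator, LabelEncoder, MinMaxScaler, OneHotEncoder, SimpleImputer,
)

# Tiny placeholder dataset standing in for the real ~1M-feature tables.
ds = ray.data.from_items(
    [
        {"f0": 1.0, "f1": None, "label": "a"},
        {"f0": 0.5, "f1": 2.0, "label": "b"},
    ]
)

feature_cols = ["f0", "f1"]

feature_prep = Chain(
    SimpleImputer(columns=feature_cols),   # fill missing feature values (mean by default)
    MinMaxScaler(columns=feature_cols),    # scale each feature to [0, 1]
    Concatenator(include=feature_cols),    # pack the features into one tensor column
)
label_prep = Chain(
    LabelEncoder(label_column="label"),    # map class names to integer ids
    OneHotEncoder(columns=["label"]),      # one-hot encode the class column
)

ds = feature_prep.fit_transform(ds)
ds = label_prep.fit_transform(ds)
```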

Are you saying that it is possible to apply those preprocessors to a tensor column in Ray?