Hello @sjl,
It took some time to get back to this case, but here are more details.
Somewhat redacted code (example.py):
import pyarrow
import pyarrow.fs  # the fs submodule is not imported by "import pyarrow" alone
import ray
from xgboost_ray import RayDMatrix
from xgboost_ray import RayXGBClassifier

N_ACTORS = 10


def get_hdfs_data():
    hdfs = pyarrow.fs.HadoopFileSystem('default')
    path = '/HDFS_PATH/'
    print("Reading parquet from HDFS...")
    dataset = ray.data.read_parquet(path, filesystem=hdfs)
    bad_cols = [COL1, COL2....]
    print("Dropping cols...")
    # drop_columns() returns a new dataset; it does not mutate in place
    dataset = dataset.drop_columns(bad_cols)
    sample = dataset.take(limit=5)
    print(f"Dataset instantiated. Sampled {len(sample)} rows successfully")
    return dataset


if __name__ == "__main__":
    ray.init(_temp_dir='/tmp')
    ray.data.DataContext.get_current().execution_options.verbose_progress = True

    print(f'Instantiating RayDMatrix ...')
    train_dmatrix = RayDMatrix(get_hdfs_data(),
                               label='outcome',
                               num_actors=N_ACTORS)

    print(f'Instantiating RayXGBClassifier...')
    clf = RayXGBClassifier(n_jobs=N_ACTORS, num_class=2)

    print(f'Training RayXGBClassifier...')
    clf.fit(train_dmatrix, None, ray_params={'num_actors': N_ACTORS})
    print(f'Finished training RayXGBClassifier')
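For what it's worth, a pre-flight check along these lines could be run before the RayDMatrix step to compare the dataset's in-memory footprint against what the cluster reports. This is my own sketch, not part of the job above; materialize() and size_bytes() are standard Ray Data calls, but the size is Ray's own estimate:

# Hypothetical pre-flight check, separate from example.py: materialize the dataset and
# compare its in-memory size with the cluster's total / object-store memory before any
# RayDMatrix or XGBoost code is involved.
import ray

from example import get_hdfs_data  # the function defined above

ray.init(address="auto")  # attach to the cluster started via `ray start`
ds = get_hdfs_data().materialize()  # forces the full read into the object store
resources = ray.cluster_resources()
print(f"Dataset in object store: ~{ds.size_bytes() / 1e9:.1f} GB")
print(f"Cluster memory: ~{resources.get('memory', 0) / 1e9:.0f} GB, "
      f"object store: ~{resources.get('object_store_memory', 0) / 1e9:.0f} GB")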
Skein job YAML (skein_job.yaml):
name: ray_yarn
queue: Q
services:
  ray-head:
    instances: 1
    resources:
      vcores: 10
      memory: 32 GiB
    files:
      example.py: example.py
      ray_conda_env:
        source: ray_conda_env.tar.gz
        type: archive
    script: |
      source ray_conda_env/bin/activate
      skein kv put current --key=RAY_HEAD_ADDRESS --value=$(hostname -i)
      # 8GB object store; 32GB RAM
      ray start --head --port=6379 --object-store-memory=8000000000 --memory 32000000000 --num-cpus=10
      python example.py
      ray stop
      skein application shutdown current
  ray-worker:
    files:
      example.py: example.py
      ray_conda_env:
        source: /home/USER/ray_conda_env.tar.gz
        type: archive
    instances: 10
    resources:
      vcores: 2
      memory: 16 GiB
    depends:
      - ray-head
    script: |
      source ray_conda_env/bin/activate
      RAY_HEAD_ADDRESS=$(skein kv get --key=RAY_HEAD_ADDRESS current)
      # 8GB object store; 16GB RAM
      ray start --object-store-memory=8000000000 --memory 16000000000 --num-cpus=2 --address=$RAY_HEAD_ADDRESS:6379 --block; ray stop
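As a quick cross-check that the ray start flags above actually take effect, something like the following could be run on the head once the cluster is up. This is illustrative only; ray.nodes() is the standard Ray API for listing registered nodes and the resources they advertise:

# Hypothetical check, separate from example.py: list every registered node and the
# memory / object-store capacity it advertises, to confirm the head plus the 10
# workers joined with the values requested by the `ray start` flags above.
import ray

ray.init(address="auto")  # attach to the cluster started via `ray start`
for node in ray.nodes():
    if not node["Alive"]:
        continue
    res = node["Resources"]
    print(f"{node['NodeManagerAddress']}: "
          f"CPU={res.get('CPU', 0):.0f}, "
          f"memory={res.get('memory', 0) / 1e9:.0f} GB, "
          f"object_store={res.get('object_store_memory', 0) / 1e9:.0f} GB")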
This is submitted to YARN with skein submit skein_job.yaml.
Then, after executing on the cluster, we get the following outputs:
ray-head.log:
...
2023-10-14 16:22:29,004 SUCC scripts.py:781 -- --------------------
2023-10-14 16:22:29,005 SUCC scripts.py:782 -- Ray runtime started.
2023-10-14 16:22:29,008 SUCC scripts.py:783 -- --------------------
...
Dataset instantiated. Sampled 5 rows successfully
Instantiating RayXGBClassifier...
Training RayXGBClassifier...
(_execute_read_task_split pid=399645) 23/10/14 16:23:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(_execute_read_task_split pid=399645) 23/10/14 16:23:15 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Read progress 0: 0%| | 0/23 [00:00<?, ?it/s]
End of LogType:ray-head.log
application.master.log:
...
23/10/14 16:22:09 INFO skein.ApplicationMaster: RUNNING: ray-worker_7 on container_e2964_1695734152961_32120_01_000010
23/10/14 16:22:09 INFO skein.ApplicationMaster: Starting container_e2964_1695734152961_32120_01_000011...
23/10/14 16:22:09 INFO skein.ApplicationMaster: RUNNING: ray-worker_8 on container_e2964_1695734152961_32120_01_000011
23/10/14 16:22:09 INFO skein.ApplicationMaster: Starting container_e2964_1695734152961_32120_01_000012...
23/10/14 16:22:09 INFO skein.ApplicationMaster: RUNNING: ray-worker_9 on container_e2964_1695734152961_32120_01_000012
23/10/14 16:23:24 DEBUG skein.ApplicationMaster: Received 1 completed containers
23/10/14 16:23:24 WARN skein.ApplicationMaster: FAILED: ray-head_0 - Container killed by YARN for exceeding memory limits
23/10/14 16:23:24 INFO skein.ApplicationMaster: Shutting down: Failure in service ray-head, see logs for more information.
23/10/14 16:23:24 INFO skein.ApplicationMaster: Unregistering application with status FAILED
23/10/14 16:23:24 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
23/10/14 16:23:25 INFO skein.ApplicationMaster: Deleted application directory hdfs:/XXXX
23/10/14 16:23:25 INFO skein.ApplicationMaster: WebUI server shut down
23/10/14 16:23:25 INFO skein.ApplicationMaster: gRPC server shut down
So, judging by the above, it looks like:
- The data can be accessed and the RayDMatrix instantiation succeeds.
- At some point inside clf.fit(), the data read results in "ray-head_0 - Container killed by YARN for exceeding memory limits". There is no useful stack trace, though (a rough way to at least watch the head's memory is sketched below).
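Since YARN kills on the container's total RSS rather than raising a Python error, the only extra signal I can think of is logging memory from the outside. A rough side-car sketch of my own (assuming psutil is available in the conda env; the name/cmdline filter is a heuristic and may over- or under-count on a shared node):

# Hypothetical side-car monitor, not part of example.py: periodically sum the RSS of
# Ray-related processes on the head node, to see how close the container gets to its
# 32 GiB limit before YARN kills it.
import time

import psutil

RAY_HINTS = ("raylet", "gcs_server", "ray::", "example.py")


def ray_rss_gib() -> float:
    total = 0
    for proc in psutil.process_iter(["name", "cmdline", "memory_info"]):
        try:
            blob = (proc.info["name"] or "") + " ".join(proc.info["cmdline"] or [])
            mem = proc.info["memory_info"]
            if mem is not None and any(hint in blob for hint in RAY_HINTS):
                total += mem.rss
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    return total / 1024**3


if __name__ == "__main__":
    while True:
        print(f"[rss-monitor] Ray-related RSS ~{ray_rss_gib():.1f} GiB")
        time.sleep(10)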
Regarding the dataset being loaded, both dataset.stats() and hdfs dfs -du agree that it's ~17 GB on disk (there have been some changes since the original post, but the problem remains).
du output (the second column is the space consumed including 3x replication):
hdfs dfs -du -s -h /HDFS_PATH
17.7 G  53.1 G  /HDFS_PATH
Partitions on HDFS:
hdfs dfs -ls -h /HDFS_PATH
Found 24 items
-rw-rw----+ 3 XX hive 0 2023-08-XX 10:10 /HDFS_PATH/_SUCCESS
-rw-rw----+ 3 XX hive 789.3 M 2023-08-XX 10:09 /HDFS_PATH/part-00000-abe25889-4961-431f-9cf3-e249a6e89c63-c000.snappy.parquet
-rw-rw----+ 3 XX hive 787.2 M 2023-08-XX 10:09 /HDFS_PATH/part-00001-abe25889-4961-431f-9cf3-e249a6e89c63-c000.snappy.parquet
...
Dataset stats from an interactive ipython session also agree on the total bytes:
In [8]: dataset.stats()
Out[8]: "Stage 1 ReadParquet->SplitBlocks(64): 64/64 blocks executed in 60.83s
* Remote wall time: 93.01ms min, 3.31s max, 618.34ms mean, 39.57s total
* Remote cpu time: 121.44ms min, 7.41s max, 1.18s mean, 75.51s total
* Peak heap memory usage (MiB): 7619.55 min, 20669.95 max, 16961 mean
* Output num rows: 1321 min, 16725 max, 15563 mean, 996037 total
* Output size bytes: 22808287 min, 288746235 max, 268686580 mean, 17195941141 total
* Tasks per node: 64 min, 64 max, 64 mean; 1 nodes used
* Extra metrics: {'obj_store_mem_alloc': 17195941141,
'obj_store_mem_freed': 17316948255,
'obj_store_mem_peak': 17316948255,
'ray_remote_args': {'num_cpus': 1,
'scheduling_strategy': 'SPREAD'}}
Dataset iterator time breakdown:
* Total time user code is blocked: 93.72ms
* Total time in user code: 636.83ms
* Num blocks local: 0
* Num blocks remote: 0
* Num blocks unknown location: 0
* Batch iteration time breakdown (summed across prefetch threads):
* In ray.get(): 87.5ms min, 657.49ms max, 372.49ms avg, 744.99ms total
* In batch creation: 3.78us min, 3.97us max, 3.88us avg, 7.75us total
* In batch formatting: 5.23us min, 8.15us max, 6.69us avg, 13.38us total
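For context, a rough back-of-the-envelope comparison of the numbers above against the head-node settings in the YAML (just the raw ratios, nothing authoritative):

# Numbers copied from dataset.stats() and the skein YAML above.
dataset_bytes     = 17_195_941_141   # "Output size bytes ... total"
obj_store_peak    = 17_316_948_255   # "obj_store_mem_peak"
head_object_store = 8_000_000_000    # --object-store-memory on ray-head
head_container    = 32 * 1024**3     # 32 GiB YARN container for ray-head

print(f"dataset vs head object store:   {dataset_bytes / head_object_store:.1f}x")  # ~2.1x
print(f"object store peak vs container: {obj_store_peak / head_container:.2f}x")    # ~0.50x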
EDIT: addendum to the above:
To sanity-check the general setup, loading a smaller table from HDFS (~80 MB over 10 partitions; 10M rows) works fine:
hdfs dfs -ls -h '/SMALL_TABLE_PATH'
Found 10 items
-rw-rw----+ 3 XX hive 8.1 M 2023-10-xx 17:51 /SMALL_TABLE_PATH/11442b0d8deb4d61a02d32731be1b793_000000_000000.parquet
-rw-rw----+ 3 XX hive 8.1 M 2023-10-xx 17:51 /SMALL_TABLE_PATH/11442b0d8deb4d61a02d32731be1b793_000001_000000.parquet
-rw-rw----+ 3 XX hive 8.1 M 2023-10-xx 17:51 /SMALL_TABLE_PATH/11442b0d8deb4d61a02d32731be1b793_000002_000000.parquet
-rw-rw----+ 3 XX hive 8.1 M 2023-10-xx 17:51 /SMALL_TABLE_PATH/11442b0d8deb4d61a02d32731be1b793_000003_000000.parquet
...
Tail of ray-head.log after training on the smaller table:
...
Training RayXGBClassifier...
(_execute_read_task_split pid=326192) 23/10/14 17:54:21 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
(_execute_read_task_split pid=326191) 23/10/14 17:54:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable [repeated 3x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)
(_execute_read_task_split pid=326191) 23/10/14 17:54:22 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. [repeated 3x across cluster]
2023-10-14 17:54:23,407 INFO main.py:1191 -- [RayXGBoost] Starting XGBoost training.
(_RemoteRayXGBoostActor pid=330293) [17:54:23] task [xgboost.ray]:140147885452640 got new rank 0
2023-10-14 17:54:34,967 INFO main.py:1708 -- [RayXGBoost] Finished XGBoost training on training data with total N=10,000,000 in 14.98 seconds (11.55 pure XGBoost training time).
Finished training RayXGBClassifier
application.master.log also indicates a healthy termination:
...
23/10/14 17:54:44 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.
23/10/14 17:54:44 DEBUG skein.ApplicationMaster: Stopping allocator thread
23/10/14 17:54:44 INFO skein.ApplicationMaster: Unregistering application with status SUCCEEDED
23/10/14 17:54:44 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
23/10/14 17:54:45 INFO skein.ApplicationMaster: Deleted application directory hdfs://xxxx
23/10/14 17:54:45 INFO skein.ApplicationMaster: WebUI server shut down
23/10/14 17:54:45 INFO skein.ApplicationMaster: gRPC server shut down
It would appear that the size of the original table is the cause of the problem.
Finally, the versions being used:
# python env:
ray==2.7.1
raydp==1.6.0
skein==0.8.2
xgboost-ray==0.1.19
python==3.9.17
# Yarn:
Hadoop 3.1.1
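For completeness, the Python-side versions above can be reproduced with something like this (distribution names as used by pip):

import sys
from importlib.metadata import version

for pkg in ("ray", "raydp", "skein", "xgboost-ray"):
    print(f"{pkg}=={version(pkg)}")
print(f"python=={sys.version.split()[0]}")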