Loading large datasets from HDFS for xgboost on Yarn

Hello,

I am trying to use xgboost_ray (or XGBoostTrainer from ray) to train a model on data stored in HDFS/Hive on a yarn-managed cluster (using skein).

Before anything else: loading data from a “local” source (e.g. a randomly generated Spark DF) works fine, so the general Spark + Yarn + Ray setup appears to be functional.

The dataset is not very large but quite sparse (the HDFS footprint is ~30 GB across tens of partitions, but many values are missing, so I expect snappy compression to be a significant factor when estimating the footprint once the data is loaded into memory). The point is, I anticipate that a single partition should fit into a single worker’s RAM, but not much more. Using xgboost’s SparkXGBClassifier works fine in this case.
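
As a rough way of sanity-checking that estimate (a sketch only; the path below is a placeholder), the parquet footers record both the compressed and the decoded size of every row group, so the expected in-memory footprint can be read off before loading anything:

# Compare on-disk (compressed) vs. decoded size using the parquet footers.
# '/hdfs/path/to/my_table' is a placeholder path.
import pyarrow.fs
import pyarrow.parquet as pq

hdfs = pyarrow.fs.HadoopFileSystem('default')
compressed = decoded = 0
for info in hdfs.get_file_info(pyarrow.fs.FileSelector('/hdfs/path/to/my_table')):
    if not info.path.endswith('.parquet'):
        continue
    with hdfs.open_input_file(info.path) as f:
        md = pq.ParquetFile(f).metadata
    for rg in range(md.num_row_groups):
        row_group = md.row_group(rg)
        decoded += row_group.total_byte_size          # uncompressed, decoded size
        for col in range(row_group.num_columns):
            compressed += row_group.column(col).total_compressed_size
print(f"on disk: {compressed / 1e9:.1f} GB, decoded: {decoded / 1e9:.1f} GB")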

As far as I can tell, there are multiple ways of loading the data:

# Option 1: load the Hive table via Spark (raydp) and wrap it in a RayDMatrix
import ray
import raydp
import pyarrow.fs
from xgboost_ray import RayDMatrix

spark_session = raydp.init_spark(enable_hive=True)  # plus the required app name / executor resource args, omitted here
sdf = spark_session.read.table('my_table')
ray_dmatrix = RayDMatrix(ray.data.from_spark(sdf))

# Option 2: load the parquet files via the Arrow HDFS filesystem into a Ray Dataset
arrow_hdfs = pyarrow.fs.HadoopFileSystem('default')
ray_dataset = ray.data.read_parquet('/hdfs/path/to/my_table',
                                    filesystem=arrow_hdfs)
ray_dmatrix = RayDMatrix(ray_dataset)

For training, I can do something like:

# xgboost_ray's train() function
from xgboost_ray import train

train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    dtrain=ray_dmatrix,
    ...
)
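
For reference, a filled-in version of that call might look like the following sketch (the num_boost_round, evals and RayParams values are illustrative placeholders, not settings from this workload):

# Sketch: a more complete xgboost_ray train() call with explicit RayParams.
from xgboost_ray import RayParams, train

evals_result = {}
bst = train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    dtrain=ray_dmatrix,
    num_boost_round=100,                     # placeholder
    evals=[(ray_dmatrix, "train")],
    evals_result=evals_result,
    ray_params=RayParams(num_actors=10, cpus_per_actor=2),  # placeholder sizing
)
bst.save_model("model.xgb")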

Or I could leave the data as ray.data.Dataset and use the trainer:

from ray.train.xgboost import XGBoostTrainer

XGBoostTrainer(
    ...
    datasets={"train": ray_dataset},
    ...
).fit()
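
Likewise, a filled-in sketch of the trainer route (the ScalingConfig, params and num_workers below are placeholders; 'outcome' is the label column used later in this thread):

# Sketch: a complete XGBoostTrainer call (Ray 2.7-style API).
from ray.train import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(num_workers=10, use_gpu=False),  # placeholder sizing
    label_column="outcome",
    params={
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    datasets={"train": ray_dataset},
)
result = trainer.fit()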

However, regardless of the approach, I haven’t been able to load the data successfully thus far. The failure modes seem to be:

  • Loading via RayDMatrix results in missing columns when the shards are being accessed.
    The ultimate failure is within pandas, but the traceback looks something like:

        ray/data/_internal/planner/map_batches.py", line 79, in process_next_batch
        ray/data/dataset.py", line 728, in <lambda>
        pandas/util/_decorators.py", line 331, in wrapper
            return func(*args, **kwargs)
        ...
        KeyError: "['colX', 'colY'] not found in axis"

    This seems to be related to RayShardingMode, since different columns go “missing” depending on the sharding mode (see the sketch after this list).
  • Loading via ray.data.Dataset results in OOM kill.
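
Since the KeyError seems to track the sharding mode, here is a minimal sketch (mine, untested in this setup; BATCH is only an illustration, and 'outcome' is the label column from the later example) of pinning the sharding mode explicitly when building the RayDMatrix:

# Sketch: pin the sharding mode explicitly when constructing the RayDMatrix.
# Whether BATCH or INTERLEAVED is the right choice here is an open question;
# this only shows where the knob lives.
from xgboost_ray import RayDMatrix, RayShardingMode

ray_dmatrix = RayDMatrix(
    ray_dataset,
    label='outcome',
    sharding=RayShardingMode.BATCH,
)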

I appreciate that there’s a fair amount of detail missing here, so for starters I wanted to ask for some guidance on what the recommended data flow would be in this case, and which classes/conversions to use.

Hi @nka_wtg, thanks for the initial details. The general guidance for loading data here would be to use ray.data.read_parquet(), as you do in the second example above. If you aren’t already doing so, you can also specify compression codecs supported by Arrow (read more here).

To help dig into the issue, can you provide some more details:

  • For the OOM issues, is there a useful stack trace?
  • Do you observe any strange behavior on the Ray Dashboard, related to CPU / object store usage, etc?
  • In terms of the data, you mentioned the data can be sparse. Roughly how many total files are being read?

Hello @sjl,

It took some time to get back to this case, but here are more details.
Somewhat redacted code (example.py):

import pyarrow
import ray
from xgboost_ray import RayDMatrix
from xgboost_ray import RayXGBClassifier

N_ACTORS = 10


def get_hdfs_data():
    hdfs = pyarrow.fs.HadoopFileSystem('default')
    path = '/HDFS_PATH/'
    print("Reading parquet from HDFS...")
    dataset = ray.data.read_parquet(path, filesystem=hdfs)

    bad_cols = [COL1, COL2....]
    print("Dropping cols...")
    dataset = dataset.drop_columns(bad_cols)  # drop_columns returns a new Dataset

    sample = dataset.take(limit=5)
    print(f"Dataset instantiated. Sampled {len(sample)} rows successfully")

    return dataset


if __name__ == "__main__":
    ray.init(_temp_dir='/tmp')
    ray.data.DataContext.get_current().execution_options.verbose_progress = True

    print(f'Instantiating RayDMatrix ...')
    train_dmatrix = RayDMatrix(get_hdfs_data(),
                               label='outcome',
                               num_actors=N_ACTORS)

    print(f'Instantiating RayXGBClassifier...')
    clf = RayXGBClassifier(n_jobs=N_ACTORS, num_class=2)

    print(f'Training RayXGBClassifier...')
    clf.fit(train_dmatrix, None, ray_params={'num_actors': N_ACTORS})
    print(f'Finished training RayXGBClassifier')
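
One variation worth noting (a sketch only; the column names are placeholders): since ray.data.read_parquet() accepts a columns argument, the unwanted columns could be pruned at read time instead of being dropped afterwards, so they are never materialized at all:

# Sketch: push the column selection into the parquet read itself.
import pyarrow.fs
import ray

hdfs = pyarrow.fs.HadoopFileSystem('default')
wanted_cols = ['outcome', 'feature_1', 'feature_2']   # placeholder column names
dataset = ray.data.read_parquet('/HDFS_PATH/',
                                filesystem=hdfs,
                                columns=wanted_cols)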

Skein job YAML (skein_job.yaml):

name: ray_yarn
queue: Q
services:
  ray-head:
    instances: 1
    resources:
      vcores: 10
      memory: 32 GiB
    files:
      example.py: example.py
      ray_conda_env:
        source: ray_conda_env.tar.gz
        type: archive

    script: |
      source ray_conda_env/bin/activate
      skein kv put current --key=RAY_HEAD_ADDRESS --value=$(hostname -i)
      # 8GB object store; 32GB RAM
      ray start --head --port=6379 --object-store-memory=8000000000 --memory 32000000000 --num-cpus=10
      python example.py
      ray stop
      skein application shutdown current

  ray-worker:
    files:
      example.py: example.py
      ray_conda_env:
        source: /home/USER/ray_conda_env.tar.gz
        type: archive
    instances: 10
    resources:
      vcores: 2
      memory: 16 GiB
    depends:
      - ray-head
    script: |
      source ray_conda_env/bin/activate
      RAY_HEAD_ADDRESS=$(skein kv get --key=RAY_HEAD_ADDRESS current)
      # 8GB object store; 16GB RAM
      ray start --object-store-memory=8000000000 --memory 16000000000 --num-cpus=2 --address=$RAY_HEAD_ADDRESS:6379 --block; ray stop

This is submitted to Yarn by: skein submit skein_job.yaml.

Then, after executing on the cluster we get the following outputs:
ray-head.log:

...
2023-10-14 16:22:29,004	SUCC scripts.py:781 -- --------------------
2023-10-14 16:22:29,005	SUCC scripts.py:782 -- Ray runtime started.
2023-10-14 16:22:29,008	SUCC scripts.py:783 -- --------------------
...
Dataset instantiated. Sampled 5 rows successfully
Instantiating RayXGBClassifier...
Training RayXGBClassifier...
(_execute_read_task_split pid=399645) 23/10/14 16:23:15 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
(_execute_read_task_split pid=399645) 23/10/14 16:23:15 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
Read progress 0:   0%|          | 0/23 [00:00<?, ?it/s]
End of LogType:ray-head.log

application.master.log:

...
23/10/14 16:22:09 INFO skein.ApplicationMaster: RUNNING: ray-worker_7 on container_e2964_1695734152961_32120_01_000010
23/10/14 16:22:09 INFO skein.ApplicationMaster: Starting container_e2964_1695734152961_32120_01_000011...
23/10/14 16:22:09 INFO skein.ApplicationMaster: RUNNING: ray-worker_8 on container_e2964_1695734152961_32120_01_000011
23/10/14 16:22:09 INFO skein.ApplicationMaster: Starting container_e2964_1695734152961_32120_01_000012...
23/10/14 16:22:09 INFO skein.ApplicationMaster: RUNNING: ray-worker_9 on container_e2964_1695734152961_32120_01_000012
23/10/14 16:23:24 DEBUG skein.ApplicationMaster: Received 1 completed containers
23/10/14 16:23:24 WARN skein.ApplicationMaster: FAILED: ray-head_0 - Container killed by YARN for exceeding memory limits
23/10/14 16:23:24 INFO skein.ApplicationMaster: Shutting down: Failure in service ray-head, see logs for more information.
23/10/14 16:23:24 INFO skein.ApplicationMaster: Unregistering application with status FAILED
23/10/14 16:23:24 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
23/10/14 16:23:25 INFO skein.ApplicationMaster: Deleted application directory hdfs:/XXXX
23/10/14 16:23:25 INFO skein.ApplicationMaster: WebUI server shut down
23/10/14 16:23:25 INFO skein.ApplicationMaster: gRPC server shut down

So, judging by the above, it looks like:

  • The data can be accessed
  • RayDMatrix instantiation succeeds
  • At some point inside clf.fit(), the data read results in ray-head_0 - Container killed by YARN for exceeding memory limits. No useful stack trace, though.
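
For completeness, one knob I have not tried yet (assuming Ray 2.7’s streaming executor respects it as documented) is capping how much object store memory Ray Data’s execution may use, independently of the --object-store-memory passed to ray start:

# Sketch: cap the object store memory budget of Ray Data's streaming executor.
# The 4 GB figure is an arbitrary illustration, not a recommendation.
import ray

ctx = ray.data.DataContext.get_current()
ctx.execution_options.resource_limits.object_store_memory = 4 * 1024**3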

Regarding the dataset being loaded, both dataset.stats() and hdfs agree that it’s ~17 GB on disk (there have been some changes since the original post, but the problem remains):

du output:

hdfs dfs -du -s -h /HDFS_PATH
  17.7 G  53.1 G  /HDFS_PATH

Partitions on HDFS:

hdfs dfs -ls -h /HDFS_PATH
Found 24 items
 -rw-rw----+  3 XX hive          0 2023-08-XX 10:10 /HDFS_PATH/_SUCCESS
 -rw-rw----+  3 XX hive    789.3 M 2023-08-XX 10:09 /HDFS_PATH/part-00000-abe25889-4961-431f-9cf3-e249a6e89c63-c000.snappy.parquet
 -rw-rw----+  3 XX hive    787.2 M 2023-08-XX 10:09 /HDFS_PATH/part-00001-abe25889-4961-431f-9cf3-e249a6e89c63-c000.snappy.parquet
 ...

Dataset stats from an interactive IPython session seem to agree on the total bytes:

    In [8]: dataset.stats()
    Out[8]: "Stage 1 ReadParquet->SplitBlocks(64): 64/64 blocks executed in 60.83s
    * Remote wall time: 93.01ms min, 3.31s max, 618.34ms mean, 39.57s total
    * Remote cpu time: 121.44ms min, 7.41s max, 1.18s mean, 75.51s total
    * Peak heap memory usage (MiB): 7619.55 min, 20669.95 max, 16961 mean
    * Output num rows: 1321 min, 16725 max, 15563 mean, 996037 total
    * Output size bytes: 22808287 min, 288746235 max, 268686580 mean, 17195941141 total
    * Tasks per node: 64 min, 64 max, 64 mean; 1 nodes used
    * Extra metrics: {'obj_store_mem_alloc': 17195941141, 
                      'obj_store_mem_freed': 17316948255, 
                      'obj_store_mem_peak': 17316948255, 
                      'ray_remote_args': {'num_cpus': 1, 
                      'scheduling_strategy': 'SPREAD'}}
                      
    Dataset iterator time breakdown:
    * Total time user code is blocked: 93.72ms
    * Total time in user code: 636.83ms
    * Num blocks local: 0
    * Num blocks remote: 0
    * Num blocks unknown location: 0
    * Batch iteration time breakdown (summed across prefetch threads):
        * In ray.get(): 87.5ms min, 657.49ms max, 372.49ms avg, 744.99ms total
        * In batch creation: 3.78us min, 3.97us max, 3.88us avg, 7.75us total
        * In batch formatting: 5.23us min, 8.15us max, 6.69us avg, 13.38us total

EDIT: addendum to the above:

To sanity-check the general setup: loading a smaller table (~80 MB over 10 partitions; 10M rows) from HDFS works fine:

hdfs dfs -ls -h '/SMALL_TABLE_PATH'
Found 10 items
-rw-rw----+  3 XX hive      8.1 M 2023-10-xx 17:51 /SMALL_TABLE_PATH/11442b0d8deb4d61a02d32731be1b793_000000_000000.parquet
-rw-rw----+  3 XX hive      8.1 M 2023-10-xx 17:51 /SMALL_TABLE_PATH/11442b0d8deb4d61a02d32731be1b793_000001_000000.parquet
-rw-rw----+  3 XX hive      8.1 M 2023-10-xx 17:51 /SMALL_TABLE_PATH/11442b0d8deb4d61a02d32731be1b793_000002_000000.parquet
-rw-rw----+  3 XX hive      8.1 M 2023-10-xx 17:51 /SMALL_TABLE_PATH/11442b0d8deb4d61a02d32731be1b793_000003_000000.parquet
...

Tail of ray-head.log after training on the smaller table:

...
Training RayXGBClassifier...
(_execute_read_task_split pid=326192) 23/10/14 17:54:21 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
(_execute_read_task_split pid=326191) 23/10/14 17:54:22 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable [repeated 3x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)
(_execute_read_task_split pid=326191) 23/10/14 17:54:22 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded. [repeated 3x across cluster]
2023-10-14 17:54:23,407 INFO main.py:1191 -- [RayXGBoost] Starting XGBoost training.
(_RemoteRayXGBoostActor pid=330293) [17:54:23] task [xgboost.ray]:140147885452640 got new rank 0
2023-10-14 17:54:34,967	INFO main.py:1708 -- [RayXGBoost] Finished XGBoost training on training data with total N=10,000,000 in 14.98 seconds (11.55 pure XGBoost training time).
Finished training RayXGBClassifier

application.master.log also indicates a healthy termination:

...
23/10/14 17:54:44 INFO skein.ApplicationMaster: Shutting down: Shutdown requested by user.
23/10/14 17:54:44 DEBUG skein.ApplicationMaster: Stopping allocator thread
23/10/14 17:54:44 INFO skein.ApplicationMaster: Unregistering application with status SUCCEEDED
23/10/14 17:54:44 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
23/10/14 17:54:45 INFO skein.ApplicationMaster: Deleted application directory hdfs://xxxx
23/10/14 17:54:45 INFO skein.ApplicationMaster: WebUI server shut down
23/10/14 17:54:45 INFO skein.ApplicationMaster: gRPC server shut down

It would appear that the size of the original table is the cause of the problem.

Finally, versions being used:

# python env:
ray==2.7.1
raydp==1.6.0
skein==0.8.2
xgboost-ray==0.1.19
python==3.9.17
# Yarn:
Hadoop 3.1.1