Loading large datasets from HDFS for XGBoost on YARN

Hello,

I am trying to use xgboost_ray (or XGBoostTrainer from Ray) to train a model on data stored in HDFS/Hive on a YARN-managed cluster (using skein).

Before anything else: loading data from a “local” source (e.g. a randomly generated Spark DataFrame) works fine, so the general Spark + YARN + Ray setup appears to be functional.

The dataset is not very large but quite sparse: the HDFS footprint is ~30 GB across tens of partitions, but many values are missing, so Snappy compression presumably makes the on-disk footprint much smaller than the in-memory footprint once the data is loaded. The point is, I anticipate that a single partition should fit into a single worker’s RAM, but not much more than that. Using xgboost’s SparkXGBClassifier works fine in this setup.

As far as I can tell, there are multiple ways of loading the data:

# Load Hive via Spark into a RayDMatrix:
import ray
import raydp
from xgboost_ray import RayDMatrix

spark_session = raydp.init_spark(enable_hive=True)  # other init_spark args omitted
sdf = spark_session.read.table('my_table')
ray_dmatrix = RayDMatrix(ray.data.from_spark(sdf))

# Or load the parquet files via the Apache Arrow HDFS filesystem into a Ray Dataset:
import pyarrow.fs

arrow_hdfs = pyarrow.fs.HadoopFileSystem('default')
ray_dataset = ray.data.read_parquet('/hdfs/path/to/my_table',
                                    filesystem=arrow_hdfs)
ray_dmatrix = RayDMatrix(ray_dataset)

For training, I can do something like:

# xgboost_ray's train() function
from xgboost_ray import train

train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    dtrain=ray_dmatrix,
    ...
)
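
Concretely, with the elided arguments filled in using placeholders (the label column name, boosting rounds, and actor sizing below are illustrative, not my real values), that call looks roughly like:

from xgboost_ray import RayDMatrix, RayParams, train

# "target" is a placeholder label column name
dtrain = RayDMatrix(ray_dataset, label="target")
booster = train(
    {"objective": "binary:logistic", "eval_metric": ["logloss", "error"]},
    dtrain=dtrain,
    num_boost_round=100,                                    # illustrative
    ray_params=RayParams(num_actors=4, cpus_per_actor=2),   # illustrative sizing
)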

Or I could leave the data as a ray.data.Dataset and use the trainer:

from ray.train.xgboost import XGBoostTrainer

XGBoostTrainer(
    ...
    datasets={"train": ray_dataset},
    ...
).fit()
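
With the elided arguments filled in using placeholder values (label column name, worker count, and boosting rounds are illustrative), that would look roughly like:

from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(num_workers=4),   # illustrative worker count
    label_column="target",                          # placeholder label column
    params={"objective": "binary:logistic", "eval_metric": ["logloss", "error"]},
    datasets={"train": ray_dataset},
    num_boost_round=100,                            # illustrative
)
result = trainer.fit()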

However, regardless of the approach, I haven’t been able to load the data successfully thus far. The failure modes seem to be:

  • Loading via RayDMatrix results in missing columns when the shards are accessed.
    The ultimate failure is within pandas, but the traceback looks roughly like:

      ray/data/_internal/planner/map_batches.py", line 79, in process_next_batch
      ray/data/dataset.py", line 728, in <lambda>
      pandas/util/_decorators.py", line 331, in wrapper
        return func(*args, **kwargs)
      …
      KeyError: "['colX', 'colY'] not found in axis"

    This seems to be related to RayShardingMode, because different columns go
    “missing” depending on the sharding mode (see the sketch after this list).
  • Loading via ray.data.Dataset results in the worker being OOM-killed.
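
For reference, this is roughly how the RayDMatrix is constructed when I see the missing-column errors (the label column name is a placeholder):

from xgboost_ray import RayDMatrix, RayShardingMode

ray_dmatrix = RayDMatrix(
    ray_dataset,
    label="target",                        # placeholder label column
    sharding=RayShardingMode.INTERLEAVED,  # different columns go missing with BATCH vs INTERLEAVED
)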

I appreciate that there’s a fair amount of information missing here, so for starters I wanted to ask for guidance: what would be the recommended data flow in this case, and which classes/conversions should I use?

Hi @nka_wtg, thanks for the initial details. The general guidance for loading the data here would be to use ray.data.read_parquet(), as you do in the second example above. If you aren’t already doing so, you can also specify compression codecs supported by Arrow (read more here).
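
For example, a minimal sketch of that flow (the column list is just a placeholder; reading only the columns you actually need can also reduce memory pressure):

import pyarrow.fs
import ray

hdfs = pyarrow.fs.HadoopFileSystem('default')
ray_dataset = ray.data.read_parquet(
    '/hdfs/path/to/my_table',
    filesystem=hdfs,
    columns=['colA', 'colB', 'target'],  # placeholder column names
)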

To help dig into the issue, can you provide some more details:

  • For the OOM issues, is there a useful stack trace?
  • Do you observe any strange behavior on the Ray Dashboard, related to CPU / object store usage, etc?
  • In terms of the data: you mentioned it can be sparse. Roughly how many total files are being read?