Hello,
I am trying to use xgboost_ray (or XGBoostTrainer from ray) to train a model on data stored in HDFS/Hive on a yarn-managed cluster (using skein).
Before anything else: loading data from a “local” source (e.g. a randomly generated Spark DF) works fine, so the general spark+yarn+ray setup appears to be functional.
The dataset is not very large but quite sparse (the HDFS footprint is ~30GB across tens of partitions, but a lot of values are missing, so I expect snappy compression to be a factor when estimating the footprint once the data is loaded into memory). The point is, I anticipate that a single partition should fit into a single worker’s RAM, but not much more than that. Using xgboost’s SparkXGBClassifier works fine in this case.
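For reference, the Spark-only flow that works looks roughly like this (the column names and num_workers below are placeholders, not my actual values):

from xgboost.spark import SparkXGBClassifier

# spark_session is an existing Hive-enabled SparkSession;
# "features" / "label" are placeholder column names.
sdf = spark_session.read.table('my_table')
clf = SparkXGBClassifier(features_col="features", label_col="label", num_workers=4)
spark_model = clf.fit(sdf)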
As far as I can tell, there are multiple ways of loading the data:
# Load Hive via Spark into a RayDMatrix:
import ray
import raydp
from xgboost_ray import RayDMatrix

spark_session = raydp.init_spark(enable_hive=True)  # (app name / executor resource args omitted here)
sdf = spark_session.read.table('my_table')
ray_dmatrix = RayDMatrix(ray.data.from_spark(sdf))

# Load parquet via Apache Arrow FS into a Ray Dataset, then into a RayDMatrix:
import pyarrow.fs

arrow_hdfs = pyarrow.fs.HadoopFileSystem('default')
ray_dataset = ray.data.read_parquet('/hdfs/path/to/my_table',
                                    filesystem=arrow_hdfs)
ray_dmatrix = RayDMatrix(ray_dataset)
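(In both cases I assume the target column also needs to be passed explicitly; something like the following, where "label" stands in for the real target column:)

from xgboost_ray import RayDMatrix, RayShardingMode

# "label" is a placeholder for the actual target column name.
ray_dmatrix = RayDMatrix(ray_dataset,
                         label="label",
                         sharding=RayShardingMode.INTERLEAVED)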
For training, I can do something like:
# xgboost_ray's train function
from xgboost_ray import train

train(
    {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    },
    dtrain=ray_dmatrix,
    ...
)
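(As I understand it, the actor/resource configuration goes through RayParams, roughly as below; the numbers are just examples:)

from xgboost_ray import RayParams, train

# Example resource configuration; actual values depend on the cluster.
bst = train(
    {"objective": "binary:logistic", "eval_metric": ["logloss", "error"]},
    dtrain=ray_dmatrix,
    num_boost_round=100,
    ray_params=RayParams(num_actors=4, cpus_per_actor=2),
)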
Or I could leave the data as a ray.data.Dataset and use the trainer:
from ray.train.xgboost import XGBoostTrainer

XGBoostTrainer(
    ...,
    datasets={"train": ray_dataset},
    ...,
).fit()
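(My understanding of a fuller call, where the label column and worker count below are placeholders:)

from ray.air.config import ScalingConfig  # ray.train.ScalingConfig on newer Ray versions
from ray.train.xgboost import XGBoostTrainer

# "label" and num_workers are placeholders / examples.
trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(num_workers=4),
    label_column="label",
    params={"objective": "binary:logistic", "eval_metric": ["logloss", "error"]},
    datasets={"train": ray_dataset},
)
result = trainer.fit()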
However, regardless of the approach, I haven’t been able to load the data successfully thus far. The failure modes seem to be:
- Loading via RayDMatrix results in missing columns when shards are being accessed: the ultimate failure is within pandas, but the traceback looks something like:
  ray/data/_internal/planner/map_batches.py", line 79, in process_next_batch
  → ray/data/dataset.py", line 728, in <lambda>
  → pandas/util/_decorators.py", line 331, in wrapper return func(*args, **kwargs)
  … → KeyError: "['colX', 'colY'] not found in axis"
  (This seems to be related to RayShardingMode, because different columns go “missing” depending on the sharding mode.)
- Loading via ray.data.Dataset results in an OOM kill.
I appreciate that there’s a fair amount of information missing here, so for starters I wanted to ask for some guidance: what would be a recommended data flow in this case, and which classes/conversions should I use?