1. Severity of the issue: (select one)
None: I’m just curious or want clarification.
Low: Annoying but doesn’t hinder my work.
Medium: Significantly affects my productivity but can find a workaround.
High: Completely blocks me.
2. Environment:
- Ray version: 2.48.0
- Python version: 3.12.3
- OS:
- Cloud/Infrastructure: Azure Databricks
- Other libs/tools (if relevant): Databricks Runtime ML 16.4, xgboost 3.0.2
3. What happened vs. what you expected:
- Expected: To be able to train XGBoost models on large datasets (123 features, from 9 000 000 rows up to 1 300 000 000 rows or more) using Ray Tune
- Actual: Training does not work; I either hit OOM errors or Ray changes the schema of the dataset
Summary
Assuming you do not need any complicated preprocessing steps on your large datasets, and the most you need to do is:
- selecting specific columns
- and maybe recasting int32 columns to int8 (to reduce memory usage)
How do you train XGBoost models in Ray on datasets that have > 100 features and > 9 000 000 rows?
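To make that concrete, the preprocessing I have in mind is nothing more than this sketch (the helper name and column-list arguments are placeholders, not my actual code):

import numpy as np
import ray.data

def prepare(ds: ray.data.Dataset, feature_cols, one_hot_cols, label_col):
    """Keep only the needed columns and downcast the one-hot int32 columns to int8."""
    ds = ds.select_columns(feature_cols + [label_col])

    def downcast(batch):
        # The one-hot columns only hold 0/1, so int8 is safe and 4x smaller than int32.
        for col in one_hot_cols:
            batch[col] = batch[col].astype(np.int8)
        return batch

    return ds.map_batches(downcast, batch_format="numpy")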
Detailed information about what I have done: my setup, errors, warnings, etc.
Hello,
I am trying to train XGBoost models on large datasets using Ray Tune, but I cannot get it to work. I have managed to get it working for datasets smaller than 9 000 000 rows (and 123 features), but above that I run into various problems.
My setup
I am working in Azure Databricks, and I have been using 1 worker node and 1 driver node, always of the same cluster type, using DBR ML 16.4 LTS.
I have tried with the following 2 cluster types:
Standard_NC64as_T4_v3 [T4] (440 GB RAM and 4 T4 GPUs per node)
Standard_NV36adms_A10_v5 [A10] (880 GB RAM and 1 A10 GPU per node)
I run an init script to install xgboost==3.0.2 and the latest version of Ray, as the Ray version that comes with DBR ML 16.4 LTS kills ray workers as soon as there is an OOM error.
I do all my preprocessing using SQL, so my data is ready for training. The only thing I need to do is drop some columns in the Ray dataset.
I read the tables into a Ray dataset using ray.data.read_delta_sharing_tables(), as I had problems back in January with ray.data.read_databricks_tables() not being able to read in all the data (there was some upper limit to it if I recall correctly).
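For reference, the read is roughly the following (the Delta Sharing URL is a placeholder, and I am leaving out the exact arguments I pass):

import ray

# Placeholder Delta Sharing URL of the form <profile-file>#<share>.<schema>.<table>
ds = ray.data.read_delta_sharing_tables(
    "<profile-file>#<share>.<schema>.<table>"
)
print(ds.schema())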
My data
I have 18 tables with the same type of features but different sizes (I have split my original data into 18 clusters of similar data points). As I am doing time series prediction, I also want to do time series cross-validation, so I have created 3 splits for each table.
I have 123 different features; 73 of datatype FLOAT, 50 of datatype INT. Of the 50 INT features, 44 features are one hot encodings (i.e. they only take on the values 1 or 0).
The number of rows in my 18 tables varies, with the smallest table having 15 000 rows and the largest having 1 200 000 000 rows.
The first CV split of the largest table has 538 000 000 rows
The largest table for which I have managed to successfully complete all 50 Ray Tune trials has approximately 228 550 000 rows.
BUT lately I have been having issues with a table that has approximately 9 000 000 rows.
What I have tried
To find the best hyperparameters for each of my 18 datasets, I run 50 Ray Tune trials, together with OptunaSearch.
As my data varies in size, I have created 2 notebooks that run XGBoost in different ways:
- For smaller datasets (< 9 000 000 rows) I convert the Ray dataset to pandas, then to a DMatrix, and then train the XGBoost model. Here I used the cluster with 4 T4 GPUs. (A minimal sketch of this conversion is shown after this list.)
  a. This works fine if the initial dataset is < 9 000 000 rows.
  b. This does not work if I try to subsample < 9 000 000 rows out of the larger datasets using ray_ds.random_sample().
- For larger datasets I avoid pandas and instead use XGBoost's QuantileDMatrix. For this I used the cluster with the 1 A10 GPU and 880 GB RAM.
  a. Ray Tune trials worked for datasets up to 228 550 000 rows, but I cannot get it to work for the dataset with 538 000 000 rows.
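The pandas-to-DMatrix conversion from the first bullet above is essentially this (a minimal sketch; the function name is a placeholder):

import xgboost

def to_dmatrix(ray_ds, label_col="target_col"):
    # Small-dataset path: pull the whole Ray dataset into a pandas DataFrame
    # on the trial worker, then build a regular DMatrix from it.
    df = ray_ds.to_pandas()
    y = df.pop(label_col)
    return xgboost.DMatrix(df, label=y)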
After running the Ray Tune trials for the 228 550 000 rows dataset, I tried the 538 000 000 rows dataset, but it did not work. The trials failed because they requested more memory than what was available, which varied depending on the amount of resources I provided for each trial.
When I set resources={"CPU": 30, "GPU": 1}, a trial would request 61.55 GB of memory while only 20.66 GB was free.
When I set resources={"CPU": 7, "GPU": 0.25}, a trial would request around 1-2 GB of memory while less than 1 GB was free.
I assumed it was because I did not store the data in object store memory beforehand, which led to each trial materializing its own dataset (which also seems unnecessary and slows down execution). So instead of passing the Ray dataset into the training function, I decided to materialize the Ray datasets beforehand, call ray.put() on them, pass the references into the trials and their corresponding training function, and call ray.get() to retrieve the dataset in each trial's training function.
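As far as I understand, ray.tune.with_parameters is the API intended for sharing large objects with every trial, so an alternative I have considered looks roughly like this (a sketch only, not what I currently run; build_tuner is a placeholder name):

from ray import tune

def build_tuner(train_dataset, val_dataset, param_space):
    # with_parameters puts the (materialized) datasets into the object store once
    # and injects them into every trial, instead of manual ray.put()/ray.get().
    trainable = tune.with_parameters(
        train_xgboost_model,  # would need to accept (config, train_dataset, val_dataset)
        train_dataset=train_dataset.materialize(),
        val_dataset=val_dataset.materialize(),
    )
    return tune.Tuner(
        tune.with_resources(trainable, resources={"CPU": 30, "GPU": 1}),
        param_space=param_space,
    )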
For the big datasets where I use QuantileDMatrix I have tried different things:
- Reducing the number of trials from 50 to 2
- Reducing max_bin from 256 to 64
- Reducing batch_size
- Reducing the number of prefetched batches to 1
My code (with QuantileDMatrix)
Ray setup for Standard_NV36adms_A10_v5 [A10]
restart = True

if restart is True:
    try:
        shutdown_ray_cluster()
    except:
        print("Failed to run shutdown_ray_cluster()")
    try:
        ray.shutdown()
    except:
        print("Failed to run ray.shutdown()")

setup_ray_cluster(
    min_worker_nodes=1,
    max_worker_nodes=1,
    num_cpus_per_node=32,
    num_gpus_per_node=1,
    num_cpus_head_node=32,
    num_gpus_head_node=1,
    object_store_memory_worker_node=400 * 1024 * 1024 * 1024,
    object_store_memory_head_node=400 * 1024 * 1024 * 1024,
    collect_log_to_path="logs/gpu/ray-clusters/ml",
)

# Pass any custom Ray configuration with ray.init
ray.init(ignore_reinit_error=True)
print(ray.cluster_resources())
The code for Ray Tune (with materialization before starting Ray Tune)
import os
import mlflow
import mlflow.spark
import xgboost
import cupy as cp
from mlflow.utils.databricks_utils import get_databricks_env_vars
from mlflow.tracking import MlflowClient
from mlflow.entities import Metric
import ray
import ray.data
import ray.tune
from ray.train.xgboost import XGBoostTrainer, RayTrainReportCallback
from ray.tune.search.optuna import OptunaSearch
from ray.util.spark import setup_ray_cluster, shutdown_ray_cluster
import pyarrow as pa
# Custom DataIter class that streams batches from a Ray dataset into XGBoost
class RayDataIter(xgboost.core.DataIter):
    def __init__(self, ray_dataset: ray.data.Dataset, label_col, batch_size, prefetch_batches):
        super().__init__()
        self.dataset = ray_dataset
        self.label_col = label_col
        self.batch_size = batch_size
        self.prefetch_batches = prefetch_batches
        self._generator = None

    def reset(self):
        # Called by XGBoost at the start of each pass over the data
        self._generator = iter(self.dataset.iter_batches(
            batch_size=self.batch_size,
            prefetch_batches=self.prefetch_batches,
            batch_format="numpy",
        ))

    def next(self, input_data):
        try:
            batch = next(self._generator)
            print("====== Got batch ======")
        except StopIteration:
            return False
        # Move the batch to GPU memory and hand it to XGBoost
        y = cp.asarray(batch[self.label_col])
        X = cp.column_stack([cp.asarray(batch[col]) for col in batch if col != self.label_col])
        print(f"Batch shape: {X.shape}, label shape: {y.shape}")
        input_data(data=X, label=y)
        return True
# XGBoost train function (runs inside each Ray Tune trial)
def train_xgboost_model(config):
    # Retrieve the materialized Ray datasets from the object store
    train_dataset = ray.get(config.pop("train_dataset_ref"))
    val_dataset = ray.get(config.pop("val_dataset_ref"))

    train_iter = RayDataIter(
        ray_dataset=train_dataset,
        label_col="target_col",
        batch_size=2097152,
        prefetch_batches=1,
    )
    val_iter = RayDataIter(
        ray_dataset=val_dataset,
        label_col="target_col",
        batch_size=2097152,
        prefetch_batches=1,
    )

    # QuantileDMatrix built by streaming batches through the iterators
    qdm_train = xgboost.QuantileDMatrix(train_iter, max_quantile_batches=2, max_bin=256)
    qdm_val = xgboost.QuantileDMatrix(val_iter, ref=qdm_train, max_quantile_batches=2, max_bin=256)

    # Set early stopping rounds
    early_stopping_rounds = config.pop("early_stopping_rounds")
    evals_result = {}
    booster = xgboost.train(
        params=config,
        dtrain=qdm_train,
        evals=[(qdm_train, "train"), (qdm_val, "val")],
        evals_result=evals_result,
        num_boost_round=config["num_boost_round"],
        early_stopping_rounds=early_stopping_rounds,
    )

    # Report the best validation metrics back to Ray Tune
    best_val_rmse = min(evals_result["val"]["rmse"])
    best_val_mae = min(evals_result["val"]["mae"])
    ray.tune.report({
        "val-rmse": best_val_rmse,
        "val-mae": best_val_mae,
        "evals_result": evals_result,
    })
# Ray Tune function, which runs all the trials
def tune_with_callback(train_dataset, val_dataset):
    # Materialize the datasets and make them available to all trials
    print("====== Materializing datasets ======")
    train_dataset = train_dataset.materialize()
    train_dataset_ref = ray.put(train_dataset)
    val_dataset = val_dataset.materialize()
    val_dataset_ref = ray.put(val_dataset)

    # Start Ray tuning
    print("====== Starting Ray Tune ======")
    tuner = ray.tune.Tuner(
        trainable=ray.tune.with_resources(trainable=train_xgboost_model, resources={"CPU": 30, "GPU": 1}),
        run_config=ray.tune.RunConfig(
            name="ray_tune_with_child_runs_v_4_streaming",
            storage_path=ray_temp_storage_path,  # defined elsewhere in the notebook
        ),
        tune_config=ray.tune.TuneConfig(
            search_alg=OptunaSearch(
                metric=["val-rmse", "val-mae"],
                mode=["min", "min"],
            ),
            num_samples=2,
        ),
        param_space={
            "train_dataset_ref": train_dataset_ref,
            "val_dataset_ref": val_dataset_ref,
            "objective": "reg:squarederror",
            "eval_metric": ["rmse", "mae"],
            "device": "cuda",
            "tree_method": "hist",
            "early_stopping_rounds": 25,
            "max_depth": ray.tune.randint(3, 10),
            "eta": ray.tune.uniform(0.01, 0.2),
            "num_boost_round": 150,
            "alpha": ray.tune.uniform(0, 2),
            "lambda": ray.tune.uniform(1, 10),
            "gamma": ray.tune.uniform(0, 5),
            "subsample": ray.tune.uniform(0.7, 1),
            "min_child_weight": ray.tune.randint(1, 10),
            "max_bin": 256,
        },
    )
    results = tuner.fit()
    return results
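And this is roughly how I invoke the function above (train_ds and val_ds are placeholders for the Ray datasets of one CV split, read and column-selected as described earlier):

# Run the trials for one CV split
results = tune_with_callback(train_ds, val_ds)

# Inspect the best trial for one of the two metrics
best = results.get_best_result(metric="val-rmse", mode="min")
print(best.config)
print(best.metrics["val-rmse"], best.metrics["val-mae"])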
Errors I get
QuantileDMatrix error
Traceback (most recent call last):
File "/databricks/python/lib/python3.12/site-packages/ray/air/execution/_internal/event_manager.py", line 110, in resolve_future
result = ray.get(future)
^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.12/site-packages/ray/_private/auto_init_hook.py", line 22, in auto_init_wrapper
return fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.12/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.12/site-packages/ray/_private/worker.py", line 2858, in get
values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.12/site-packages/ray/_private/worker.py", line 958, in get_objects
raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(XGBoostError): ray::ImplicitFunc.train() (pid=2610584, ip=10.0.22.10, actor_id=97f591e35c19854b6910cec002000000, repr=train_xgboost_model)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.12/site-packages/ray/tune/trainable/trainable.py", line 331, in train
raise skipped from exception_cause(skipped)
File "/databricks/python/lib/python3.12/site-packages/ray/air/_internal/util.py", line 107, in run
self._ret = self._target(*self._args, **self._kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.12/site-packages/ray/tune/trainable/function_trainable.py", line 45, in <lambda>
training_func=lambda: self._trainable_func(self.config),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.12/site-packages/ray/tune/trainable/function_trainable.py", line 261, in _trainable_func
output = fn()
^^^^
File "/root/.ipykernel/2563/command-525148766324718-3946767004", line 14, in train_xgboost_model
File "/databricks/python/lib/python3.12/site-packages/xgboost/core.py", line 729, in inner_f
return func(**kwargs)
^^^^^^^^^^^^^^
File "/databricks/python/lib/python3.12/site-packages/xgboost/core.py", line 1614, in __init__
self._init(
File "/databricks/python/lib/python3.12/site-packages/xgboost/core.py", line 1680, in _init
_check_call(ret)
File "/databricks/python/lib/python3.12/site-packages/xgboost/core.py", line 310, in _check_call
raise XGBoostError(py_str(_LIB.XGBGetLastError()))
xgboost.core.XGBoostError: [13:37:52] /workspace/src/common/device_vector.cu:23:
Memory allocation error on worker 0:
std::bad_alloc: cudaErrorMemoryAllocation: out of memory
- Free memory: 20.6661GB
- Requested memory: 61.1558GB
or
xgboost.core.XGBoostError: [19:03:59] /workspace/src/common/device_vector.cu:23:
Memory allocation error on worker 0:
std::bad_alloc: cudaErrorMemoryAllocation: out of memory
- Free memory: 2.30676GB
- Requested memory: 3.09766GB
Errors when downsampling
When I downsample, I sometimes get OOM errors like the ones above (it seems random, especially since I work with the same dataset and use the same seed). I also get warnings that the schema changes, which is weird in itself because it also seems random: sometimes when I rerun the code I get the warning, and sometimes I don't. Here are the warnings that I get:
Warning with pandas
2025-07-28 16:22:36,060 WARNING streaming_executor_state.py:764
-- Operator produced a RefBundle with a different schema than the previous one.
Previous schema:
PandasBlockSchema(names=[<list-of-my-columns>],
types=[dtype('int32'), dtype('float32'), dtype('int32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('int32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32'), dtype('float32')]),
new schema: . This may lead to unexpected behavior.
Another version of the warning
2025-07-28 16:48:20,823 WARNING streaming_executor_state.py:764 -- Operator produced a RefBundle with a different schema than the previous one. Previous schema: <my-column-1>: int32
<my-column-2>: float
...
<my-column-123>: float
-- schema metadata --
pandas: '{"index_columns": [], "column_indexes": [], "columns": [{"name":' + 18151, new schema: . This may lead to unexpected behavior.
And sometimes, when I was using ray_ds.drop_columns() instead of ray_ds.select_columns(), the warning would contain columns that were not part of my table in Databricks, nor in ray_ds.schema(). The columns it adds have names like these:
'_row-id-col-8b2dd88e-f49a-4e7f-aa1a-038ec3772299'
'_row-commit-version-col-42c239f6-a049-49ed-8a26-1184d4e4b509'.
This one happens on one of the larger tables, not on the smaller ones for which I have successfully run the Ray Tune trials.
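Because of this I now whitelist columns with select_columns() instead of dropping them, roughly like this (feature_cols is a placeholder for my 123 feature column names):

# Keep only my features + label so the hidden Delta Sharing bookkeeping columns
# (_row-id-col-*, _row-commit-version-col-*) never end up in the dataset.
ray_ds = ray_ds.select_columns(feature_cols + ["target_col"])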
My questions
How do you train XGBoost models with large datasets using Ray Tune to get good hyperparameters?
How do you do it efficiently? What am I missing (considering I managed with a 228 550 000 row training dataset and an 8 386 000 row validation dataset)?
And why does Ray randomly change the schema!?