Hello, I’m trying to train a LightGBM model using Ray for distributed training.
Recently, I tested it with a dataset of approximately 1.11 million rows and 572 columns, and I ran into OOM (out-of-memory) errors even though each worker has 2 cores and 12 GB of memory.
I have a couple of questions:
- Does Ray load the entire dataset into each worker during training? As far as I know, it uses a data-parallel approach, so each worker should only load its own shard of the dataset.
- Even if a worker did load the entire dataset, it doesn't seem like that should take more than 12 GB (see the rough size estimate below). Is there a specific reason why it's consuming so much memory?
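For reference, this is the back-of-envelope estimate behind that assumption (just a rough sketch, assuming all ~572 columns end up as 8-byte numeric values):

rows, cols, bytes_per_value = 1_114_619, 572, 8
approx_gib = rows * cols * bytes_per_value / 1024**3
print(f"~{approx_gib:.1f} GiB")  # prints ~4.8 GiB, i.e. well under 12 GB for a single copy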
My code looks like this:
import os
import random

import awswrangler as wr
import numpy as np
import modin.pandas as pd  # note: this shadows plain pandas; the DataFrame ops below run on Modin
import lightgbm as lgb
from lightgbm import LGBMClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import confusion_matrix, accuracy_score, f1_score, precision_score, recall_score
from hyperopt import hp

import ray
from ray import train, tune
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.hyperopt import HyperOptSearch
from ray.tune.schedulers import ASHAScheduler
from ray.tune.integration.lightgbm import TuneReportCheckpointCallback
from ray.train import ScalingConfig
from ray.train.lightgbm import LightGBMTrainer
from ray.air.config import RunConfig

import mlflow
from ray.air.integrations.mlflow import MLflowLoggerCallback, setup_mlflow
mlflow_tracking_uri = "url"
ray.init()
# 1114619 * 592
# Load the data from Athena; it is converted into a Ray Dataset below
df = wr.athena.read_sql_query(
    sql="""
        SELECT *
        FROM table
    """,
    database="database",
    use_threads=True,
    ctas_approach=True,
)
# awswrangler returns a plain pandas DataFrame; wrap it in a Modin DataFrame
df = pd.DataFrame(df)
df_len = len(df)
# generate a random binary target column, just for testing
random_list = [random.randint(0, 1) for _ in range(df_len)]
df_int = df.select_dtypes(int)
df_int["target"] = random_list
df_int = df_int.iloc[:, 1:]
x_columns = list(df_int.columns[:-1])
X_train, X_val, y_train, y_val = train_test_split(
    df_int[x_columns], df_int["target"], stratify=df_int["target"], test_size=0.2
)
X_train["target"] = y_train
X_val["target"] = y_val
# convert the Modin DataFrames into Ray Datasets
train_set = ray.data.from_modin(X_train)
valid_set = ray.data.from_modin(X_val)
#ray.put(train_set)
#ray.put(valid_set)
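# (Optional) sanity check I can run before training: materialize each Ray Dataset and
# print how much memory it actually occupies. This is just a diagnostic sketch and
# assumes a Ray version where Dataset.materialize() / size_bytes() are available.
for name, ds in {"train": train_set, "valid": valid_set}.items():
    mat = ds.materialize()  # force execution so size_bytes() reflects the real blocks
    print(name, round(mat.size_bytes() / 1024**3, 2), "GiB in", mat.num_blocks(), "blocks")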
trainer = LightGBMTrainer(
    scaling_config=ScalingConfig(
        num_workers=8,
        use_gpu=False,
    ),
    label_column="target",
    num_boost_round=100,
    params={
        "objective": "binary",
        "metric": ["binary_logloss", "binary_error"],
    },
    datasets={"train": train_set, "valid": valid_set},
    run_config=RunConfig(
        name="my_lgbm_run",
        storage_path="s3",
        callbacks=[
            MLflowLoggerCallback(
                save_artifact=True,
                tracking_uri=mlflow_tracking_uri,
                experiment_name="ray-test",
            )
        ],
        # You can also specify local_dir, checkpoint_config, etc. if you want
    ),
)
result = trainer.fit()
print("Training completed! Results:", result)