We are running Ray Tune on Ray version 2.9.0. The Ray cluster is formed from 10 instances with 8 CPUs and 64 GiB of memory each, so in total we have 80 CPUs and 640 GiB of memory.
Issue:
We have encountered out-of-memory errors in Ray, which we suspect are caused by multiple trials executing simultaneously on a single node. This is visible in the attached screenshot, where two trials appear to be running concurrently on the same node, even though we set {"cpu": 7, "gpu": 0} as the resource requirement for each trial and each node has only 8 cores. The concurrent execution likely causes resource contention, leaving insufficient memory and triggering the out-of-memory errors.
Also, ray status reports that only 10/80 of the available CPUs are actively being used (screenshot attached below). Shouldn't this be 70/80, since each of the 10 concurrent trials should be using 7 CPUs?
Is there an issue with resource allocation in this Ray version, or is there something in the Tune config below that has to be changed?
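For reference, this is a minimal sketch of the accounting we expected, using the standard ray.cluster_resources() and ray.available_resources() APIs; it assumes it is run from a node that is already part of the cluster and is illustrative only, not part of our tuning code.

import ray

ray.init(address="auto", ignore_reinit_error=True)

# Expectation while 10 trials are running with {"cpu": 7, "gpu": 0} each:
# cluster_resources() should report CPU: 80.0 and available_resources()
# should report roughly CPU: 10.0, i.e. 70/80 CPUs in use.
total = ray.cluster_resources()
free = ray.available_resources()
used_cpu = total.get("CPU", 0.0) - free.get("CPU", 0.0)
print(f"CPU in use: {used_cpu:.0f}/{total.get('CPU', 0.0):.0f}")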
Code snippet:
import uuid
import mlflow
import pandas as pd
import xgboost as xgb
from optuna.integration.mlflow import MLflowCallback
import psutil
import numpy as np
import ray
from ray import tune, train
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.optuna import OptunaSearch
from loguru import logger
from sklearn.model_selection import StratifiedKFold
import gc
import time
import subprocess

def objective(model_params):
    """
    ray's objective function for tuning

    Args:
        model_params (dict): model parameters
    """
    mlflow_params = model_params.pop("mlflow")
    mlflow_tracking_uri = mlflow_params.pop("tracking_uri")
    experiment_name = mlflow_params.pop("experiment_name")
    cv_params = model_params.pop('cv_params')
    n_splits_cv = cv_params.pop('n_splits_cv')
    num_boost_round = cv_params.pop('num_boost_round')
    early_stopping_rounds = cv_params.pop('early_stopping_rounds')
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run():
        s3_backup_loc = model_params["external_file"]["s3_backup_loc"]
        dv = model_params["external_file"]["dv"]
        exp_id = model_params["external_file"]["exp_id"]
        log_trail = model_params["external_file"]["log_trail"]
        project_dir = model_params["external_folder"]["project_dir"]
        processed_folder = model_params["external_folder"]["processed_folder"]
        prev_s3_loc = model_params["prev_file"]["prev_s3_loc"]
        prev_preds = model_params["prev_file"]["prev_preds"]
        train_set = pd.read_pickle(f"{s3_backup_loc}preprocessed_dataset/train_set.pkl")
        train_sample_weight = pd.read_pickle(f"{s3_backup_loc}preprocessed_dataset/train_sample_weight.pkl")
        # split data into IDVs and DV
        train_inputs = train_set.drop(dv, axis=1)
        train_output = train_set[dv]
        del train_set
        gc.collect()
        time.sleep(1)
        val_set = pd.read_pickle(f"{s3_backup_loc}preprocessed_dataset/val_set.pkl")
        val_sample_weight = pd.read_pickle(f"{s3_backup_loc}preprocessed_dataset/val_sample_weight.pkl")
        logger.info(f"Length of VAL Sample weight: {len(val_sample_weight)}")
        val_inputs = val_set.drop(dv, axis=1)
        val_output = val_set[dv]
        del val_set
        gc.collect()
        time.sleep(1)
        logger.info(f"Shape of train_inputs: {train_inputs.shape}")
        logger.info(f"Shape of val_inputs: {val_inputs.shape}")
        pred_dev1_full = np.zeros(train_inputs.shape[0])
        pred_dev2_full = np.zeros(train_inputs.shape[0])
        kfolds = StratifiedKFold(n_splits=n_splits_cv, shuffle=True, random_state=42)
        max_best_iteration = []
        for dev_index, val_index in kfolds.split(train_inputs, train_output):
            dev1_x, dev2_x = train_inputs.iloc[dev_index], train_inputs.iloc[val_index]
            dev1_y, dev2_y = train_output.iloc[dev_index], train_output.iloc[val_index]
            dtrain_cv = xgb.DMatrix(dev1_x, label=dev1_y)
            dval_cv = xgb.DMatrix(dev2_x, label=dev2_y)
            watchlist_cv = [(dtrain_cv, 'train'), (dval_cv, 'eval')]
            model_cv = xgb.train(model_params, dtrain_cv, evals=watchlist_cv, num_boost_round=num_boost_round,
                                 early_stopping_rounds=early_stopping_rounds, verbose_eval=0)
            max_best_iteration.append(model_cv.best_iteration + 1)
            print('best_iteration:', model_cv.best_iteration + 1)
            dev1_preds = model_cv.predict(xgb.DMatrix(dev1_x))
            dev2_preds = model_cv.predict(xgb.DMatrix(dev2_x))
            pred_dev1_full[dev_index] += dev1_preds
            pred_dev2_full[val_index] = dev2_preds
            del dev1_x, dev2_x, dev1_y, dev2_y
            del dtrain_cv, dval_cv, model_cv
            gc.collect()
            time.sleep(1)
        print(max_best_iteration)
        print('Cross Val Completed')
        logger.info('Cross Validation Complete')
        max_best_iteration = round(np.max(max_best_iteration)) + 1
        if max_best_iteration <= 50:
            max_best_iteration = num_boost_round
        pred_dev1_full /= 4
        dtrain = xgb.DMatrix(train_inputs, label=train_output, weight=train_sample_weight)
        dval = xgb.DMatrix(val_inputs, label=val_output, weight=val_sample_weight)
        watchlist = [(dtrain, 'train'), (dval, 'eval')]
        logger.info("Model Training Started")
        model = xgb.train(model_params, dtrain, evals=watchlist, num_boost_round=max_best_iteration,
                          early_stopping_rounds=early_stopping_rounds)
        logger.info("Model Training Complete")
        mlflow.xgboost.log_model(
            model, artifact_path=f"{dv}_Model"
        )
        # metrics
        train_predictions = model.predict(xgb.DMatrix(train_inputs))
        val_predictions = model.predict(xgb.DMatrix(val_inputs))
        train_score = score(train_output.values, train_predictions)
        val_score = score(val_output.values, val_predictions)
        ## CV score
        train_score_cv = score(train_output.values, pred_dev1_full)
        val_score_cv = score(train_output.values, pred_dev2_full)
        uuid_trial_name = str(uuid.uuid4())
        logger.info(f'trial_name: {uuid_trial_name}')
        mlflow.set_tag('mlflow.runName', uuid_trial_name)
        mlflow.log_text(','.join(train_inputs.columns.tolist()), "IDV_sequence.txt")
        del train_output, train_inputs, val_output, val_inputs, pred_dev1_full, pred_dev2_full
        gc.collect()
        time.sleep(1)
        cv_score_diff = abs(train_score_cv - val_score_cv)
        val_score_diff = abs(train_score - val_score)
        mlflow.log_params(model_params)
        mlflow.log_params({'best_iteration': model.best_iteration})
        mlflow.log_metrics(
            {"1_train_score_cv": train_score_cv, "2_val_score_cv": val_score_cv, "3_cv_score_diff": cv_score_diff,
             "4_train_score": train_score, "5_val_score": val_score, "7_val_score_diff": val_score_diff})
        feature_imp_df = feature_importance(model)
        mlflow.log_artifact("feature_importance.html", feature_imp_df.to_html("feature_importance.html", index=False))
        # report to mlflow through ray tune
        train.report({'val_score_diff': val_score_diff, 'cv_score_diff': cv_score_diff, 'val_score_cv': val_score_cv,
                      'val_score': val_score, 'train_score': train_score, 'best_iteration': model.best_iteration,
                      'log_trail': log_trail, 'project_dir': project_dir, 'processed_folder': processed_folder,
                      'dv': dv, 'experiment_name': experiment_name, 'prev_preds': prev_preds,
                      'prev_s3_loc': prev_s3_loc, 's3_backup_loc': s3_backup_loc,
                      'uuid_run_name': uuid_trial_name})
    gc.collect()
    time.sleep(1)

def ray_tune(config, concurrent_trials):
    """Tune function for ray"""
    # search space definition
    mlflow.set_tracking_uri(config.tracking_uri)
    experiment_name = config.mlflow_experiment_name
    mlflow.set_experiment(experiment_name)
    experiment = mlflow.get_experiment_by_name(name=experiment_name)
    exp_id = experiment.experiment_id
    model_params = {
        "seed": config.seed,
        "n_jobs": psutil.cpu_count() - 2,
        "objective": config.objective,
        "eta": tune.quniform(*config.eta),
        "lambda": tune.quniform(*config.lamda),
        "alpha": tune.quniform(*config.alpha),
        "subsample": tune.quniform(*config.subsample),
        "colsample_bytree": tune.quniform(*config.colsample_bytree),
        "max_depth": tune.qrandint(*config.max_depth),
        "min_child_weight": tune.quniform(*config.min_child_weight),
        "gamma": tune.quniform(*config.gamma),
        'eval_metric': config.metric,
        "mlflow": {
            "experiment_name": config.mlflow_experiment_name,
            "tracking_uri": mlflow.get_tracking_uri()},
        "external_file": {"s3_backup_loc": config.s3_backup_loc,
                          "dv": config.dv,
                          "exp_id": exp_id,
                          "log_trail": config.log_trail},
        "external_folder": {"project_dir": config.project_dir,
                            "processed_folder": config.processed_folder},
        "prev_file": {"prev_s3_loc": config.prev_s3_loc,
                      "prev_preds": config.prev_preds},
        "cv_params": {
            "n_splits_cv": config.n_splits_cv,
            "early_stopping_rounds": config.early_stopping_rounds,
            "num_boost_round": config.num_boost_round
        }
    }
    algo = OptunaSearch(metric=["cv_score_diff", "val_score_cv"], mode=["min", "max"])
    # scheduler = AsyncHyperBandScheduler()
    algo = ConcurrencyLimiter(searcher=algo, max_concurrent=10)
    tuner = tune.Tuner(
        tune.with_resources(objective, resources={"cpu": 7, "gpu": 0}),
        tune_config=tune.TuneConfig(search_alg=algo, num_samples=100, max_concurrent_trials=10),
        run_config=train.RunConfig(name="mlflow"),
        param_space=model_params,
    )
    tuner.fit()

def main(cfg):
    config = cfg
    try:
        concurrent_trials = 10
        num_instances = 9
        head_node = 'ip0'
        worker_nodes = ['ip1', 'ip2', 'ip3', 'ip4', 'ip5', 'ip6', 'ip7', 'ip8', 'ip9']
        head_address = f'ray://{head_node}:10001'
        # ray init and attaching all the worker nodes to ray cluster
        ray.init(address=head_address, ignore_reinit_error=True,
                 runtime_env={"working_dir": f"{config.project_dir}/src/"})
        create_ray_cluster(head_node, worker_nodes)
        ray_tune(config, concurrent_trials)
    except Exception:
        logger.exception("Some error in ray tune")
        try:
            ray.shutdown()
        except NameError:
            pass
    finally:
        ray.shutdown()
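For completeness, below is a variant we are considering but have not yet validated: replacing the plain resource dict with a tune.PlacementGroupFactory bundle in tune.with_resources. The choice of this particular bundle is our own assumption, not something we have confirmed is required.

# Untested variant: express the per-trial request as an explicit placement
# group bundle rather than a plain dict with the same CPU count, to check
# whether the scheduler then packs at most one 7-CPU trial per 8-CPU node.
trainable = tune.with_resources(
    objective,
    resources=tune.PlacementGroupFactory([{"CPU": 7}]),
)
# `trainable` would then replace the first argument of tune.Tuner(...) above.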
Logs:
Please find attached the screenshots of ray status and the out-of-memory error.