Ray Out of Memory Issue

We are running Ray Tune with Ray version 2.9.0. The Ray cluster is formed from 10 instances with 8 CPUs and 64 GiB of memory each, so in total we have 80 CPUs and 640 GiB of memory.
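For reference, a minimal sketch of how the total cluster resources can be checked after connecting (the head-node address below is a placeholder):

import ray

# connect to the existing cluster; the address is a placeholder
ray.init(address="ray://<head-node-ip>:10001", ignore_reinit_error=True)

# expect roughly {'CPU': 80.0, 'memory': ~640 GiB, ...} for 10 nodes x 8 CPUs / 64 GiB
print(ray.cluster_resources())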

Issue:

We have encountered out-of-memory errors in Ray, which we suspect are caused by multiple trials running simultaneously on a single node. The attached screenshot shows two trials executing concurrently on the same node, even though we have set {"cpu": 7, "gpu": 0} as the resource requirement for each trial and each node has only 8 cores. This concurrent execution likely leads to resource contention, leaving insufficient memory available and causing the out-of-memory errors.

Also, the Ray status output indicates that only 10/80 of the available CPUs are actively being utilized (screenshot attached below). Shouldn't this be 70/80, since each of the 10 concurrent trials requests 7 CPUs?

Is there an issue with resource allocation in this Ray version, or is there something in the Tune config that has to be changed?
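For context, here is a minimal sketch of how the same per-trial request could be expressed with a placement group instead of the plain resources dict (assuming the objective function and the Tuner setup from the snippet below stay unchanged):

from ray import tune
from ray.tune.execution.placement_groups import PlacementGroupFactory

# hypothetical alternative to resources={"cpu": 7, "gpu": 0}:
# one bundle of 7 CPUs per trial, so two trials cannot fit on one 8-CPU node
trainable_with_resources = tune.with_resources(
    objective, resources=PlacementGroupFactory([{"CPU": 7}])
)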

Code snippet:

import uuid
import mlflow
import pandas as pd
import xgboost as xgb
from optuna.integration.mlflow import MLflowCallback
import psutil
import numpy as np
import ray
from ray import tune, train
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.optuna import OptunaSearch
from loguru import logger
from sklearn.model_selection import StratifiedKFold
import gc
import time
import subprocess


def objective(model_params):
    """
    ray's objective function for tuning
    Args:
        model_params (dict): model parameters
    """
    mlflow_params = model_params.pop("mlflow")
    mlflow_tracking_uri = mlflow_params.pop("tracking_uri")
    experiment_name = mlflow_params.pop("experiment_name")
    cv_params = model_params.pop('cv_params')
    n_splits_cv = cv_params.pop('n_splits_cv')
    num_boost_round = cv_params.pop('num_boost_round')
    early_stopping_rounds = cv_params.pop('early_stopping_rounds')
    mlflow.set_tracking_uri(mlflow_tracking_uri)
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run():
        s3_backup_loc = model_params["external_file"]["s3_backup_loc"]
        dv = model_params["external_file"]["dv"]
        exp_id = model_params["external_file"]["exp_id"]
        log_trail = model_params["external_file"]["log_trail"]
        project_dir = model_params["external_folder"]["project_dir"]
        processed_folder = model_params["external_folder"]["processed_folder"]
        prev_s3_loc = model_params["prev_file"]["prev_s3_loc"]
        prev_preds = model_params["prev_file"]["prev_preds"]

        train_set = pd.read_pickle(f"{s3_backup_loc}preprocessed_dataset/train_set.pkl")
        train_sample_weight = pd.read_pickle(f"{s3_backup_loc}preprocessed_dataset/train_sample_weight.pkl")

        # split data into IDVs and DV
        train_inputs = train_set.drop(dv, axis=1)
        train_output = train_set[dv]
        del train_set
        gc.collect()
        time.sleep(1)

        val_set = pd.read_pickle(f"{s3_backup_loc}preprocessed_dataset/val_set.pkl")
        val_sample_weight = pd.read_pickle(f"{s3_backup_loc}preprocessed_dataset/val_sample_weight.pkl")
        logger.info(f"Length of VAL Sample weight: {len(val_sample_weight)}")
        val_inputs = val_set.drop(dv, axis=1)
        val_output = val_set[dv]

        del val_set
        gc.collect()
        time.sleep(1)

        logger.info(f"Shape of train_inputs: {train_inputs.shape}")
        logger.info(f"Shape of val_inputs: {val_inputs.shape}")

        pred_dev1_full = np.zeros(train_inputs.shape[0])
        pred_dev2_full = np.zeros(train_inputs.shape[0])
        kfolds = StratifiedKFold(n_splits=n_splits_cv, shuffle=True, random_state=42)
        max_best_iteration = []
        for dev_index, val_index in kfolds.split(train_inputs, train_output):
            dev1_x, dev2_x = train_inputs.iloc[dev_index], train_inputs.iloc[val_index]
            dev1_y, dev2_y = train_output.iloc[dev_index], train_output.iloc[val_index]
            dtrain_cv = xgb.DMatrix(dev1_x, label=dev1_y)
            dval_cv = xgb.DMatrix(dev2_x, label=dev2_y)
            watchlist_cv = [(dtrain_cv, 'train'), (dval_cv, 'eval')]
            model_cv = xgb.train(model_params, dtrain_cv, evals=watchlist_cv, num_boost_round=num_boost_round,
                                 early_stopping_rounds=early_stopping_rounds, verbose_eval=0)
            max_best_iteration.append(model_cv.best_iteration + 1)
            print('best_iteration:', model_cv.best_iteration + 1)
            dev1_preds = model_cv.predict(xgb.DMatrix(dev1_x))
            dev2_preds = model_cv.predict(xgb.DMatrix(dev2_x))
            pred_dev1_full[dev_index] += dev1_preds
            pred_dev2_full[val_index] = dev2_preds
            del dev1_x, dev2_x, dev1_y, dev2_y
            del dtrain_cv, dval_cv, model_cv
            gc.collect()
            time.sleep(1)
        print(max_best_iteration)
        print('Cross Val Completed')
        logger.info('Cross Validation Complete')
        max_best_iteration = round(np.max(max_best_iteration)) + 1
        if max_best_iteration <= 50:
            max_best_iteration = num_boost_round
        pred_dev1_full /= 4  # average over folds (assumes n_splits_cv = 5, so each sample appears in the dev split 4 times)

        dtrain = xgb.DMatrix(train_inputs, label=train_output, weight=train_sample_weight)
        dval = xgb.DMatrix(val_inputs, label=val_output, weight=val_sample_weight)
        watchlist = [(dtrain, 'train'), (dval, 'eval')]
        logger.info("Model Training Started")
        model = xgb.train(model_params, dtrain, evals=watchlist, num_boost_round=max_best_iteration,
                          early_stopping_rounds=early_stopping_rounds)
        logger.info("Model Training Complete")
        mlflow.xgboost.log_model(
            model, artifact_path=f"{dv}_Model"
        )

        # metrics
        train_predictions = model.predict(xgb.DMatrix(train_inputs))
        val_predictions = model.predict(xgb.DMatrix(val_inputs))
        train_score = score(train_output.values, train_predictions)
        val_score = score(val_output.values, val_predictions)

        ## CV score
        train_score_cv = score(train_output.values, pred_dev1_full)
        val_score_cv = score(train_output.values, pred_dev2_full)

        uuid_trial_name = str(uuid.uuid4())
        logger.info(f'trial_name:  {uuid_trial_name}')
        mlflow.set_tag('mlflow.runName', uuid_trial_name)
        mlflow.log_text(','.join(train_inputs.columns.tolist()), "IDV_sequence.txt")

        del train_output, train_inputs, val_output, val_inputs, pred_dev1_full, pred_dev2_full
        gc.collect()
        time.sleep(1)

        cv_score_diff = abs(train_score_cv - val_score_cv)
        val_score_diff = abs(train_score - val_score)

        mlflow.log_params(model_params)
        mlflow.log_params({'best_iteration': model.best_iteration})

        mlflow.log_metrics(
            {"1_train_score_cv": train_score_cv, "2_val_score_cv": val_score_cv, "3_cv_score_diff": cv_score_diff,
             "4_train_score": train_score, "5_val_score": val_score, "7_val_score_diff": val_score_diff})

        feature_imp_df = feature_importance(model)
        feature_imp_df.to_html("feature_importance.html", index=False)
        mlflow.log_artifact("feature_importance.html")

        # report to mlflow through ray tune
        train.report({'val_score_diff': val_score_diff, 'cv_score_diff': cv_score_diff, 'val_score_cv': val_score_cv,
                      'val_score': val_score, 'train_score': train_score, 'best_iteration': model.best_iteration,
                      'log_trail': log_trail, 'project_dir': project_dir, 'processed_folder': processed_folder,
                      'dv': dv, 'experiment_name': experiment_name, 'prev_preds': prev_preds,
                      'prev_s3_loc': prev_s3_loc, 's3_backup_loc': s3_backup_loc,
                      'uuid_run_name': uuid_trial_name})

        gc.collect()
        time.sleep(1)


def ray_tune(config, concurrent_trials):
    """Tune function for ray"""

    # search space definition
    mlflow.set_tracking_uri(config.tracking_uri)
    experiment_name = config.mlflow_experiment_name
    mlflow.set_experiment(experiment_name)
    experiment = mlflow.get_experiment_by_name(name=experiment_name)
    exp_id = experiment.experiment_id
    model_params = {
        "seed": config.seed,
        "n_jobs": psutil.cpu_count() - 2,
        "objective": config.objective,
        "eta": tune.quniform(*config.eta),
        "lambda": tune.quniform(*config.lamda),
        "alpha": tune.quniform(*config.alpha),
        "subsample": tune.quniform(*config.subsample),
        "colsample_bytree": tune.quniform(*config.colsample_bytree),
        "max_depth": tune.qrandint(*config.max_depth),
        "min_child_weight": tune.quniform(*config.min_child_weight),
        "gamma": tune.quniform(*config.gamma),
        'eval_metric': config.metric,

        "mlflow": {
            "experiment_name": config.mlflow_experiment_name,
            "tracking_uri": mlflow.get_tracking_uri()},

        "external_file": {"s3_backup_loc": config.s3_backup_loc,
                          "dv": config.dv,
                          "exp_id": exp_id,
                          "log_trail": config.log_trail},

        "external_folder": {"project_dir": config.project_dir,
                            "processed_folder": config.processed_folder, },

        "prev_file": {"prev_s3_loc": config.prev_s3_loc,
                      "prev_preds": config.prev_preds
                      },
        "cv_params": {
            "n_splits_cv": config.n_splits_cv,
            "early_stopping_rounds": config.early_stopping_rounds,
            "num_boost_round": config.num_boost_round
        }
    }

    algo = OptunaSearch(metric=["cv_score_diff", "val_score_cv"], mode=["min", "max"])

    # scheduler = AsyncHyperBandScheduler()
    algo = ConcurrencyLimiter(searcher=algo, max_concurrent=concurrent_trials)
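    # each trial requests 7 CPUs and 0 GPUs; with 8-CPU worker nodes the intent is
    # that at most one trial runs per node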
    tuner = tune.Tuner(
        tune.with_resources(objective, resources={"cpu": 7, "gpu": 0}),
        tune_config=tune.TuneConfig(search_alg=algo, num_samples=100, max_concurrent_trials=concurrent_trials),
        run_config=train.RunConfig(name="mlflow"),
        param_space=model_params,
    )
    tuner.fit()


def main(cfg):
    config = cfg
    try:
        concurrent_trials = 10
        num_instances = 9
        head_node = 'ip0'
        worker_nodes = ['ip1', 'ip2', 'ip3', 'ip4', 'ip5', 'ip6', 'ip7', 'ip8', 'ip9']
        head_address = f'ray://{head_node}:10001'
        # ray init and attaching all the worker nodes to ray cluster
        ray.init(address=head_address, ignore_reinit_error=True,
                 runtime_env={"working_dir": f"{config.project_dir}/src/"})

        create_ray_cluster(head_node, worker_nodes)
        ray_tune(config, concurrent_trials)

    except Exception:
        logger.exception("Some error in ray tune")
    finally:
        # safe to call even if ray.init() never succeeded
        ray.shutdown()

Logs:

Please find the screenshots of the Ray status output and the out-of-memory error below.

Ray status output is attached here: