Nested Cross Validation with Ray and Tune

Dear Ray-Community,

I am estimating my model performance using nested cross validation, i.e. an outer loop that measures model performance on different test sets and an inner loop that performs hyper-parameter optimization on different validation sets.
Due to its embarrassingly parallel nature, I would like to use Ray for both the outer and the inner loop. More specifically:

  1. For N validation sets I use Tune’s random search to sample M hyper-parameter configurations / sets

  2. After Tune has finished I select the best performing hyper-parameter config

  3. Using the best hyper-parameter config I evaluate on the corresponding test set and average my performance over all K test sets

Running this on a single machine with 32 CPU cores and 3 GPUs, I do not run into any problems. However, when trying it on a local cluster with constrained resources, I run into IO and resource problems while performing the hyper-parameter optimization.

I suspect that this is related to this and this issue. Unfortunately, I am not quite able to get the hang of it.

Any help is much appreciated.

Regards.

cc @kai

Hi @ThinkPad, this sounds interesting. I'm not completely sure from your description what the loops look like. Can you help me there? Do you start the Ray Tune runs in parallel for each of the N validation sets?

Which version of Ray are you running? Running multiple Tune runs (i.e. several parallel tune.run calls) on the same cluster is currently not supported with the recent placement group support in the latest master; you would have to disable placement group usage. What kind of errors are you running into?
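If you want to give that a try, the sketch below shows how I would disable it. Note that I'm writing this from memory: as far as I know the TUNE_PLACEMENT_GROUP_AUTO_DISABLED environment variable is what the current master checks, so please double-check the name against the Tune docs for the version you end up running.

import os

# Assumption: this environment variable switches off Tune's automatic placement
# group creation on recent masters. Set it before importing/initializing Ray.
os.environ["TUNE_PLACEMENT_GROUP_AUTO_DISABLED"] = "1"

import ray
from ray import tune

ray.init(address="auto")  # connect to the existing cluster
# ... then start your parallel tune.run() calls as before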

Do your configurations or resource requirements differ for the validation sets? If not, you could just add the validation set to use as a hyperparameter. So something like:

config = {
    "validation_set": tune.grid_search(["set_a", "set_b", "set_c"]),
    "other_parameter": tune.randint(0, 8),
    # ...
}
tune.run(
    train_on_validation_set,
    config=config,
    num_samples=4,  # M
    # ...
)

Adding a grid_search parameter will automatically multiply the number of trials you're running by the number of grid search values. I.e. num_samples=4 means that 4 samples are collected for each validation set, so 12 trials in total in this example.

Would that approach work for you?

Hey @kai,

thank you for your reply! Your suggestion sounds good. Unfortunately, I need the same hyper-parameter configuration for each validation set. As far as I understand you, using the grid search parameter would lead to new hyper-parameter configurations for each validation set.

To clarify the nested loops situation a bit: In a sequential manner, I would do something like this:

from functools import partial

import numpy as np
from ray import tune

for perf_train_set, perf_test_set in split_data_set(data_set, num_parts=K):
    for train_set, val_set in split_data_set(perf_train_set, num_parts=K):
        rng = np.random.default_rng(seed=123)
        config = {
            'hyper-param': tune.sample_from(lambda _: rng.uniform(a, b)),
            # ...
        }
        tune.run(
            partial(hyper_para_opt, train_set, val_set),
            config=config,
            num_samples=4
        )

In this case, I would get N = K * K validation sets. In order to speed up the process, I serialized all N validation sets in advance and would start the Tune runs in parallel for each of the N validation sets loaded from disk, just as you guessed. However, as you can see from the rng variable, I would need the exact same hyper-parameter configuration on each validation set. Would your suggestion support this, too?

Currently, I am using Ray 1.2.0. When using CPUs only, I run into an error regarding a closed socket. In the GPU case, there simply is no queueing, i.e. all available GPUs are already in use but Ray (or Tune) keeps on pushing trials.

So tune.grid_search currently does not keep the other, randomly sampled parameters constant across grid search values - you would thus end up with different hyper-parameter configurations for each validation set. So this is probably not what you want.
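One workaround could be to pre-sample the M configurations yourself (with your seeded rng) and grid-search over an index into that list together with the validation set, so every validation set sees exactly the same configurations. A rough sketch, where the parameter names, ranges and set names are just placeholders:

from functools import partial

import numpy as np
from ray import tune

M = 4  # number of hyper-parameter configurations
rng = np.random.default_rng(seed=123)

# Pre-sample the M configurations once, so they are identical for every validation set
sampled_configs = [
    {"hyper_param_01": rng.uniform(0.0, 1.0), "hyper_param_02": rng.uniform(0.0, 1.0)}
    for _ in range(M)
]

config = {
    "validation_set": tune.grid_search(["set_a", "set_b", "set_c"]),
    "config_id": tune.grid_search(list(range(M))),  # index into sampled_configs
}

def train_on_validation_set(config, sampled_configs=None):
    params = sampled_configs[config["config_id"]]
    # ... train on config["validation_set"] with params and tune.report(...) ...

tune.run(
    partial(train_on_validation_set, sampled_configs=sampled_configs),
    config=config,
    # no num_samples needed: the two grid searches already yield 3 * M trials
)

The two grid searches are crossed, so you get exactly one trial per (validation set, configuration) pair.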

Can you move the second split into the trainable? I.e. that part: for train_set, val_set in split_data_set(perf_train_set, num_parts=K)? This would also have the advantage that you could potentially employ something like early stopping (e.g. when using ASHA): When performance is bad for a number of splits (all with the same parameter configuration), the trial can be aborted.
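Roughly, I mean something like the sketch below. This is just to illustrate the structure: split_data_set, build_and_train and the search space are placeholders for your own code, and perf_train_set comes from your outer loop.

from ray import tune
from ray.tune.schedulers import ASHAScheduler

config = {
    "hyper-param": tune.uniform(0.0, 1.0),  # placeholder search space
}

def trainable(config, perf_train_set=None, K=10):
    # The inner loop now lives inside the trainable: one trial = one
    # hyper-parameter configuration evaluated on all K inner splits.
    for split_idx, (train_set, val_set) in enumerate(
        split_data_set(perf_train_set, num_parts=K)
    ):
        val_loss = build_and_train(config, train_set, val_set)  # your training code
        # Report after every split so a scheduler like ASHA can stop
        # badly performing configurations early.
        tune.report(val_loss=val_loss, split=split_idx)

analysis = tune.run(
    tune.with_parameters(trainable, perf_train_set=perf_train_set, K=10),
    config=config,
    num_samples=4,  # M configurations
    scheduler=ASHAScheduler(metric="val_loss", mode="min"),
)

In practice you would probably report a running average of the per-split validation losses, so that ASHA compares numbers that are on the same footing across splits.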

As for the errors, if you could post the actual error messages or even a reproduction script, that would be helpful so we can look into it further.

Wow, thank you for that fast reply!

Just to clarify, K is not a hyper-parameter I am trying to optimize. K will be set manually in advance (mostly K = 10).

I have a model with several hyper-parameters that need to be optimized, like an L2 regularization coefficient, inside my trainable hyper_para_opt. Would you still suggest moving the inner loop into the trainable? As a matter of fact, I am using the ASHA scheduler to monitor my validation performance metric.

I will be posting the actual error messages.

Dear @kai,

unfortunately, I haven't had the time to reproduce the errors yet. However, I want to give you a better picture of my current approach to parallelizing with Ray and Tune. On my local machine I run something similar to this with PyTorch:

from functools import partial
from itertools import product

import numpy as np
import ray
import torch
from ray import tune
from ray.tune.schedulers import ASHAScheduler

# Helpers such as get_inner_data_loaders, SomeModel, generate_data, split_serialize,
# process_results and variables like batches_per_epoch are defined elsewhere in my code base.


def fit_hyper_parameter(model, optimizer, objective, device, idx):
    """Function containing the actual training loop"""
    # Get specific training and test set based on some idx I specified in advance
    train_loader, test_loader = get_inner_data_loaders(outer_inner_idx=idx)

    for itr in range(1, (batches_per_epoch * 100) + 1):
        # Perform training loop on one GPU

        with torch.no_grad():
            if itr % batches_per_epoch == 0:
                # Compute some accuracy metric on the test set and report it to Tune
                tune.report(val_loss=accuracy_metric)


def init_model_selection(config, idx=None):
    """Function instantiating model, objective, optimizer"""
    device = torch.device('cuda')

    model = SomeModel(
        hyper_param_01=config['hyper_param_01'],
        hyper_param_02=config['hyper_param_02']
    ).to(device=device)    # Send model to GPU for training

    objective = objective_to_minimize.to(device)
    optimizer = some_optimization_method()

    # Call the training loop
    fit_hyper_parameter(
        model=model,
        optimizer=optimizer,
        objective=objective,
        device=device,
        idx=idx
    )


@ray.remote
def run_tune(num_trials, idx):
    """Run a single Tune experiment"""
    scheduler = ASHAScheduler(
        metric='val_loss',
        mode='min',
        max_t=5,
        grace_period=3,
        reduction_factor=2
    )
    rng = np.random.default_rng(seed=12345)
    config = {
        'hyper_param_01': tune.sample_from(lambda _: rng.integers(a, b)),
        'hyper_param_02': tune.sample_from(lambda _: rng.integers(a, b))
    }
    analysis = tune.run(
        partial(init_model_selection, idx=idx),
        scheduler=scheduler,
        resources_per_trial={'cpu': 1, 'gpu': 1},    # One GPU per trial
        num_samples=num_trials,
        config=config,
    )

    # Return the idx for each specific data set in order to differentiate
    # between them later on when evaluating the results
    return [analysis.results_df, idx]


def main():
    outer_num_folds = 10
    inner_num_folds = 10
    data, target = generate_data()    # Full data set
    split_serialize(
        data=data, 
        target=target, 
        outer_num_folds=outer_num_folds, 
        inner_num_folds=inner_num_folds
        )
    idx = [tuple(i) for i in product(range(outer_num_folds), range(inner_num_folds))]
    num_trials = 25  # Number of hyper-parameter configurations
    select_ids = []
    eval_ids = []

    # A validation metric that each Tune experiment over a validation set returns
    val_metric = np.zeros((num_trials, outer_num_folds))

    # Start several Tune experiments in parallel
    for i in range(len(idx)):
        future_id = run_tune.remote(num_trials, idx[i])
        select_ids.append(future_id)

    # Wait until an experiment is finished and process the results
    while select_ids:
        done_ids, select_ids = ray.wait(select_ids)
        val_metric = process_results(ray.get(done_ids[0]), val_metric)

This runs as expected on my local machine (32-core CPU, 3 GPUs). I get a lot of warnings, but in the end I get the results I would expect.