Does Ray Tune restore ignore max_concurrent_trials when restarting errored trials?

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

I noticed when running a test experiment with Ray Tune that, when an experiment is restored with the flag restart_errored=True, it reschedules and runs all of the previously errored trials, ignoring the max_concurrent_trials value.
Is there a way to ensure restarting errored trials respects max_concurrent_trials on restore?

Example:

  1. Run experiment
import time
# ray==2.4.0
from ray import air, tune

def train_test(config):
    time.sleep(10)
    if config["value3"]:
        value = config["value1"] + config["value2"]
    else:
        value = -1

    # Every trial raises, so the experiment ends up with errored trials
    # that can later be restored with restart_errored=True.
    raise ValueError("Mock error...")
    return {"score": value}

# Define trial parameters as a single grid sweep.
trial_space = {
    "value1": tune.grid_search(range(0, 2)),
    "value2": tune.grid_search(range(0, 2)),
    "value3": tune.grid_search([True, False]),
}

train_model = tune.with_resources(train_test, {"cpu": 1})

# Start a Tune run with at most 2 concurrent trials.
tuner = tune.Tuner(
    train_model,
    param_space=trial_space,
    run_config=air.RunConfig(local_dir="some_path", log_to_file=True),
    tune_config=tune.TuneConfig(num_samples=1, max_concurrent_trials=2),
)
results = tuner.fit()
  2. Cancel / pause experiment after two trials errored

  3. Restore experiment

# restore the experiment
tuner = tune.Tuner.restore(path="some_path/experiment_run",
                           trainable=train_model,
                           restart_errored=True)
results = tuner.fit()
  4. I notice 4 trials queued and run at the same time

Thank you!

Hi @looni,

The concurrency limiter should be recovered correctly (it is for me). However, because the trials have already been sampled from the underlying searcher, they are all enqueued again on restore.

The main use case for the concurrency limiter is to avoid sampling too many configurations from a searcher before any results have been reported. Otherwise the search degenerates into an effectively random search in most cases, since the searcher can only make better suggestions after receiving results and fitting its internal model.
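For illustration, here is a minimal sketch of what the limiter protects against (assuming hyperopt is installed; the search space, objective, and sample count below are only placeholders): explicitly wrapping a model-based searcher in a ConcurrencyLimiter, which is essentially what setting max_concurrent_trials in TuneConfig does for a searcher-based algorithm.

# ray==2.4.0; assumes `pip install hyperopt`
from ray import tune
from ray.tune.search import ConcurrencyLimiter
from ray.tune.search.hyperopt import HyperOptSearch

def train_fn(config):
    # placeholder objective
    return {"score": config["value1"] + config["value2"]}

# Sampling distributions instead of grid_search, since model-based
# searchers like HyperOpt do not consume grid_search spaces.
space = {
    "value1": tune.uniform(0, 1),
    "value2": tune.uniform(0, 1),
}

# At most 2 configurations are sampled before any results come back;
# equivalent to passing max_concurrent_trials=2 in TuneConfig.
searcher = ConcurrencyLimiter(HyperOptSearch(metric="score", mode="max"), max_concurrent=2)

tuner = tune.Tuner(
    train_fn,
    param_space=space,
    tune_config=tune.TuneConfig(search_alg=searcher, num_samples=8),
)
results = tuner.fit()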

Thus, when restoring, this is not a concern, so we simply restore all trials that previously existed and enqueue them. Concurrency is then usually limited by the available cluster resources.
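To illustrate that last point (a minimal sketch, not an official recommendation; the 8-CPU figure is only an assumption): if each trial requests a larger share of the cluster driving Tune, the cluster itself caps how many trials run at once, even when restore enqueues every previously errored trial.

import ray
from ray import tune

# Assumption for illustration: the local cluster driving Tune has 8 CPUs.
ray.init(num_cpus=8)

# With 4 CPUs requested per trial, at most 8 / 4 = 2 trials can run
# concurrently, regardless of how many trials are enqueued on restore.
# train_test is the trial function from the example above.
train_model = tune.with_resources(train_test, {"cpu": 4})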

I understand that this is not necessarily what is expected when resuming. We can put this on our backlog to resolve if it is a pain point.

Hey @kai,

Thanks for the reply

I am not using the max_concurrent_trials feature for the searcher functionality specifically, but rather as a way to limit the resources used by my trials.

I am not using Ray Tune quite the way it is intended to be used, so I guess that is why the issue I am facing is not much of a concern for Ray Tune.
Essentially, I am running the Ray Tune experiment on a local Ray cluster on my machine, and within each trial I connect to a second, remote Ray cluster via Ray Client to run some compute-heavy tasks in parallel and produce the trial's result.
To limit the CPU load on the remote cluster, I rely on the max_concurrent_trials value. The reason I use this non-ideal approach with a second Ray Client connection inside each trial, rather than running both the Ray Tune experiment and the parallel tasks on a single remote cluster, is that I want to save some custom files and results from each trial's computation locally on my machine after it completes, which is possible when connecting through Ray Client.

If any of the trials fail and are then restored such that more trials run concurrently than max_concurrent_trials allows, the remote cluster gets overloaded with more tasks than it can handle in parallel.

You can see an example of what I am trying to do below:

import time
# ray==2.4.0
import ray
from ray import air, tune

@ray.remote(num_cpus=1)
def compute_heavy_task(config) -> int:
    time.sleep(600)
    some_computed_value = config["value1"]
    return some_computed_value

def train_test(config):
    # use remote cluster with Ray Client: Interactive Development
    # total CPU resources on remote cluster = 8CPU
    ray_trial_remote_session = ray.init("ray://localhost:10001", allow_multiple=True)
    
    # split the long-running work into 4 parallel 1-CPU tasks on the remote cluster
    # since max_concurrent_trials=2 in the experiment, the remote cluster never exceeds 8 CPUs of usage at a time
    with ray_trial_remote_session:        
        tasks = []
        for index in range(0, 4):
            tasks.append(compute_heavy_task.remote(config))

        computed_tasks = ray.get(tasks)

    # combine results from parallel execution to get final result for trial
    value = sum(computed_tasks)

    # save some files on the local machine based on the results (we are using Ray Client, so the trial function itself runs on the local machine)
    save_to_local_files(value)

    # disconnect from remote cluster
    ray_trial_remote_session.disconnect()

    return {"score": value}

# run ray tune experiment on local machine
ray.init()

# Define trial parameters as a single grid sweep.
trial_space = {
    "value1": tune.grid_search(range(0, 2)),
    "value2": tune.grid_search(range(0, 2)),
    "value3": tune.grid_search([True, False]),
}

train_model = tune.with_resources(train_test, {"cpu": 1})

# Start a Tune run with at most 2 concurrent trials.
tuner = tune.Tuner(
    train_model,
    param_space=trial_space,
    run_config=air.RunConfig(local_dir="some_path", log_to_file=True),
    tune_config=tune.TuneConfig(num_samples=1, max_concurrent_trials=2),
)
results = tuner.fit()
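For reference, one way to bound the remote cluster's load regardless of how many trials Tune schedules after a restore would be to give each heavy task a larger CPU request, so that the remote scheduler itself queues any excess work. A rough sketch (the num_cpus value is only an assumption for an 8-CPU remote cluster):

import time

import ray

# Each heavy task requests 2 CPUs, so an 8-CPU remote cluster runs at most
# 4 of them at a time; any additional tasks submitted by over-scheduled
# trials simply wait in the remote scheduler's queue.
@ray.remote(num_cpus=2)
def compute_heavy_task(config) -> int:
    time.sleep(600)
    return config["value1"]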

Now, this does not seem to be the ideal way to use Ray Tune, so I understand that making max_concurrent_trials work the way I am hoping it would on resume is not a priority. But if it could be put on the backlog, or if you have any other recommendations for my approach, that would be much appreciated!

Thank you so much for the help, and the awesome framework!