Scheduler early stopping causes resource lock when starting next trial for DDP

1. Severity of the issue: (select one)
None: I’m just curious or want clarification.
Low: Annoying but doesn’t hinder my work.
Medium: Significantly affects my productivity but can find a workaround.
High: Completely blocks me.

2. Environment:

  • Ray version: 2.44.1
  • Python version: 3.10.12
  • OS: Linux
  • Cloud/Infrastructure: Local

3. What happened vs. what you expected:

  • Expected: Ray Tune starts the next trial after the scheduler early-stops a trial.
  • Actual: Tuning gets stuck trying to start the next trial, repeatedly logging this error:
The worker group startup timed out after 30.0 seconds waiting for 6 workers. Potential causes include: 
(1) temporary insufficient cluster resources while waiting for autoscaling (ignore this warning in this case), 
(2) infeasible resource request where the provided `ScalingConfig` cannot be satisfied), and 
(3) transient network issues. Set the RAY_TRAIN_WORKER_GROUP_START_TIMEOUT_S environment variable to increase the timeout.

I am using Tuner to launch a ray.train.torch.TorchTrainer whose per-worker training function uses ray.train.torch.prepare_model and ray.train.torch.prepare_data_loader with a custom sampler; I need this setup for DDP (sketched below).
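For context, the per-worker training function follows roughly this pattern (a simplified sketch, not my exact code; build_model, build_dataset, MyCustomSampler, and compute_iou are placeholders):

def train_fn_per_worker(config):
    import torch
    from torch.utils.data import DataLoader

    import ray.train
    import ray.train.torch

    # Wrap the model in DDP and move it to this worker's device.
    model = ray.train.torch.prepare_model(build_model(config))

    # The custom sampler is the reason I need this DDP-aware data loading setup.
    dataset = build_dataset(config)
    loader = DataLoader(dataset, batch_size=8, sampler=MyCustomSampler(dataset))
    loader = ray.train.torch.prepare_data_loader(loader)

    optimizer = torch.optim.Adam(model.parameters(), lr=config["learning_rate"])
    for epoch in range(config.get("num_epochs", 10)):
        for batch in loader:
            ...  # forward / backward / optimizer.step()
        # Report the metric that the ASHA scheduler and checkpointing key on.
        ray.train.report({"iou": compute_iou(model, loader)})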

I have tried the following to resolve the issue (a sketch of the startup-timeout setting suggested by the error message follows this list):
1. Setting os.environ['RAY_kill_worker_timeout_milliseconds'] = '60000'
2. Setting resources_per_worker={"CPU": 1} in the ScalingConfig for the TorchTrainer
3. Removing the num_workers parameter from my DataLoader creation
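For reference, the startup timeout the error message points at can presumably be raised like this (untested sketch; 300 seconds is an arbitrary value):

import os

import ray

# Option A: set it before ray.init() so the locally started Ray processes inherit it.
os.environ["RAY_TRAIN_WORKER_GROUP_START_TIMEOUT_S"] = "300"

# Option B: propagate it explicitly to all tasks/actors via the runtime environment.
ray.init(runtime_env={"env_vars": {"RAY_TRAIN_WORKER_GROUP_START_TIMEOUT_S": "300"}})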

My current workaround is to set the scheduler's grace period higher than the number of epochs, so no trial is ever early-stopped (shown below).
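Concretely, the workaround is just the scheduler from the repro below with the grace period covering the whole run:

from ray.tune.schedulers import ASHAScheduler

# Workaround: with grace_period >= max_t, ASHA never early-stops a trial,
# so the resource hang when starting the next trial never triggers.
scheduler = ASHAScheduler(
    metric="iou",
    mode="max",
    max_t=num_epochs,
    grace_period=num_epochs,
    reduction_factor=2,
)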

Current Setup to reproduce:

Tuner

import os

import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler

# Create a Ray Tune scheduler
scheduler = ASHAScheduler(
    metric="iou",
    mode="max",
    max_t=num_epochs,
    grace_period=1,
    reduction_factor=2
)
os.makedirs("/workspace/ray_tune_results", exist_ok=True)
tuner = tune.Tuner(
    tune.with_parameters(tune_train_driver_fn, train_config=train_config),
    param_space=param_space,
    tune_config=tune.TuneConfig(
        num_samples=15,
        max_concurrent_trials=1,  # one trial at a time; the next trial cannot start until the stopped trial's workers release their resources
        scheduler=scheduler,
    ),
    run_config=ray.tune.RunConfig(
        name=f"{model_name}_tune",
    )
)
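For completeness, param_space covers the keys read by the driver function below; the actual search values are placeholders here:

param_space = {
    "encoder_name": tune.choice(["resnet34", "resnet50"]),  # placeholder choices
    "loss_weights": tune.choice([(1.0, 1.0), (0.5, 2.0)]),  # placeholder choices
    "lr": tune.loguniform(1e-4, 1e-2),                      # placeholder range
}

The run is then launched with results = tuner.fit(); the hang shows up there, right after ASHA early-stops a trial.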

Trainer

import ray
import ray.train
import ray.train.torch
import ray.tune

# Import path assumed from the Ray Train + Ray Tune integration docs for 2.44:
from ray.tune.integration.ray_train import TuneReportCallback


# Tune trial driver: each trial launches its own Ray Train run via TorchTrainer.
def tune_train_driver_fn(config, train_config):
    train_config["encoder_name"] = config["encoder_name"]
    train_config["loss_weights"] = config["loss_weights"]
    train_config["learning_rate"] = config["lr"]

    model_name = train_config["model_name"]

    trainer = ray.train.torch.TorchTrainer(
        train_fn_per_worker,
        train_loop_config=train_config,
        scaling_config=ray.train.ScalingConfig(
            num_workers=train_config["num_workers"],  # 6 in my runs, matching the "6 workers" in the error above
            use_gpu=True,
            resources_per_worker={"CPU": 1},
        ),
        run_config=ray.train.RunConfig(
            name=f"{model_name}_train_trial_id={ray.tune.get_context().get_trial_id()}",
            storage_path="/workspace/ray_train_results",
            callbacks=[TuneReportCallback()],
            checkpoint_config=ray.train.CheckpointConfig(
                num_to_keep=5,
                checkpoint_score_attribute="iou",
                checkpoint_score_order="max",
            ),
        ),
        torch_config=ray.train.torch.TorchConfig(
            backend="gloo",
        )
    )
    trainer.fit()  # blocks until this trial's Ray Train run finishes or Tune stops it

This follows a setup similar to the "Hyperparameter Tuning with Ray Tune" guide in the Ray 2.44.1 docs.