Error When Trying to Tune a Trainable Function

Yes, below are the relevant code and error traceback. If you look at the error, this is also related to the spilled objects not being cleaned up properly, which was already mentioned [here]( Troubleshooting Ray Train on Windows - Ray AIR (Data, Train, Tune, Serve) - Ray). You said to mention it in Ray Core, so should I create a separate issue for this?

Anyways below are the relevant code and full error traceback. Note that if I put in a config parameter in the trainable, which is the run_training() function, I get the previously mentioned error " raise SessionMisuseError(
ray.train.error.SessionMisuseError: prepare/accelerate utility functions should be called inside a training function executed by Trainer.run" which is not shown in the code below.

Code:

search_space = {“batch_size”: model_config.batch_size,“epochs”: model_config.epochs, “learning_rate”: tune.grid_search([1.4e-2,1.4e-3,1.4e-4]),
“T_max”: model_config.T_max, “weight_decay”: model_config.weight_decay, “eta_min”: model_config.eta_min, “binary_threshold”: tune.grid_search([0.1,0.3,0.5,0.7]), “alpha”: tune.grid_search([0.3,0.5,0.7]), “beta”: tune.sample_from(lambda x: 1 - x.config.alpha)}

def convert_batch_to_numpy(batch) → Dict[str, np.ndarray]:
images = np.stack([np.array(image) for image, _ in batch[“item”]])
masks = np.stack([np.array(masks) for _, masks in batch[“item”]])
return {“image”: images, “mask”: masks}

#load dataset
def load_dataset():
model_df_train = new_df_train.reset_index(drop=True)
model_df_val = new_df_val.reset_index(drop=True)
train_dataset = TrainDataSet(df=model_df_train, transforms=train_transforms)
val_dataset = TrainDataSet(df=model_df_val, transforms=val_transforms)
return train_dataset, val_dataset #return train and val datasets
def loss_func(y_pred, y_true): #weighted avg of the two, also explore different weighting and combinations if possible.
return hyp_config.alpha * dice_loss_func(y_pred,y_true) + hyp_config.beta * iou_loss_func(y_pred,y_true)

#code to train one epoch:
def epoch_train(model, optimizer, scheduler):
model.train() #set mode to train
dataset_size = 0 #initialize
running_loss = 0.0 #initialize
scaler = GradScaler() #enable GradScaler for gradient scaling, necessary for prevention of underflow of using fp16 using autocast below
train_dataset_shard = session.get_dataset_shard(“train”) #ray method of dataloaders, getting each batch
train_dataset_batches = train_dataset_shard.iter_torch_batches(batch_size=model_config.batch_size) #ray method of dataloaders, getting each batch
pbar = tqdm(enumerate(train_dataset_batches),colour=‘red’,desc=‘Training’)
for idx, batch in pbar:
batch_size = model_config.batch_size #return batch size N.
images,masks = batch[“image”],batch[“mask”]
with autocast(enabled=True, dtype=torch.float16): #enable autocast for fp16 training, faster forward pass
y_pred, _ = model(images) #forward pass
loss = loss_func(y_pred, masks) #compute losses from y_pred
loss = loss / model_config.iters_to_accumulate #need to normalize since accumulating gradients
scaler.scale(loss).backward() #backward pass, make sure it is not within autocast
if (idx + 1) % model_config.iters_to_accumulate == 0 : #scale updates should only happen at each # of iters to accumulate
scaler.step(optimizer) #take optimizer step
scaler.update() #update scale for next iteration
optimizer.zero_grad() #zero the accumulated scaled gradients
scheduler.step() #change lr,make sure to call this after scaler.step
running_loss += (loss.item() * batch_size) #update current running loss for all images in batch
dataset_size += batch_size #update current datasize
epoch_loss = running_loss / dataset_size #get current epoch average loss
current_lr = optimizer.param_groups[0][‘lr’]
pbar.set_postfix(train_loss=f’{epoch_loss:0.4f}‘,
lr=f’{current_lr:0.5f}') #print current epoch loss and lr
torch.cuda.empty_cache() #clear gpu memory after every epoch
gc.collect() #collect garbage
return epoch_loss #return loss for this epoch

@torch.no_grad() #disable gradient calc for validation
def epoch_valid(model):
model.eval() #set mode to eval
dataset_size = 0 #initialize
running_loss = 0.0 #initialize
valid_ap_history = #initialize
val_dataset_shard = session.get_dataset_shard(“valid”) #ray method of dataloaders, getting each batch
val_dataset_batches = val_dataset_shard.iter_torch_batches(batch_size=model_config.batch_size) #ray method of dataloaders, getting each batch
pbar = tqdm(enumerate(val_dataset_batches),colour=‘red’,desc=‘Validating’)
for idx, batch in pbar:
images,masks = batch[“image”],batch[“mask”]
y_pred, _ = model(images) #forward pass
loss = loss_func(y_pred, masks) #calculate loss
running_loss += (loss.item() * model_config.batch_size) #update current running loss
dataset_size += model_config.batch_size #update current datasize
epoch_loss = running_loss / dataset_size #divide epoch loss by current datasize
masks = masks.squeeze(0)
y_pred_prob = nn.Sigmoid()(y_pred) #get prob by applying sigmoid to logit y_pred
valid_ap = iou_map(masks.cpu().numpy(), y_pred_prob.cpu().numpy(), verbose=0)
valid_ap_history.append(valid_ap)
pbar.set_postfix(valid_loss=f’{epoch_loss:0.3f}')
valid_ap_history = np.mean(valid_ap_history, axis=0) #store mean AP
torch.cuda.empty_cache() #clear gpu memory after every epoch
gc.collect() #collect garbage
return epoch_loss, valid_ap_history #return loss and AP for this epoch

#function that utilizes above train and validation function to iterate them over training epochs, master train code.
#for ray, this is the “train_loop_per_worker” function to run in TorchTrainer()
def run_training():
start = time.time() #measure time
print(f"Training for Fold {model_config.current_fold}")
model = build_model() #build model
model = train.torch.prepare_model(model)
print(model)
best_model_wts = copy.deepcopy(model.state_dict())
best_ap = 0 #initial best AP
best_epoch = -1 #initial best epoch
history = defaultdict(list) #history defaultdict to store relevant variables
num_epochs = search_space[“epochs”]
optimizer = optim.Adam(model.parameters(),
lr=search_space[“learning_rate”],
weight_decay=search_space[“weight_decay”]) #initialize optimizer
scheduler = lr_scheduler.CosineAnnealingLR(optimizer,
T_max=search_space[“T_max”],
eta_min=search_space[“eta_min”]) #initialize LR scheduler

for epoch in range(1, num_epochs + 1): #iter over num total epochs
    gc.collect()
    print(f"Current Epoch {epoch} / Total Epoch {num_epochs}")
    print(f'Epoch {epoch}/{num_epochs}', end='')
    train_loss = epoch_train(model, optimizer, scheduler) #train one epoch
    valid_loss, valid_ap_history = epoch_valid(model) #valid one epoch
    valid_ap = valid_ap_history
    checkpoint = Checkpoint.from_dict(dict(epoch=epoch, model=model.state_dict()))
    session.report(dict(loss=valid_loss,ap = valid_ap),checkpoint=checkpoint)
    history['Train Loss'].append(train_loss)
    history['Valid Loss'].append(valid_loss)
    history['Valid AP'].append(valid_ap)
    print(f'Valid AP: {valid_ap:0.4f}')
    #if AP improves, save the best model
    if valid_ap >= best_ap:
        print(f"Valid Score Improved ({best_ap:0.4f} ---> {valid_ap:0.4f})")
        best_ap = valid_ap
        best_epoch = epoch
        best_model_wts = copy.deepcopy(model.state_dict())
        PATH = os.path.join(model_config.model_save_directory, f"best_epoch-{model_config.current_fold:02d}.pt")
        if not os.path.exists(model_config.model_save_directory):
            os.makedirs(model_config.model_save_directory)
        torch.save(model.state_dict(), PATH)
        print("Model Saved!")
    print(f'Best AP so far: {best_ap:0.4f}')
    print(f'Best AP at epoch #: {best_epoch:d}')

    #also save the most recent model
    PATH = os.path.join(model_config.model_save_directory, f"latest_epoch-{model_config.current_fold:02d}.pt")
    if not os.path.exists(model_config.model_save_directory):
        os.makedirs(model_config.model_save_directory)
    torch.save(model.state_dict(), PATH)

end = time.time()
time_elapsed = end - start
print('Training complete in {:.0f}h {:.0f}m'.format(
    time_elapsed // 3600, (time_elapsed % 3600) // 60))
print("Best AP@ 0.6IOU: {:.4f}".format(best_ap))

#load best model weights
model.load_state_dict(best_model_wts)

pkl_save_path = os.path.join(model_config.model_save_directory, 'history.pickle')
#save history as pkl:
with open(pkl_save_path, 'wb') as file:
    pickle.dump(history, file)
print(f"Finished Training for fold {model_config.current_fold}")

train_dataset, val_dataset = load_dataset() #load datasets
train_dataset: ray.data.Dataset = ray.data.from_torch(train_dataset)
val_dataset: ray.data.Dataset = ray.data.from_torch(val_dataset)
train_dataset= train_dataset.map_batches(convert_batch_to_numpy).materialize()
val_dataset = val_dataset.map_batches(convert_batch_to_numpy).materialize()

sched = AsyncHyperBandScheduler()
tuner = Tuner(trainable = run_training, param_space = search_space,tune_config = TuneConfig(metric = “ap”,mode = “max”, scheduler = sched, num_samples = 2), run_config= RunConfig(name = “tune_trial_1”))
results = tuner.fit()
print(“Best config is:”, results.get_best_result().config)

Full error traceback:

Traceback (most recent call last):
File “C:\Users\Kevin\PycharmProjects\hubmap\models\unet++\tune.py”, line 482, in
results = tuner.fit()
File “C:\Users\Kevin.conda\envs\hubmap\lib\site-packages\ray\tune\tuner.py”, line 347, in fit
return self._local_tuner.fit()
File “C:\Users\Kevin.conda\envs\hubmap\lib\site-packages\ray\tune\impl\tuner_internal.py”, line 588, in fit
analysis = self._fit_internal(trainable, param_space)
File “C:\Users\Kevin.conda\envs\hubmap\lib\site-packages\ray\tune\impl\tuner_internal.py”, line 703, in _fit_internal
analysis = run(
File “C:\Users\Kevin.conda\envs\hubmap\lib\site-packages\ray\tune\tune.py”, line 857, in run
experiments[i] = Experiment(
File “C:\Users\Kevin.conda\envs\hubmap\lib\site-packages\ray\tune\experiment\experiment.py”, line 219, in init
self._run_identifier = Experiment.register_if_needed(run)
File “C:\Users\Kevin.conda\envs\hubmap\lib\site-packages\ray\tune\experiment\experiment.py”, line 411, in register_if_needed
register_trainable(name, run_object)
File “C:\Users\Kevin.conda\envs\hubmap\lib\site-packages\ray\tune\registry.py”, line 105, in register_trainable
trainable = wrap_function(trainable, warn=warn)
File “C:\Users\Kevin.conda\envs\hubmap\lib\site-packages\ray\tune\trainable\function_trainable.py”, line 601, in wrap_function
raise ValueError(
ValueError: Unknown argument found in the Trainable function. The function args must include a ‘config’ positional parameter. Any other args must be ‘checkpoint_dir’. Found:
2023-08-16 13:29:36,898 ERROR external_storage.py:360 – Error cleaning up spill files. You might still have remaining spilled objects inside ray_spilled_objects directory.
Traceback (most recent call last):
File “C:\Users\Kevin.conda\envs\hubmap\lib\site-packages\ray_private\external_storage.py”, line 354, in _destroy_external_storage
shutil.rmtree(directory_path)
File “C:\Users\Kevin.conda\envs\hubmap\lib\shutil.py”, line 750, in rmtree
return _rmtree_unsafe(path, onerror)
File “C:\Users\Kevin.conda\envs\hubmap\lib\shutil.py”, line 620, in _rmtree_unsafe
onerror(os.unlink, fullname, sys.exc_info())
File “C:\Users\Kevin.conda\envs\hubmap\lib\shutil.py”, line 618, in _rmtree_unsafe
os.unlink(fullname)
PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: ‘C:\Users\Kevin\AppData\Local\Temp\ray\session_2023-08-16_13-26-56_752001_22152\ray_spilled_objects\7d139acfc7ff47669394a46290f93923-multi-12’