arrow::fs::FinalizeS3 was not called even though S3 was initialized. This could lead to a segmentation fault at exit

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

My trials keep getting terminated and my models score zero with this error: C:\arrow\cpp\src\arrow\filesystem\s3fs.cc:2598: arrow::fs::FinalizeS3 was not called even though S3 was initialized. This could lead to a segmentation fault at exit.

I would appreciate some help. I use Ray 2.4.0 and conda on Windows. Here is my code:

import numpy as np
import optuna
import ray
import torch
import torch.nn as nn
from ray import tune
from ray.tune.schedulers import MedianStoppingRule
from ray.tune.search.optuna import OptunaSearch
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader, TensorDataset

ray.init(_metrics_export_port=9191)
input_size = X_train.shape[1]
num_cores = 16

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers, dropout):
        super(LSTMModel, self).__init__()
        self.input_size = input_size
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout)
        self.linear = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        # reshape flat features into (batch, seq_len, input_size)
        x = x.view(x.shape[0], -1, self.input_size)
        h_0, c_0 = self.init_hidden(x.shape[0], x.device)
        out, _ = self.lstm(x, (h_0, c_0))
        out = self.linear(out[:, -1])
        return out.squeeze()

    def init_hidden(self, batch_size, device):
        h_0 = torch.zeros(self.lstm.num_layers, batch_size, self.lstm.hidden_size).to(device)
        c_0 = torch.zeros(self.lstm.num_layers, batch_size, self.lstm.hidden_size).to(device)
        return h_0, c_0

def train_model(model, optimizer, criterion, data_loader, device, scaler, scheduler):
    model.train()
    total_loss = 0
    for x_batch, y_batch in data_loader:
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()

        with torch.cuda.amp.autocast():
            output = model(x_batch)
            loss = criterion(output, y_batch)

        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)  # unscale so gradients can be inspected/clipped

        # scaler.step() skips the optimizer step if the unscaled gradients contain inf/nan
        scaler.step(optimizer)

        # update the loss scale for the next iteration
        scaler.update()

        # step the scheduler after the optimizer
        scheduler.step()

        total_loss += loss.item()

    return total_loss



def evaluate_model(model, data_loader, device):
    model.eval()
    predictions = []
    with torch.no_grad():
        for x_batch, y_batch in data_loader:
            x_batch = x_batch.to(device)
            output = model(x_batch)
            predictions.extend(torch.sigmoid(output).detach().cpu().numpy().flatten())
    return predictions

def objective(trial, device):
    
    hidden_size = trial.suggest_int('hidden_size', 500, 2000)
    num_layers = trial.suggest_int('num_layers', 1, 5)
    dropout = trial.suggest_float('dropout', 0.0, 0.5)
    lr = trial.suggest_float('lr', 1e-5, 1.0, log=True)
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD', 'AdamW'])
    batch_size = trial.suggest_int('batch_size', 32, 256)
    scheduler_name = trial.suggest_categorical('lr_scheduler', ['StepLR', 'ExponentialLR'])
    gamma = trial.suggest_float('gamma', 0.05, 1.0)
    step_size = trial.suggest_int('step_size', 1, 100)

    model = LSTMModel(input_size, hidden_size, 1, num_layers, dropout)
    model.to(device)
    
    scaler = torch.cuda.amp.GradScaler()
    criterion = nn.MSELoss()

    optimizer_classes = {
        'Adam': torch.optim.Adam,
        'RMSprop': torch.optim.RMSprop,
        'SGD': torch.optim.SGD,
        'AdamW': torch.optim.AdamW
    }
    optimizer = optimizer_classes[optimizer_name](model.parameters(), lr=lr)

    scheduler_classes = {
        'StepLR': torch.optim.lr_scheduler.StepLR,
        'ExponentialLR': torch.optim.lr_scheduler.ExponentialLR,
    }

    if scheduler_name == 'StepLR':
        scheduler = scheduler_classes[scheduler_name](optimizer, step_size=step_size, gamma=gamma)
    elif scheduler_name == 'ExponentialLR':
        scheduler = scheduler_classes[scheduler_name](optimizer, gamma=gamma)

    train_data_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, pin_memory=True)
    val_data_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, pin_memory=True)

    for epoch in range(40):
        train_loss = train_model(model, optimizer, criterion, train_data_loader, device, scaler, scheduler)
        intermediate_value = 1.0 / (train_loss + 1e-5)
        trial.report(intermediate_value, epoch)
        if trial.should_prune():
            raise optuna.TrialPruned()

    predictions_val = evaluate_model(model, val_data_loader, device)
    binary_predictions_val = (np.array(predictions_val) > 0.5).astype(int)
    binary_labels_val = y_val.numpy().reshape(-1)
    f1_val = f1_score(binary_labels_val, binary_predictions_val)

    trial.set_user_attr("f1_val", f1_val)

    return f1_val

def trainable(config, checkpoint_dir=None):
    device = "cuda" if torch.cuda.is_available() else "cpu"
    trial = optuna.trial.FixedTrial(config)
    result = objective(trial, device)
    tune.report(score=result)
                  
if __name__ == "__main__":
    resources_per_trial = {"gpu": 1, "cpu": num_cores} if torch.cuda.is_available() else {"cpu": num_cores}
    scheduler = MedianStoppingRule(metric="score", mode="max")
    search_alg = OptunaSearch(metric="score", mode="max")

    analysis = tune.run(
        trainable,
        config={
            "input_size": input_size,
            "hidden_size": tune.randint(500, 2000),
            "num_layers": tune.randint(1, 5),
            "dropout": tune.uniform(0.0, 0.5),
            "lr": tune.loguniform(1e-5, 1.0),
            "optimizer": tune.choice(['Adam', 'RMSprop', 'SGD', 'AdamW']),
            "batch_size": tune.randint(32, 256),
            "lr_scheduler": tune.choice(['StepLR', 'ExponentialLR']),
            "gamma": tune.uniform(0.05, 1.0),
            "step_size": tune.randint(1, 100),
        },
        resources_per_trial=resources_per_trial,
        num_samples=15,
        scheduler=scheduler,
        search_alg=search_alg,
    )

    best_parameters = analysis.get_best_config(metric="score", mode="max")
    best_trial = analysis.get_best_trial(metric="score", mode="max")
    
    print('Best Trial: score {},\nparams {}'.format(best_trial.last_result["score"], best_parameters))

    for trial in analysis.trials:
        print(f"Trial {trial.trial_id}, F1 score: {trial.last_result['score']}")
        
    ray.shutdown()
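
While I wait for a fix, a minimal sketch of something I may try (assuming my pyarrow build exposes pyarrow.fs.finalize_s3, the Python binding for the arrow::fs::FinalizeS3 named in the warning; I have not confirmed it silences the message) is to finalize Arrow's S3 subsystem explicitly when the interpreter exits:

import atexit

import pyarrow.fs

def _finalize_arrow_s3():
    # finalize_s3 may be missing on older pyarrow releases, so look it up defensively
    finalize = getattr(pyarrow.fs, "finalize_s3", None)
    if finalize is not None:
        finalize()

# run the finalizer at interpreter exit, after the Tune run has shut down
atexit.register(_finalize_arrow_s3)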

I opened a GitHub issue about this: [Tune] FinalizeS3 was not called even though S3 was initialized · Issue #35825 · ray-project/ray · GitHub
We will hopefully get to this soon.
Thanks for the report.


Oh, thank you, I already have one open: C:\arrow\cpp\src\arrow\filesystem\s3fs.cc:2598: arrow::fs::FinalizeS3 was not called even though S3 was initialized. This could lead to a segmentation fault at exit · Issue #35771 · ray-project/ray · GitHub. Just tell me if you want the previous one removed.