How severe does this issue affect your experience of using Ray?
- High: It blocks me to complete my task.
my trials are keep getting terminated and my models score zero with this error : C:\arrow\cpp\src\arrow\filesystem\s3fs.cc:2598: arrow::fs::FinalizeS3 was not called even though S3 was initialized. This could lead to a segmentation fault at exit.
i would appreciate some help . i use ray 2.4.0 and conda for windows
ray.init(_metrics_export_port=9191)
input_size = X_train.shape[1]
num_cores = 16
class LSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers, dropout):
super(LSTMModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, batch_first=True, dropout=dropout)
self.linear = nn.Linear(hidden_size, output_size)
def forward(self, x):
x = x.view(x.shape[0], -1, input_size)
h_0, c_0 = self.init_hidden(x.shape[0], x.device)
out, _ = self.lstm(x, (h_0, c_0))
out = self.linear(out[:, -1])
return out.squeeze()
def init_hidden(self, batch_size, device):
h_0 = torch.zeros(self.lstm.num_layers, batch_size, self.lstm.hidden_size).to(device)
c_0 = torch.zeros(self.lstm.num_layers, batch_size, self.lstm.hidden_size).to(device)
return h_0, c_0
def train_model(model, optimizer, criterion, data_loader, device, scaler, scheduler):
model.train()
total_loss = 0
for x_batch, y_batch in data_loader:
x_batch, y_batch = x_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
with torch.cuda.amp.autocast():
output = model(x_batch)
loss = criterion(output, y_batch)
scaler.scale(loss).backward()
scaler.unscale_(optimizer) # to check for any possible inf/nan gradients
# step the optimizer manually
optimizer.step()
# update the scaler
scaler.update()
# step the scheduler after the optimizer
scheduler.step()
total_loss += loss.item()
return total_loss
def evaluate_model(model, data_loader, device):
model.eval()
predictions = []
with torch.no_grad():
for x_batch, y_batch in data_loader:
x_batch = x_batch.to(device)
output = model(x_batch)
predictions.extend(torch.sigmoid(output).detach().cpu().numpy().flatten())
return predictions
def objective(trial, device):
hidden_size = trial.suggest_int('hidden_size', 500, 2000)
num_layers = trial.suggest_int('num_layers', 1, 5)
dropout = trial.suggest_float('dropout', 0.0, 0.5)
lr = trial.suggest_float('lr', 1e-5, 1.0, log=True)
optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD', 'AdamW'])
batch_size = trial.suggest_int('batch_size', 32, 256)
scheduler_name = trial.suggest_categorical('lr_scheduler', ['StepLR', 'ExponentialLR'])
gamma = trial.suggest_float('gamma', 0.05, 1.0)
step_size = trial.suggest_int('step_size', 1, 100)
model = LSTMModel(input_size, hidden_size, 1, num_layers, dropout)
model.to(device)
scaler = torch.cuda.amp.GradScaler()
criterion = nn.MSELoss()
optimizer_classes = {
'Adam': torch.optim.Adam,
'RMSprop': torch.optim.RMSprop,
'SGD': torch.optim.SGD,
'AdamW': torch.optim.AdamW
}
optimizer = optimizer_classes[optimizer_name](model.parameters(), lr=lr)
scheduler_classes = {
'StepLR': torch.optim.lr_scheduler.StepLR,
'ExponentialLR': torch.optim.lr_scheduler.ExponentialLR,
}
if scheduler_name == 'StepLR':
scheduler = scheduler_classes[scheduler_name](optimizer, step_size=step_size, gamma=gamma)
elif scheduler_name == 'ExponentialLR':
scheduler = scheduler_classes[scheduler_name](optimizer, gamma=gamma)
train_data_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, pin_memory=True)
val_data_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, pin_memory=True)
for epoch in range(40):
train_loss = train_model(model, optimizer, criterion, train_data_loader, device, scaler, scheduler)
intermediate_value = 1.0 / (train_loss + 1e-5)
trial.report(intermediate_value, epoch)
if trial.should_prune():
raise optuna.TrialPruned()
predictions_val = evaluate_model(model, val_data_loader, device)
binary_predictions_val = (np.array(predictions_val) > 0.5).astype(int)
binary_labels_val = y_val.numpy().reshape(-1)
f1_val = f1_score(binary_labels_val, binary_predictions_val)
trial.set_user_attr("f1_val", f1_val)
return f1_val
def trainable(config, checkpoint_dir=None):
device = "cuda" if torch.cuda.is_available() else "cpu"
trial = optuna.trial.FixedTrial(config)
result = objective(trial, device)
tune.report(score=result)
if __name__ == "__main__":
resources_per_trial = {"gpu": 1, "cpu": num_cores} if torch.cuda.is_available() else {"cpu": num_cores}
scheduler = MedianStoppingRule(metric="score", mode="max")
search_alg = OptunaSearch(metric="score", mode="max")
analysis = tune.run(
trainable,
config={
"input_size": input_size,
"hidden_size": tune.randint(500, 2000),
"num_layers": tune.randint(1, 5),
"dropout": tune.uniform(0.0, 0.5),
"lr": tune.loguniform(1e-5, 1.0),
"optimizer": tune.choice(['Adam', 'RMSprop', 'SGD', 'AdamW']),
"batch_size": tune.randint(32, 256),
"lr_scheduler": tune.choice(['StepLR', 'ExponentialLR']),
"gamma": tune.uniform(0.05, 1.0),
"step_size": tune.randint(1, 100),
},
resources_per_trial=resources_per_trial,
num_samples=15,
scheduler=scheduler,
search_alg=search_alg,
)
best_parameters = analysis.get_best_config(metric="score", mode="max")
best_trial = analysis.get_best_trial(metric="score", mode="max")
print('Best Trial: score {},\nparams {}'.format(best_trial.last_result["score"], best_parameters))
for trial in analysis.trials:
print(f"Trial {trial.trial_id}, F1 score: {trial.last_result['score']}")
ray. Shutdown()