I’ve been stuck on this for a LONG time.
I have this code which combines ray tune and pytorch lightning:
import pickle

import torch.nn as nn
import torch.optim as optim
import pytorch_lightning as pl
import torch_geometric.nn as geom_nn
from torch_geometric.loader import DataLoader
from torchmetrics import F1Score, Precision, Recall

from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from ray.tune.integration.pytorch_lightning import TuneReportCheckpointCallback

CHECKPOINT_PATH = "/home/pytorch_test/"
DATASET_PATH = "/home/pytorch_test/"

with open('seqs.pkl', 'rb') as seqs_pkl, open('data_list.pkl', 'rb') as data_list_pkl:
    seqs = pickle.load(seqs_pkl)
    data_list = pickle.load(data_list_pkl)
def train_val_test_split(train_cut_off, val_cut_off, test_cut_off, data_list, seqs):
    if float(train_cut_off) > 0:
        train_limit = round(len(data_list) * train_cut_off)
        train_dataset = data_list[0:train_limit]
        train_seqs = seqs[0:train_limit]
        test_limit = round(len(data_list) * test_cut_off) + train_limit
        test_dataset = data_list[train_limit:test_limit]
        test_seqs = seqs[train_limit:test_limit]
        val_limit = round(len(data_list) * val_cut_off) + test_limit
        val_dataset = data_list[test_limit:val_limit]
        val_seqs = seqs[test_limit:val_limit]
        return train_dataset, train_seqs, test_dataset, test_seqs, val_dataset, val_seqs
def prep_training(train_percent, val_percent, test_percent, data_list=data_list, seqs=seqs):
    train_dataset, train_seqs, test_dataset, test_seqs, val_dataset, val_seqs = train_val_test_split(train_percent, val_percent, test_percent, data_list, seqs)
    graph_train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    graph_val_loader = DataLoader(val_dataset, batch_size=64)
    graph_test_loader = DataLoader(test_dataset, batch_size=64)  # ultimately end up similar to what Ale had, but a bit different (e.g. pytorch format, need edges both ways, target per graph not node)
    return train_dataset, train_seqs, test_dataset, test_seqs, val_dataset, val_seqs, graph_train_loader, graph_val_loader, graph_test_loader

train_dataset, train_seqs, test_dataset, test_seqs, val_dataset, val_seqs, graph_train_loader, graph_val_loader, graph_test_loader = prep_training(
    0.8,
    0.1,
    0.1,
    data_list=data_list,
    seqs=seqs,
)
gnn_layer_by_name = {
    "GCN": geom_nn.GCNConv,
    "GraphConv": geom_nn.GraphConv
}
class GNNModel(nn.Module):
    def __init__(self, c_in, c_hidden, c_out, num_layers, activation_function, optimizer_name, learning_rate, dp_rate_linear, layer_name="GCN", **kwargs):
        super().__init__()
        gnn_layer = geom_nn.GCNConv  # gnn_layer_by_name[layer_name]
        layers = []
        activation_function = eval(activation_function)  # not great to use eval
        in_channels, out_channels = c_in, c_hidden
        for l_idx in range(num_layers - 1):
            layers += [
                gnn_layer(in_channels=in_channels,
                          out_channels=out_channels,
                          **kwargs),
                activation_function,
                nn.Dropout(p=dp_rate_linear)
            ]
            in_channels = c_hidden
        layers += [gnn_layer(in_channels=in_channels,
                             out_channels=c_out,
                             **kwargs)]
        self.layers = nn.ModuleList(layers)

    def forward(self, x, edge_index):
        for l in self.layers:
            if isinstance(l, geom_nn.MessagePassing):  # conv layers also need the edge index
                x = l(x, edge_index)
            else:
                x = l(x)
        return x
class GraphGNNModel(nn.Module):
    def __init__(self, c_in, c_hidden, c_out, dp_rate_linear, num_layers, activation_function, optimizer_name, learning_rate):
        super().__init__()
        self.GNN = GNNModel(c_in=c_in,
                            c_hidden=c_hidden,
                            c_out=c_hidden,
                            dp_rate_linear=dp_rate_linear,
                            num_layers=num_layers,
                            activation_function=activation_function,
                            optimizer_name=optimizer_name,
                            learning_rate=learning_rate)
        self.head = nn.Sequential(
            nn.Dropout(p=dp_rate_linear),
            nn.Linear(c_hidden, c_out)
        )

    def forward(self, x, edge_index, batch_idx):
        x = self.GNN(x, edge_index)
        x = geom_nn.global_mean_pool(x, batch_idx)
        x = self.head(x)
        return x
class GraphLevelGNN(pl.LightningModule):
    def __init__(self, **model_kwargs):
        super().__init__()
        # Saving hyperparameters
        self.save_hyperparameters()
        self.model = GraphGNNModel(**model_kwargs)
        self.loss_module = nn.BCEWithLogitsLoss() if self.hparams.c_out == 1 else nn.CrossEntropyLoss()
        self.optimizer_name = model_kwargs['optimizer_name']
        self.learning_rate = model_kwargs['learning_rate']

    def forward(self, data, mode="train"):
        x, edge_index, batch_idx = data.x, data.edge_index, data.batch
        x = self.model(x, edge_index, batch_idx)
        x = x.squeeze(dim=-1)
        if self.hparams.c_out == 1:
            preds = (x > 0).float()
            data.y = data.y.float()
        else:
            preds = x.argmax(dim=-1)
        loss = self.loss_module(x, data.y.float())
        acc = (preds == data.y).sum().float() / preds.shape[0]
        data.y = data.y.int()
        preds = preds.int()
        f1 = F1Score(num_classes=1, multiclass=False)  # change num_classes
        f1_score = f1(preds, data.y)
        precision = Precision(num_classes=1, multiclass=False)
        precision_score = precision(preds, data.y)
        recall = Recall(num_classes=1, multiclass=False)
        recall_score = recall(preds, data.y)
        return loss, acc, f1_score, precision_score, recall_score, preds
    def configure_optimizers(self):
        learning_rate = self.learning_rate
        if self.optimizer_name == 'SGD':
            optimizer = optim.SGD(self.parameters(), lr=learning_rate)
        elif self.optimizer_name == 'NAdam':
            optimizer = optim.NAdam(self.parameters(), lr=learning_rate)
        elif self.optimizer_name == 'Adam':
            optimizer = optim.Adam(self.parameters(), lr=learning_rate)
        else:
            raise ValueError(f"Unknown optimizer: {self.optimizer_name}")
        return optimizer
    def training_step(self, batch, batch_idx):
        loss, acc, _, _, _, _ = self.forward(batch, mode="train")
        self.log('train_loss', loss, on_epoch=True, logger=True, batch_size=64)
        self.log('train_acc', acc, on_epoch=True, logger=True, batch_size=64)
        return loss

    def validation_step(self, batch, batch_idx):
        loss, acc, _, _, _, _ = self.forward(batch, mode="val")
        self.log('val_acc', acc, on_epoch=True, logger=True, batch_size=64)
        self.log('val_loss', loss, on_epoch=True, logger=True, batch_size=64)

    def test_step(self, batch, batch_idx):
        loss, acc, f1, precision, recall, preds = self.forward(batch, mode="test")
        self.log('test_acc', acc, on_epoch=True, logger=True, batch_size=64)
        self.log('test_f1', f1, on_epoch=True, logger=True, batch_size=64)
        self.log('test_precision', precision, on_epoch=True, logger=True, batch_size=64)
        self.log('test_recall', recall, on_epoch=True, logger=True, batch_size=64)
scheduler_asha = ASHAScheduler(
    max_t=100,
    grace_period=1,
    mode="min",
    metric='val_loss',
    reduction_factor=2,
)
tune_report_callback = TuneReportCheckpointCallback(
    metrics={
        "val_loss": "val_loss",
        "val_acc": "val_acc",
    },
    filename="ray_ckpt",
    on="validation_end",
)
config = {
    "c_hidden": tune.choice([32, 64, 128, 256]),
    "dp_rate_linear": tune.uniform(0.4, 0.8),
    "num_layers": tune.randint(3, 5),
    "activation_function": tune.choice(['nn.ReLU(inplace=True)', 'nn.LeakyReLU(inplace=True)', 'nn.Sigmoid()', 'nn.Tanh()']),
    "optimizer_name": tune.choice(['SGD', 'NAdam', 'Adam']),
    "learning_rate": tune.uniform(0.01, 0.1),
}
def run_with_tune(config, graph_train_loader=graph_train_loader, graph_val_loader=graph_val_loader,
                  data_dir=None, epochs=5, gpus=0, trained_weights=None, c_in=5, c_out=1, **model_kwargs):
    # build the model from the hyperparameters Tune sampled into `config`
    model = GraphLevelGNN(c_in=c_in,
                          c_out=c_out,
                          c_hidden=config["c_hidden"],
                          dp_rate_linear=config["dp_rate_linear"],
                          num_layers=config["num_layers"],
                          activation_function=config["activation_function"],
                          optimizer_name=config["optimizer_name"],
                          learning_rate=config["learning_rate"])
    trainer = pl.Trainer(
        max_epochs=epochs,
        # gpus=1,
        default_root_dir="/home/pytorch_test/ray_logs",  # path for saving logs
        callbacks=[
            tune_report_callback,
        ],
    )
    trainer.fit(model, graph_train_loader, graph_val_loader)
reporter = CLIReporter(
    parameter_columns=['c_hidden', 'dp_rate_linear', 'num_layers', 'activation_function', 'optimizer_name', 'learning_rate'],
    metric_columns=["val_loss", "val_acc", "training_iteration"],
)
result = tune.run(
    tune.with_parameters(
        run_with_tune,
        data_dir="/home/pytorch_test/",
        epochs=5,
        # gpus=1
    ),
    resources_per_trial={
        "cpu": 1,
        # "gpu": 0,
    },
    local_dir="/home/pytorch_test/ray_ckpt",  # path for saving checkpoints
    config=config,
    num_samples=20,
    scheduler=scheduler_asha,
    progress_reporter=reporter,
    name="test",
)
So, this script combines PyTorch Lightning and Ray Tune.
When I run the PyTorch Lightning part alone, training runs and finishes consistently, as expected.
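In case it helps, this is a minimal sketch of what I mean by running the Lightning part alone (the hyperparameter values here are just representative picks from the search space, not tuned ones):

# plain Lightning run, no Ray involved
model = GraphLevelGNN(c_in=5, c_out=1, c_hidden=64, dp_rate_linear=0.5,
                      num_layers=4, activation_function='nn.Tanh()',
                      optimizer_name='Adam', learning_rate=0.01)
trainer = pl.Trainer(max_epochs=5, default_root_dir="/home/pytorch_test/ray_logs")
trainer.fit(model, graph_train_loader, graph_val_loader)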
When I run Ray Tune alone on a small test network, it also runs and finishes consistently, as expected.
When I try to merge the two, as in this script, Ray Tune will (1) only complete a small number of trials (n=30 max) and (2) not run consistently, i.e. sometimes it finishes and sometimes it stalls, usually with very few trials left to go. For example, if I set num_samples=30, it will stall indefinitely saying '28 terminated, 2 running'; if I try num_samples=200, it will stall at 192-196 and the last 4-8 trials will sit with a status of 'running' indefinitely (200 samples never finish; fewer than 30 sometimes finish and sometimes not).
I've been stuck on this for weeks at this point. I've tried changing the dataset, adding memory conditions to ray.init(), commenting out whatever I can to see if it helps, and implementing Ray Tune two different ways (with a HyperOpt search, and the way shown above). I'm totally stuck; if anyone could help I'd appreciate it.
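For reference, the ray.init() memory conditions I mention were along these lines, called before tune.run (the limit here is a placeholder, not the exact value from my runs):

import ray

# placeholder limit; I varied this between attempts
ray.init(object_store_memory=2 * 1024 ** 3)  # cap the object store at ~2 GB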