Why does Ray Tune sometimes finish and sometimes stall on RUNNING when combined with PyTorch Lightning?

I’ve been stuck on this for a LONG time.

I have this code, which combines Ray Tune and PyTorch Lightning:


import pickle

import torch.nn as nn
import torch.optim as optim
import torch_geometric.nn as geom_nn
from torch_geometric.loader import DataLoader

import pytorch_lightning as pl
from torchmetrics import F1Score, Precision, Recall

from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler


CHECKPOINT_PATH = "/home/pytorch_test/"
DATASET_PATH = "/home/pytorch_test/"


with open('seqs.pkl', 'rb') as seqs_pkl:
    seqs = pickle.load(seqs_pkl)

with open('data_list.pkl', 'rb') as data_list_pkl:
    data_list = pickle.load(data_list_pkl)


def train_val_test_split(train_cut_off,val_cut_off,test_cut_off,data_list,seqs):
    if float(train_cut_off) > 0:
        train_limit = round(len(data_list)*train_cut_off)
        train_dataset = data_list[0:train_limit]
        train_seqs = seqs[0:train_limit]

        test_limit = round(len(data_list)*test_cut_off) + train_limit
        test_dataset = data_list[train_limit:test_limit]
        test_seqs = seqs[train_limit:test_limit]
    
    
        val_limit = round(len(data_list)*val_cut_off) + test_limit
        val_dataset = data_list[test_limit:val_limit]
        val_seqs = seqs[test_limit:val_limit]

        return train_dataset,train_seqs,test_dataset,test_seqs,val_dataset,val_seqs





def prep_training(train_percent,val_percent,test_percent,data_list=data_list,seqs=seqs):

    train_dataset,train_seqs,test_dataset,test_seqs,val_dataset,val_seqs = train_val_test_split(train_percent,val_percent,test_percent,data_list,seqs)
    graph_train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    graph_val_loader = DataLoader(val_dataset, batch_size=64) 
    graph_test_loader = DataLoader(test_dataset, batch_size=64)  # loaders follow PyTorch Geometric conventions: edges stored in both directions, one target per graph (not per node)

    return train_dataset,train_seqs,test_dataset,test_seqs,val_dataset,val_seqs,graph_train_loader,graph_val_loader,graph_test_loader



train_dataset,train_seqs,test_dataset,test_seqs,val_dataset,val_seqs,graph_train_loader,graph_val_loader,graph_test_loader = prep_training(0.8,
                   0.1,
                   0.1,
                   data_list=data_list,
                   seqs=seqs,
                   )



gnn_layer_by_name = {
    "GCN": geom_nn.GCNConv,
    "GraphConv": geom_nn.GraphConv
}



class GNNModel(nn.Module):
    
    def __init__(self, c_in, c_hidden, c_out, num_layers, activation_function, optimizer_name, learning_rate, dp_rate_linear,layer_name="GCN",**kwargs):

        super().__init__()
        gnn_layer = geom_nn.GCNConv #gnn_layer_by_name[layer_name]
        
        layers = []

        activation_function = eval(activation_function)  # not ideal: eval of a config string (a name-to-module mapping like gnn_layer_by_name would be safer)
        in_channels, out_channels = c_in, c_hidden
        for l_idx in range(num_layers-1):
            layers += [
                gnn_layer(in_channels=in_channels, 
                          out_channels=out_channels,
                          **kwargs),
                activation_function,
                nn.Dropout(p=dp_rate_linear)
            ]
            in_channels = c_hidden
        layers += [gnn_layer(in_channels=in_channels, 
                             out_channels=c_out,
                             **kwargs)]
        self.layers = nn.ModuleList(layers)
    
        

    def forward(self, x, edge_index):
        for l in self.layers:
            if isinstance(l, geom_nn.MessagePassing):  # graph conv layers also need the edge_index
                x = l(x, edge_index) 
            else:
                x = l(x)
        return x




class GraphGNNModel(nn.Module):

    def __init__(self, c_in, c_hidden, c_out, dp_rate_linear,num_layers,activation_function,optimizer_name,learning_rate):

        super().__init__()
        self.GNN = GNNModel(c_in=c_in, 
                            c_hidden=c_hidden, 
                            c_out=c_hidden,
                            dp_rate_linear = dp_rate_linear, 
                            num_layers=num_layers,
                            activation_function=activation_function,
                            optimizer_name=optimizer_name,
                            learning_rate=learning_rate)


        self.head = nn.Sequential(
            nn.Dropout(p=dp_rate_linear),
            nn.Linear(c_hidden, c_out)
        )

    def forward(self, x, edge_index, batch_idx):

        x = self.GNN(x, edge_index)
        x = geom_nn.global_mean_pool(x, batch_idx) 
        x = self.head(x)
        return x





class GraphLevelGNN(pl.LightningModule):

    def __init__(self,**model_kwargs):
        super().__init__()


        # save hyperparameters so they are accessible via self.hparams (e.g. self.hparams.c_out below)
        self.save_hyperparameters()
        self.model = GraphGNNModel(**model_kwargs)
        self.loss_module = nn.BCEWithLogitsLoss() if self.hparams.c_out == 1 else nn.CrossEntropyLoss()
        self.optimizer_name = model_kwargs['optimizer_name']
        self.learning_rate = model_kwargs['learning_rate']


    def forward(self, data, mode="train"):
        x, edge_index, batch_idx = data.x, data.edge_index, data.batch
        x = self.model(x, edge_index, batch_idx)
        x = x.squeeze(dim=-1)
        
        if self.hparams.c_out == 1:
            preds = (x > 0).float()
            data.y = data.y.float()
        else:
            preds = x.argmax(dim=-1)

        loss = self.loss_module(x, data.y)  # y is already float in the binary case; CrossEntropyLoss expects integer class targets
        acc = (preds == data.y).sum().float() / preds.shape[0]

        data.y = data.y.int()
        preds = preds.int()

        f1 = F1Score(num_classes=1,multiclass=False) #change num_classes
        f1_score = f1(preds,data.y)

        precision = Precision(num_classes=1,multiclass=False)
        precision_score=precision(preds,data.y)

        recall = Recall(num_classes=1,multiclass=False)
        recall_score=recall(preds,data.y)

        return loss, acc, f1_score,precision_score, recall_score,preds


    def configure_optimizers(self):
        learning_rate = self.learning_rate
        
        if self.optimizer_name == 'SGD':
            optimizer = optim.SGD(self.parameters(),lr=learning_rate)

        elif self.optimizer_name == 'NAdam':
            optimizer = optim.NAdam(self.parameters(), lr=learning_rate)   
        
        elif self.optimizer_name == 'Adam':
            optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        else:
            raise ValueError(f"Unknown optimizer: {self.optimizer_name}")

        return optimizer


    def training_step(self, batch, batch_idx):
        loss, acc, _,_,_,_ = self.forward(batch, mode="train")
        self.log('train_loss', loss,on_epoch=True,logger=True,batch_size=64)
        self.log('train_acc', acc,on_epoch=True,logger=True,batch_size=64)
        return loss


    def validation_step(self, batch, batch_idx):
        loss, acc, _,_,_,_ = self.forward(batch, mode="val")
        self.log('val_acc', acc,on_epoch=True,logger=True,batch_size=64)
        self.log('val_loss', loss,on_epoch=True,logger=True,batch_size=64)


    def test_step(self, batch, batch_idx):
        loss,acc, f1,precision, recall,preds = self.forward(batch, mode="test")
        self.log('test_acc', acc,on_epoch=True,logger=True,batch_size=64)
        self.log('test_f1', f1,on_epoch=True,logger=True,batch_size=64)
        self.log('test_precision', precision,on_epoch=True,logger=True,batch_size=64)       
        self.log('test_recall', recall,on_epoch=True,logger=True,batch_size=64)



scheduler_asha = ASHAScheduler(
    max_t=100,
    grace_period=1,
    mode="min",
    metric='val_loss',
    reduction_factor=2,
)

from ray.tune.integration.pytorch_lightning import TuneReportCallback, TuneReportCheckpointCallback

tune_report_callback = TuneReportCheckpointCallback(
    metrics={
    "val_loss": "val_loss",
    "val_acc": "val_acc",
    },
    filename="ray_ckpt",
    on="validation_end",
)


config = {
            "c_hidden": tune.choice([32,64,128,256]),
            "dp_rate_linear":tune.uniform(0.4,0.8),
            "num_layers":tune.randint(3, 5),
            "activation_function":tune.choice(['nn.ReLU(inplace=True)','nn.LeakyReLU(inplace=True)','nn.Sigmoid()','nn.Tanh()']),#,'nn.LeakyReLU(inplace=True)','nn.Sigmoid()','nn.Tanh()']),
            "optimizer_name" : tune.choice(['SGD','NAdam', 'Adam']),
            "learning_rate":tune.uniform(0.01,0.1),
               }



def run_with_tune(config, graph_train_loader=graph_train_loader, graph_val_loader=graph_val_loader,
                  graph_data_dir=None, epochs=5, gpus=0, trained_weights=None,
                  c_in=5, c_out=1, **model_kwargs):
    # build the model from the hyperparameters sampled by Tune for this trial
    model = GraphLevelGNN(c_in=c_in,
                          c_out=c_out,
                          c_hidden=config["c_hidden"],
                          dp_rate_linear=config["dp_rate_linear"],
                          num_layers=config["num_layers"],
                          activation_function=config["activation_function"],
                          optimizer_name=config["optimizer_name"],
                          learning_rate=config["learning_rate"])

    trainer = pl.Trainer(
        max_epochs=epochs,  # use the epochs argument rather than a hard-coded value
        #gpus=1,
        default_root_dir="/home/pytorch_test/ray_logs",  # path for saving logs
        callbacks=[
            tune_report_callback,
        ],
    )

    trainer.fit(model,graph_train_loader,graph_val_loader)



reporter = CLIReporter(
    parameter_columns=['c_hidden', 'dp_rate_linear', 'num_layers', 'activation_function', 'optimizer_name', 'learning_rate'],
    metric_columns=["val_loss", "val_acc", "training_iteration"],
)


result = tune.run(
    tune.with_parameters(
        run_with_tune,
        graph_data_dir="/home/pytorch_test/",
        epochs=5,
        #gpus=1
        ),

    resources_per_trial={
        "cpu": 1,
        #"gpu": 0,
    },

    local_dir="/home/pytorch_test/ray_ckpt",  # path for saving checkpoints
    config=config,
    num_samples=20,
    scheduler=scheduler_asha,
    progress_reporter=reporter,
    name="test",
)


When I run the PyTorch Lightning part alone, training runs and finishes consistently, as expected.

When I run Ray Tune alone on a small test network, it also runs and finishes consistently, as expected.

When I try to merge the two, as in the script above, Ray Tune (1) only ever completes a small number of trials (about 30 at most) and (2) does not behave consistently: sometimes it finishes, and sometimes it stalls with very few trials left to go. For example, if I set num_samples=30, it may stall and just report '28 terminated, 2 running' indefinitely; if I try num_samples=200, it stalls at around 192-196 completed, and the last 4-8 trials sit with a status of RUNNING indefinitely (200 samples never finish; fewer than 30 sometimes finish and sometimes don't).

I’ve been stuck on this for weeks at this point. I’ve tried changing the dataset, adding memory limits to ray.init(), commenting out whatever I can to see whether it helps, and implementing Ray Tune in two different ways (one without the HyperOpt search, and the way shown above). I’m totally stuck; if anyone could help, I’d appreciate it.
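
For reference, the ray.init() memory conditions I mention were along these lines (the CPU count and memory limit here are illustrative placeholders, not my exact values):

import ray

# cap CPU parallelism and the object store so concurrent trials can't exhaust RAM
ray.init(
    num_cpus=8,                          # illustrative value
    object_store_memory=4 * 1024 ** 3,   # ~4 GB object store, also illustrative
)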

@SlowatKela1 what version of PyTorch Lightning are you using?

If this is PTL 1.7+, then it is very likely due to this issue: [Tune] Torch Lightning example hangs · Issue #28666 · ray-project/ray · GitHub.

This has been fixed in the nightly version of Ray. For Ray 2.0 or prior, you can use this workaround: [Tune] Torch Lightning example hangs · Issue #28666 · ray-project/ray · GitHub.
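
In the meantime, a quick sanity check of the installed versions (assuming both packages are importable in the environment the trials run in) would confirm whether you are on PTL 1.7+:

import pytorch_lightning
import ray

# confirm whether PTL is 1.7+ and which Ray release is installed
print("pytorch_lightning:", pytorch_lightning.__version__)
print("ray:", ray.__version__)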