Improper 'run' - not string nor trainable

Traceback (most recent call last):
  File "Tune_DT_PB2.py", line 159, in <module>
    analysis = run(
  File "/data/user/tmamidi/.conda/envs/training/lib/python3.8/site-packages/ray/tune/tune.py", line 304, in run
    experiments[i] = Experiment(
  File "/data/user/tmamidi/.conda/envs/training/lib/python3.8/site-packages/ray/tune/experiment.py", line 149, in __init__
    self._run_identifier = Experiment.register_if_needed(run)
  File "/data/user/tmamidi/.conda/envs/training/lib/python3.8/site-packages/ray/tune/experiment.py", line 297, in register_if_needed
    raise TuneError("Improper 'run' - not string nor trainable.")
ray.tune.error.TuneError: Improper 'run' - not string nor trainable.

I’m using this in my script -

variants = ['non_snv']#,'snv_protein_coding'] #'non_snv','snv',
for var in variants:
    start = time.perf_counter()
    if not os.path.exists('tuning/'+var):
        os.makedirs('./tuning/'+var)
    output = f"tuning/{var}/RandomForestClassifier_{var}_.csv"
    print('Working with '+var+' dataset...', file=open(output, "w"))
    print('Working with '+var+' dataset...')
    #X_train, X_test, Y_train, Y_test, feature_names = ray.get(data_parsing.remote(var,config_dict,output))
    X_train, X_test, Y_train, Y_test, feature_names = data_parsing(var,config_dict,output)
    #objective = RF_PB2(X_train, X_test, Y_train, Y_test)
    analysis = run(
        RF_PB2(X_train, X_test, Y_train, Y_test),
        name="RandomForestClassifier_PB2",
        verbose=0,
        scheduler=pbt,
        reuse_actors=True,
        #resources_per_trial={
        ##    "cpu": 1,
        #    "gpu": 1
        #},
        #global_checkpoint_period=np.inf,   # Do not save checkpoints based on time interval
        checkpoint_freq = 20,        # Save a checkpoint every 20 training iterations
        checkpoint_at_end = True,
        keep_checkpoints_num = 2,   # Keep only the 2 best checkpoints (ranked by checkpoint_score_attr)
        checkpoint_score_attr = 'mean_accuracy', # Metric used to compare checkpoints
        metric="mean_accuracy",
        mode="max",
        stop={
            "training_iteration": 50,
        },
        num_samples=10,
        fail_fast=True,
        queue_trials=True,
        config={ #https://www.geeksforgeeks.org/hyperparameters-of-random-forest-classifier/
            "n_estimators" : tune.randint(50, 200),
            "min_samples_split" : tune.randint(2, 6),
            "min_samples_leaf" : tune.randint(1, 4),
            "max_features" : tune.choice(["sqrt", "log2"])
    })

You need to pass the trainable itself to tune.run, so without the parentheses and arguments, just the class (or function) name. You will then need to pass the arguments in some other way, typically through config.
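
For example, a rough sketch of what the call could look like, assuming you rework RF_PB2 so it loads its own data based on an entry in config (the "var" key below is just an illustration, your current class doesn't read it yet):

analysis = run(
    RF_PB2,                      # the class itself, not RF_PB2(...)
    name="RandomForestClassifier_PB2",
    scheduler=pbt,
    metric="mean_accuracy",
    mode="max",
    stop={"training_iteration": 50},
    num_samples=10,
    config={
        "var": var,              # illustration: the trainable would load its dataset from this key
        "n_estimators": tune.randint(50, 200),
        "min_samples_split": tune.randint(2, 6),
        "min_samples_leaf": tune.randint(1, 4),
        "max_features": tune.choice(["sqrt", "log2"]),
    })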

That makes sense. I’ll try to use config and see if it works.
Thanks! I’ll keep you updated :slight_smile:

Update - Looks like it is working. Thanks!

So, when I call run in a loop, it gives me an error on the second iteration:
ValueError: You passed a `metric` or `mode` argument to `tune.run()`, but the scheduler you are using was already instantiated with their own `metric` and `mode` parameters. Either remove the arguments from your scheduler or from your call to `tune.run()`

I’m passing metric and mode to tune.run() and not to the scheduler.
Please help!

Here is my code -

pbt = PB2(
        time_attr="training_iteration",
        #metric="mean_accuracy",
        #mode="max",
        perturbation_interval=20,
        #resample_probability=0.25,
        quantile_fraction=0.25,  # copy bottom % with top %
        log_config=True,
        # Specifies the search space for these hyperparams
        hyperparam_bounds={
            "n_estimators" : [50, 200],
            "min_samples_split" : [2, 6],
            "min_samples_leaf" : [1, 4]})
        
variants = ['non_snv','snv','snv_protein_coding']
for var in variants:
    start = time.perf_counter()
    if not os.path.exists('tuning/'+var):
        os.makedirs('./tuning/'+var)
    output = f"tuning/{var}/RandomForestClassifier_{var}_.csv"
    print('Working with '+var+' dataset...', file=open(output, "w"))
    print('Working with '+var+' dataset...')
    analysis = run(
        RF_PB2,
        name=f"RandomForestClassifier_PB2_{var}",
        verbose=0,
        scheduler=pbt,
        reuse_actors=True,
        #resources_per_trial={
        ##    "cpu": 1,
        #    "gpu": 1
        #},
        #global_checkpoint_period=np.inf,   # Do not save checkpoints based on time interval
        checkpoint_freq = 20,        # Save a checkpoint every 20 training iterations
        checkpoint_at_end = True,
        keep_checkpoints_num = 2,   # Keep only the 2 best checkpoints (ranked by checkpoint_score_attr)
        checkpoint_score_attr = 'mean_accuracy', # Metric used to compare checkpoints
        metric="mean_accuracy",
        mode="max",
        stop={
            "training_iteration": 10,
        },
        num_samples=10,
        fail_fast=True,
        queue_trials=True,
        config={ #https://www.geeksforgeeks.org/hyperparameters-of-random-forest-classifier/
            "var": var,
            "n_estimators" : tune.randint(50, 200),
            "min_samples_split" : tune.randint(2, 6),
            "min_samples_leaf" : tune.randint(1, 4),
            "criterion" : tune.choice(["gini", "entropy"]),
            "max_features" : tune.choice(["sqrt", "log2"]),
            "class_weight" : tune.choice([None, "balanced", "balanced_subsample"])
        })

Hi @tkmamidi, the error comes up because you are re-using the same PB2 scheduler from the first run. In most cases you wouldn’t want to do that, as the scheduler retains state about the trials and their performances. The metric and mode you pass to tune.run() get set on the scheduler during the first loop iteration, so on the second iteration the scheduler already has them and the check fails.

Can you just re-create the pbt object in the loop as well?

By the way, why are you running tune in a loop? (What is your goal?)

I want to create 3 different models for 3 datasets. Here’s the whole script:

import numpy as np
import pandas as pd
import ray
import time
import argparse
import pickle
import yaml
from ray import tune
from ray.tune import Trainable, run
from ray.tune.schedulers.pb2 import PB2
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split, cross_validate
from sklearn.preprocessing import label_binarize
from sklearn.metrics import precision_score, roc_auc_score, accuracy_score, confusion_matrix, recall_score
from sklearn.ensemble import RandomForestClassifier
import os
import gc
import shap
from joblib import dump, load
import matplotlib.pyplot as plt
import warnings
warnings.simplefilter("ignore")

TUNE_STATE_REFRESH_PERIOD = 10  # Refresh resources every 10 s

class RF_PB2(Trainable):  #https://docs.ray.io/en/master/tune/examples/pbt_tune_cifar10_with_keras.html
    def _read_data(self, config):
        os.chdir('/data/project/worthey_lab/projects/experimental_pipelines/tarun/ditto/data/processed/')
        with open("../../configs/columns_config.yaml") as fh:
            config_dict = yaml.safe_load(fh)
        var = config.get("var")
        x_train = pd.read_csv(f'train_{var}/merged_data-train_{var}.csv')
        #var = X_train[config_dict['ML_VAR']]
        x_train = x_train.drop(config_dict['ML_VAR'], axis=1)
        feature_names = x_train.columns.tolist()
        x_train = x_train.values
        y_train = pd.read_csv(f'train_{var}/merged_data-y-train_{var}.csv')
        #Y_train = pd.get_dummies(Y_train)
        y_train = label_binarize(y_train.values, classes=['low_impact', 'high_impact']) 

        x_test = pd.read_csv(f'test_{var}/merged_data-test_{var}.csv')
        #var = X_test[config_dict['ML_VAR']]
        x_test = x_test.drop(config_dict['ML_VAR'], axis=1)
        #feature_names = X_test.columns.tolist()
        x_test = x_test.values
        y_test = pd.read_csv(f'test_{var}/merged_data-y-test_{var}.csv')
        print('Data Loaded!')
        #Y_test = pd.get_dummies(Y_test)
        y_test = label_binarize(y_test.values, classes=['low_impact', 'high_impact']) 
        #print(f'Shape: {Y_test.shape}')
        return x_train, x_test, y_train, y_test, feature_names

    def setup(self, config):
        self.x_train, self.x_test, self.y_train, self.y_test, self.feature_names = self._read_data(config)
        model = RandomForestClassifier(
            random_state=42,
            n_estimators=self.config.get("n_estimators", 100),
            min_samples_split=self.config.get("min_samples_split", 2),
            min_samples_leaf=self.config.get("min_samples_leaf", 1),
            max_features=self.config.get("max_features", "sqrt"),
            n_jobs=-1)
        #model = RandomForestClassifier(config)
        self.model = model

    def reset_config(self, new_config):
        self.n_estimators = new_config["n_estimators"]
        self.min_samples_split = new_config["min_samples_split"]
        self.min_samples_leaf = new_config["min_samples_leaf"]
        self.max_features = new_config["max_features"]
        self.config = new_config
        return True

    def step(self):
        self.model.fit(self.x_train, self.y_train)
        y_score = self.model.predict(self.x_test)
        accuracy = accuracy_score(self.y_test, y_score)
        #print(accuracy)
        return {"mean_accuracy": accuracy}

    def save_checkpoint(self, checkpoint_dir):
        file_path = checkpoint_dir + "/model"
        pickle.dump(self.model, open(file_path, 'wb'))
        return file_path

    def load_checkpoint(self, path):
        # See https://stackoverflow.com/a/42763323
        del self.model
        self.model = pickle.load(open(path,'rb'))

    def cleanup(self):
        # If need, save your model when exit.
        # saved_path = self.model.save(self.logdir)
        # print("save model at: ", saved_path)
        pass

    def results(self,config,var, output):
        start1 = time.perf_counter()
        self.x_train, self.x_test, self.y_train, self.y_test, self.feature_names = self._read_data(config)
        clf = RandomForestClassifier(
            random_state=42,
            n_estimators=config["n_estimators"],
            min_samples_split=config["min_samples_split"],
            min_samples_leaf=config["min_samples_leaf"],
            max_features=config["max_features"],
            n_jobs=-1)
        score = cross_validate(clf, self.x_train, self.y_train, cv=10, return_train_score=True, return_estimator=True, n_jobs=-1, verbose=0)
        clf = score['estimator'][np.argmax(score['test_score'])]
        training_score = np.mean(score['train_score'])
        testing_score = np.mean(score['test_score'])
        y_score = clf.predict(self.x_test)
        prc = precision_score(self.y_test,y_score, average="weighted")
        recall  = recall_score(self.y_test,y_score, average="weighted")
        roc_auc = roc_auc_score(self.y_test, y_score)
        accuracy = accuracy_score(self.y_test, y_score)
        matrix = confusion_matrix(self.y_test, y_score)
        finish = (time.perf_counter()-start1)/60
        print(f'RandomForestClassifier: \nCross_validate(avg_train_score): {training_score}\nCross_validate(avg_test_score): {testing_score}\nPrecision: {prc}\nRecall: {recall}\nROC_AUC: {roc_auc}\nAccuracy: {accuracy}\nTime(in min): {finish}\nConfusion Matrix:\n{matrix}', file=open(output, "a"))
        # explain all the predictions in the test set
        background = shap.kmeans(self.x_train, 10)
        explainer = shap.KernelExplainer(clf.predict, background)
        with open(f"./tuning/{var}/RandomForestClassifier_{var}.joblib", 'wb') as f:
            dump(clf, f, compress='lz4')
        del clf, self.x_train, self.y_train, background
        background = self.x_test[np.random.choice(self.x_test.shape[0], 1000, replace=False)]
        shap_values = explainer.shap_values(background)
        plt.figure()
        shap.summary_plot(shap_values, background, self.feature_names, show=False)
        plt.savefig(f"./tuning/{var}/RandomForestClassifier_{var}_features.pdf", format='pdf', dpi=1000, bbox_inches='tight')
        return None

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--smoke-test", action="store_true", help="Finish quickly for testing")
    args, _ = parser.parse_known_args()
    if args.smoke_test:
        ray.init(num_cpus=2)  # force pausing to happen for test
    else:
        ray.init(ignore_reinit_error=True)
    
    os.chdir('/data/project/worthey_lab/projects/experimental_pipelines/tarun/ditto/data/processed/')
    

    pbt = PB2(
        time_attr="training_iteration",
        #metric="mean_accuracy",
        #mode="max",
        perturbation_interval=20,
        #resample_probability=0.25,
        quantile_fraction=0.25,  # copy bottom % with top %
        log_config=True,
        # Specifies the search space for these hyperparams
        hyperparam_bounds={
            "n_estimators" : [50, 200],
            "min_samples_split" : [2, 6],
            "min_samples_leaf" : [1, 4]})
        
    variants = ['non_snv','snv','snv_protein_coding']
    for var in variants:
        start = time.perf_counter()
        if not os.path.exists('tuning/'+var):
            os.makedirs('./tuning/'+var)
        output = f"tuning/{var}/RandomForestClassifier_{var}_.csv"
        print('Working with '+var+' dataset...', file=open(output, "w"))
        print('Working with '+var+' dataset...')
        analysis = run(
            RF_PB2,
            name=f"RandomForestClassifier_PB2_{var}",
            verbose=0,
            scheduler=pbt,
            reuse_actors=True,
            #resources_per_trial={
            ##    "cpu": 1,
            #    "gpu": 1
            #},
            #global_checkpoint_period=np.inf,   # Do not save checkpoints based on time interval
            checkpoint_freq = 20,        # Save a checkpoint every 20 training iterations
            checkpoint_at_end = True,
            keep_checkpoints_num = 2,   # Keep only the 2 best checkpoints (ranked by checkpoint_score_attr)
            checkpoint_score_attr = 'mean_accuracy', # Metric used to compare checkpoints
            metric="mean_accuracy",
            mode="max",
            stop={
                "training_iteration": 100,
            },
            num_samples=10,
            fail_fast=True,
            queue_trials=True,
            config={ #https://www.geeksforgeeks.org/hyperparameters-of-random-forest-classifier/
                "var": var,
                "n_estimators" : tune.randint(50, 200),
                "min_samples_split" : tune.randint(2, 6),
                "min_samples_leaf" : tune.randint(1, 4),
                "criterion" : tune.choice(["gini", "entropy"]),
                "max_features" : tune.choice(["sqrt", "log2"]),
                "class_weight" : tune.choice([None, "balanced", "balanced_subsample"])
        })
        finish = (time.perf_counter() - start)/3600
        #ttime = (finish- start)/120
        print(f'Total time in hrs: {finish}')
        print(f"RandomForestClassifier best hyperparameters found for {var} were:  {analysis.best_config}", file=open(f"tuning/{var}/tuned_parameters_{var}_.csv", "a"))
        RF_PB2(analysis.best_config).results(analysis.best_config, var, output)
        gc.collect()

One more question: it is loading the data 10 times (num_samples=10). Can I make it load the data only once and reuse it for all 10 samples?

Hi @tkmamidi, did you try to move the PB2 instantiation into the loop? That seems to be the right thing to do here (it doesn’t make sense to re-use the same PB2 instance across different datasets/models).

I.e.

# ...
for var in variants:
    pbt = PB2(
        time_attr="training_iteration",
        # ...
    )
    start = time.perf_counter()
    # ...

For loading data, you would normally want to use tune.with_parameters() (see the documentation). However, this currently works only with the function API. As a workaround, you can wrap your trainable yourself, e.g. like this (this follows the same idea as tune.with_parameters()):

class RF_PB2(Trainable):
    def setup(self, config, x_train, x_test, y_train, y_test):
        self.x_train = x_train
        self.x_test = x_test
        self.y_train = y_train
        self.y_test = y_test
        self.model = RandomForestClassifier(
            random_state=42,
            n_estimators=self.config.get("n_estimators", 100),
            min_samples_split=self.config.get("min_samples_split", 2),
            min_samples_leaf=self.config.get("min_samples_leaf", 1),
            max_features=self.config.get("max_features", "sqrt"),
            n_jobs=-1)
    
    # ....

# ....
def wrap_trainable(trainable, x_train, x_test, y_train, y_test):
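    # Put the datasets into the Ray object store once; each trial then fetches
    # them by reference instead of loading the data from disk again.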
    x_train_id = ray.put(x_train)
    x_test_id = ray.put(x_test)
    y_train_id = ray.put(y_train)
    y_test_id = ray.put(y_test)

    class _Wrapped(trainable):
        def setup(self, config):
            x_train = ray.get(x_train_id)
            x_test = ray.get(x_test_id)
            y_train = ray.get(y_train_id)
            y_test = ray.get(y_test_id)

            super(_Wrapped, self).setup(config,x_train, x_test, y_train, y_test)

    return _Wrapped

# ...
x_train, x_test, y_train, y_test = load_data(...)

ray.init()

tune.run(
    wrap_trainable(RF_PB2, x_train, x_test, y_train, y_test),
    # ...
)
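
For comparison, with the function API tune.with_parameters() handles the data injection directly. A rough sketch, using the imports from your script (the train_rf function below is hypothetical and leaves out the scheduler/checkpointing details):

def train_rf(config, x_train=None, x_test=None, y_train=None, y_test=None):
    # Data is injected once by tune.with_parameters(); only config varies per trial.
    model = RandomForestClassifier(
        random_state=42,
        n_estimators=config["n_estimators"],
        min_samples_split=config["min_samples_split"],
        min_samples_leaf=config["min_samples_leaf"],
        n_jobs=-1)
    model.fit(x_train, y_train)
    tune.report(mean_accuracy=accuracy_score(y_test, model.predict(x_test)))

tune.run(
    tune.with_parameters(
        train_rf, x_train=x_train, x_test=x_test, y_train=y_train, y_test=y_test),
    metric="mean_accuracy",
    mode="max",
    num_samples=10,
    config={
        "n_estimators": tune.randint(50, 200),
        "min_samples_split": tune.randint(2, 6),
        "min_samples_leaf": tune.randint(1, 4),
    })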

Actually, this seems like a great extension for tune.with_parameters - I might file a PR soon that enables this for class trainables without the workaround.

This is perfect.
Thanks!