PB2 and TensorFlow: how to fix unexpected kwarg 'config'?

I’m trying to use the PB2 scheduler with a TensorFlow neural network, using the class-based API (subclassing Trainable). For now I’m just trying to get it working on a dummy problem on my local machine.

I’m getting the following error, and I’m not sure why. I have looked at the flatten_args function but I don’t understand what’s going on before that. I can’t find any bug reports or other issues on this.

I have looked through examples in the Ray docs, including here and here.

Any help would be greatly appreciated.

Error:

Traceback (most recent call last):
  File "/wd/venv2/lib/python3.8/site-packages/ray/tune/ray_trial_executor.py", line 508, in start_trial
    return self._start_trial(trial)
  File "/wd/venv2/lib/python3.8/site-packages/ray/tune/ray_trial_executor.py", line 414, in _start_trial
    runner = self._setup_remote_runner(trial)
  File "/wd/venv2/lib/python3.8/site-packages/ray/tune/ray_trial_executor.py", line 355, in _setup_remote_runner
    return full_actor_class.remote(**kwargs)
  File "/wd/venv2/lib/python3.8/site-packages/ray/actor.py", line 610, in remote
    return actor_cls._remote(
  File "/wd/venv2/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 383, in _invocation_actor_class_remote_span
    return method(self, args, kwargs, *_args, **_kwargs)
  File "/wd/venv2/lib/python3.8/site-packages/ray/actor.py", line 891, in _remote
    creation_args = signature.flatten_args(function_signature, args, kwargs)
  File "/wd/venv2/lib/python3.8/site-packages/ray/_private/signature.py", line 114, in flatten_args
    raise TypeError(str(exc)) from None
TypeError: got an unexpected keyword argument 'config'

Code:

import ray
from ray import tune
from ray.tune import Trainable
from ray.tune.schedulers.pb2 import PB2
import numpy as np
from tensorflow import keras
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn.utils import shuffle
from pathlib import Path  # needed for save_checkpoint below


ray.init()


class MyTrainable(Trainable):
    def __init__(self) -> None:
        super().__init__()
        self.epochs_per_step = 1
        self.total_n_examples = int(1e3)
        self.train_perc = 0.6
        self.val_perc = 0.2
        self.train_and_val_perc = self.train_perc + self.val_perc
        self.n_train = int(self.total_n_examples * self.train_perc)
        self.n_train_and_val = int(self.total_n_examples * self.train_and_val_perc)
        self.tf_steps_per_epoch = 3
        # load_data() uses total_n_examples, n_train and n_train_and_val,
        # so call it after they are defined
        self.load_data()

    def load_data(self):
        """Generates a dummy dataset from the sine function."""

        # make data
        input_len = 3
        self.target_len = 1
        c = input_len + self.target_len
        n_points = c * self.total_n_examples

        data = np.sin(np.linspace(0, np.pi * 2, n_points)).reshape((-1, c))
        data = shuffle(data, random_state=42)
        x = data[:, :input_len]
        y = data[:, input_len:]
        self.x_train = x[: self.n_train]
        self.y_train = y[: self.n_train]
        self.x_val = x[self.n_train : self.n_train_and_val]
        self.y_val = y[self.n_train : self.n_train_and_val]
        self.x_test = x[self.n_train_and_val :]
        self.y_test = y[self.n_train_and_val :]

    def setup(self, config):
        "invoked once training starts"

        # unpack config
        self.units = config.get("units")
        self.lr = config.get("lr")

        # shuffle data
        # This allows all data to be used, but does introduce some redundancy as
        # some examples will be used for training twice before others have been
        # used once. To get around this, the model.fit process would have to use
        # a data feeder (tf dataset or other) that is preserved even when the
        # evaluation happens.
        self.x_train, self.y_train = shuffle(self.x_train, self.y_train)

        # build and compile the model
        self.model = self.build_model()
        optimizer = Adam(learning_rate=self.lr)
        self.model.compile(loss="mse", optimizer=optimizer)

    def reset_config(self, new_config):
        self.setup(new_config)

    def build_model(self):
        inputs = keras.Input(shape=(3, 1))
        x = Dense(units=self.units, activation="relu")(inputs)
        x = Dense(units=self.units, activation="relu")(x)
        x = Dense(units=self.units, activation="relu")(x)
        x = Dense(units=self.units, activation="relu")(x)
        outputs = Dense(units=self.target_len, activation="tanh")(x)
        model = keras.Model(inputs, outputs)
        return model

    def step(self):

        # train
        fit_result = self.model.fit(
            self.x_train,
            self.y_train,
            validation_data=(self.x_val, self.y_val),
            epochs=self.epochs_per_step,
            steps_per_epoch=self.tf_steps_per_epoch,
        )

        # report
        final_val_loss = fit_result.history["val_loss"][-1]
        result_dict = {
            "val_loss": final_val_loss,
            # "training_iteration": self.epochs_per_step, #TODO
        }
        return result_dict

    def save_checkpoint(self, checkpoint_dir):
        """Must entirely save model to checkpoint dir."""
        fp = Path(checkpoint_dir) / "model"
        self.model.save(fp)

        return fp

    def load_checkpoint(self, path):
        """Must entirely load self.model from the checkpoint"""
        del self.model
        self.model = load_model(path)


pert_space = {"units": [5, 50], "lr": [0.0001, 0.01]}

pb2 = PB2(
    time_attr="training_iteration",
    metric="val_loss",
    mode="min",
    perturbation_interval=3,
    hyperparam_bounds=pert_space,
    quantile_fraction=0.25,
)

num_samples = 8  # Number of times to sample from the sample space
init_config = {"units": 5, "lr": 0.01}

analysis = tune.run(
    MyTrainable,
    config=init_config,
    num_samples=num_samples,
    scheduler=pb2,
    local_dir=model_dir_path,  #! Set as your own
    fail_fast="raise",
    # reuse_actors=True,
    name="test_name",
)
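
Side note: the comment in setup() above mentions using a data feeder that is preserved across evaluations, so that no example is seen twice before every example has been seen once. A rough sketch of that idea (not something I have running; the class name, batch size and variable names below are just illustrative) would be to keep an iterator over a repeating, shuffled tf.data.Dataset on self and consume it with train_on_batch:

import tensorflow as tf
from ray.tune import Trainable


class PersistentFeedTrainable(Trainable):
    """Sketch only: keep a persistent iterator so the feeding position
    survives across step() calls instead of reshuffling the arrays."""

    def setup(self, config):
        # x_train / y_train / x_val / y_val and self.model would be built
        # as in the code above; the batch size here is illustrative.
        ds = (
            tf.data.Dataset.from_tensor_slices((self.x_train, self.y_train))
            .shuffle(len(self.x_train))
            .repeat()
            .batch(32)
        )
        self.train_iter = iter(ds)

    def step(self):
        # Pull a fixed number of batches from the persistent iterator,
        # then report the validation loss.
        for _ in range(self.tf_steps_per_epoch):
            x_batch, y_batch = next(self.train_iter)
            self.model.train_on_batch(x_batch, y_batch)
        val_loss = self.model.evaluate(self.x_val, self.y_val, verbose=0)
        return {"val_loss": val_loss}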

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

Could you try -

import ray
from ray import tune
from ray.tune import Trainable
from ray.tune.schedulers.pb2 import PB2
import numpy as np
from tensorflow import keras
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn.utils import shuffle
from pathlib import Path  # needed for save_checkpoint below


ray.init()


class MyTrainable(Trainable):

    def load_data(self):
        """Generates a dummy dataset from the sine function."""

        # make data
        input_len = 3
        self.target_len = 1
        c = input_len + self.target_len
        n_points = c * self.total_n_examples

        data = np.sin(np.linspace(0, np.pi * 2, n_points)).reshape((-1, c))
        data = shuffle(data, random_state=42)
        x = data[:, :input_len]
        y = data[:, input_len:]
        self.x_train = x[: self.n_train]
        self.y_train = y[: self.n_train]
        self.x_val = x[self.n_train : self.n_train_and_val]
        self.y_val = y[self.n_train : self.n_train_and_val]
        self.x_test = x[self.n_train_and_val :]
        self.y_test = y[self.n_train_and_val :]

    def setup(self, config):
        "invoked once training starts"

        self.epochs_per_step = 1
        self.total_n_examples = int(1e3)
        self.train_perc = 0.6
        self.val_perc = 0.2
        self.train_and_val_perc = self.train_perc + self.val_perc
        self.n_train = int(self.total_n_examples * self.train_perc)
        self.n_train_and_val = int(self.total_n_examples * self.train_and_val_perc)
        self.tf_steps_per_epoch = 3
        # load_data() uses total_n_examples, n_train and n_train_and_val,
        # so call it after they are defined
        self.load_data()

        # unpack config
        self.units = config.get("units")
        self.lr = config.get("lr")

        # shuffle data
        # This allows all data to be used, but does introduce some redundancy as
        # some examples will be used for training twice before others have been
        # used once. To get around this, the model.fit process would have to use
        # a data feeder (tf dataset or other) that is preserved even when the
        # evaluation happens.
        self.x_train, self.y_train = shuffle(self.x_train, self.y_train)

        # build and compile the model
        self.model = self.build_model()
        optimizer = Adam(learning_rate=self.lr)
        self.model.compile(loss="mse", optimizer=optimizer)

    def reset_config(self, new_config):
        self.setup(new_config)

    def build_model(self):
        inputs = keras.Input(shape=(3, 1))
        x = Dense(units=self.units, activation="relu")(inputs)
        x = Dense(units=self.units, activation="relu")(x)
        x = Dense(units=self.units, activation="relu")(x)
        x = Dense(units=self.units, activation="relu")(x)
        outputs = Dense(units=self.target_len, activation="tanh")(x)
        model = keras.Model(inputs, outputs)
        return model

    def step(self):

        # train
        fit_result = self.model.fit(
            self.x_train,
            self.y_train,
            validation_data=(self.x_val, self.y_val),
            epochs=self.epochs_per_step,
            steps_per_epoch=self.tf_steps_per_epoch,
        )

        # report
        final_val_loss = fit_result.history["val_loss"][-1]
        result_dict = {
            "val_loss": final_val_loss,
            # "training_iteration": self.epochs_per_step, #TODO
        }
        return result_dict

    def save_checkpoint(self, checkpoint_dir):
        """Must entirely save model to checkpoint dir."""
        fp = Path(checkpoint_dir) / "model"
        self.model.save(fp)

        return fp

    def load_checkpoint(self, path):
        """Must entirely load self.model from the checkpoint"""
        del self.model
        self.model = load_model(path)


pert_space = {"units": [5, 50], "lr": [0.0001, 0.01]}

pb2 = PB2(
    time_attr="training_iteration",
    metric="val_loss",
    mode="min",
    perturbation_interval=3,
    hyperparam_bounds=pert_space,
    quantile_fraction=0.25,
)

num_samples = 8  # Number of times to sample from the sample space
init_config = {"units": 5, "lr": 0.01}

analysis = tune.run(
    MyTrainable,
    config=init_config,
    num_samples=1,
    scheduler=pb2,
    fail_fast="raise",
    # reuse_actors=True,
    name="test_name",
)

Thanks very much @xwjiang2010 !

This seems to have fixed it - I am no longer getting the above-mentioned TypeError. Big relief. (I have moved on to another error - the raylet cannot find the external hard drive I’m running the code from.)

The lessons to learn from this seem to be:

  1. Don’t define an __init__() when subclassing Trainable.
  2. Set num_samples=1 in tune.run(...).

Please could you explain why setting num_samples=8 would cause a problem? Does it affect the PBT-based schedulers? (I had assumed that setting num_samples=8 would limit the number of PBT workers that could be started.)

Oh, I accidentally changed num_samples while debugging. You can change it back; it shouldn’t cause any issue. Sorry about the confusion.

Yeah, it’s generally better to implement setup() rather than __init__ when subclassing Trainable.
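
For reference, a minimal sketch of that pattern (the class name and config keys here are just placeholders): Tune constructs the Trainable actor itself and passes arguments such as config to Trainable.__init__, which in turn calls setup(config) once per trial, so an overridden __init__ that doesn’t accept those arguments is what triggers the unexpected keyword argument 'config' error.

from ray import tune
from ray.tune import Trainable


class MinimalTrainable(Trainable):
    # No __init__ override: Tune calls Trainable.__init__ with config (and
    # other arguments) itself, and that in turn invokes setup(config).

    def setup(self, config):
        # Per-trial initialization goes here instead of in __init__.
        self.lr = config.get("lr", 1e-3)
        self.units = config.get("units", 5)

    def step(self):
        # One unit of training; return the metrics Tune should track.
        return {"val_loss": 0.0}


analysis = tune.run(
    MinimalTrainable,
    config={"lr": 0.01, "units": 5},
    stop={"training_iteration": 3},
)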

As for the external hard drive issue, it may be worthwhile to create a new thread.