I’m trying to use the PB2 scheduler with a TensorFlow neural network, with the subclassing-Trainable API. Currently I’m trying to get it to work in a dummy case, on my local machine.
I’m getting the following error, and I’m not sure why. I have looked at the flatten_args
function but I don’t understand what’s going on before that. I can’t find any bug reports or other issues on this.
I have looked through examples in the Ray docs, including: here and here.
Any help would be greatly appreciated.
Error:
Traceback (most recent call last):
File "/wd/venv2/lib/python3.8/site-packages/ray/tune/ray_trial_executor.py", line 508, in start_trial
return self._start_trial(trial)
File "/wd/venv2/lib/python3.8/site-packages/ray/tune/ray_trial_executor.py", line 414, in _start_trial
runner = self._setup_remote_runner(trial)
File "/wd/venv2/lib/python3.8/site-packages/ray/tune/ray_trial_executor.py", line 355, in _setup_remote_runner
return full_actor_class.remote(**kwargs)
File "/wd/venv2/lib/python3.8/site-packages/ray/actor.py", line 610, in remote
return actor_cls._remote(
File "/wd/venv2/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 383, in _invocation_actor_class_remote_span
return method(self, args, kwargs, *_args, **_kwargs)
File "/wd/venv2/lib/python3.8/site-packages/ray/actor.py", line 891, in _remote
creation_args = signature.flatten_args(function_signature, args, kwargs)
File "/wd/venv2/lib/python3.8/site-packages/ray/_private/signature.py", line 114, in flatten_args
raise TypeError(str(exc)) from None
TypeError: got an unexpected keyword argument 'config'
Code:
import ray
from ray import tune
from ray.tune import Trainable
from ray.tune.schedulers.pb2 import PB2
import numpy as np
from tensorflow import keras
from tensorflow.keras.models import load_model
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
from sklearn.utils import shuffle
ray.init()
class MyTrainable(Trainable):
def __init__(self) -> None:
super().__init__()
self.load_data()
self.epochs_per_step = 1
self.total_n_examples = int(1e3)
self.train_perc = 0.6
self.val_perc = 0.2
self.train_and_val_perc = self.train_perc + self.val_perc
self.n_train = int(self.total_n_examples * self.train_perc)
self.n_train_and_val = int(self.total_n_examples * self.train_and_val_perc)
self.tf_steps_per_epoch = 3
def load_data(self):
"""Generates a dummy dataset from the sine function."""
# make data
input_len = 3
self.target_len = 1
c = input_len + self.target_len
n_points = c * self.total_n_examples
data = np.sin(np.linspace(0, np.pi * 2, n_points)).reshape((-1, c))
data = shuffle(data, random_state=42)
x = data[:, :input_len]
y = data[:, input_len:]
self.x_train = x[: self.n_train]
self.y_train = y[: self.n_train]
self.x_val = x[self.n_train : self.n_train_and_val]
self.y_val = y[self.n_train : self.n_train_and_val]
self.x_test = x[self.n_train_and_val :]
self.y_test = y[self.n_train_and_val :]
def setup(self, config):
"invoked once training starts"
# unpack config
self.units = config.get("units")
self.lr = config.get("lr")
# shuffle data
# This allows all data to be used, but does introduce some redundancy as
# some examples will be used for training twice before others have been
# used once. To get around this, the model.fit process would have to use
# a data feeder (tf dataset or other) that is preserved even when the
# evaluation happens.
self.x_train, self.y_train = shuffle(self.x_train, self.y_train)
# build and compile the model
self.model = self.build_model()
optimizer = Adam(learning_rate=self.lr)
self.model.compile(loss="mse", optimizer=optimizer)
def reset_config(self, new_config):
self.setup(new_config)
def build_model(self):
inputs = keras.Input(shape=(3, 1))
x = Dense(units=self.units, activation="relu")(inputs)
x = Dense(units=self.units, activation="relu")(x)
x = Dense(units=self.units, activation="relu")(x)
x = Dense(units=self.units, activation="relu")(x)
outputs = Dense(units=self.target_len, activation="tanh")(x)
model = keras.Model(inputs, outputs)
return model
def step(self,):
# train
fit_result = self.model.fit(
self.x_train,
self.y_train,
validation_data=(self.x_val, self.y_val),
epochs=self.epochs_per_step,
steps_per_epoch=self.tf_steps_per_epoch,
)
# report
final_val_loss = fit_result.history["val_loss"][-1]
result_dict = {
"val_loss": final_val_loss,
# "training_iteration": self.epochs_per_step, #TODO
}
return result_dict
def save_checkpoint(self, checkpoint_dir):
"""Must entirely save model to checkpoint dir."""
fp = Path(checkpoint_dir) / "model"
self.model.save(fp)
return fp
def load_checkpoint(self, path):
"""Must entirely load self.model from the checkpoint"""
del self.model
self.model = load_model(path)
pert_space = {"units": [5, 50], "lr": [0.0001, 0.01]}
pb2 = PB2(
time_attr="training_iteration",
metric="val_loss",
mode="min",
perturbation_interval=3,
hyperparam_bounds=pert_space,
quantile_fraction=0.25,
)
num_samples = 8 # Number of times to sample from the sample space
init_config = {"units": 5, "lr": 0.01}
analysis = tune.run(
MyTrainable,
config=init_config,
num_samples=num_samples,
scheduler=pb2,
local_dir=model_dir_path, #! Set as your own
fail_fast="raise",
# reuse_actors=True,
name="test_name",
)
How severe does this issue affect your experience of using Ray?
- High: It blocks me to complete my task.