High: It blocks me from completing my task.
I am trying to train a TensorFlow model using Ray Train. Refer to the code below.
# Imports for the Ray 2.x AIR API used below
import ray
import tensorflow as tf
from ray.air import session
from ray.air.checkpoint import Checkpoint
from ray.air.config import CheckpointConfig, RunConfig, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer, prepare_dataset_shard


def train_func(config):
    CONF, lstm_params = config.values()
    start_epoch = 0

    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = lstm_model()  # returns a Sequential model

        checkpoint = session.get_checkpoint()
        if checkpoint:
            # assume that we have run the session.report() example
            # and successfully saved some model weights
            checkpoint_dict = checkpoint.to_dict()
            model.set_weights(checkpoint_dict.get("model_weights"))
            start_epoch = checkpoint_dict.get("epoch", -1) + 1

        model.compile(loss=CONF['lstm_params']['loss'],
                      optimizer=CONF['lstm_params']['optimizer'],
                      metrics=list(CONF['lstm_params']['metrics']))

    dataset_shard = session.get_dataset_shard("train")

    def to_tf_dataset(dataset, batch_size):
        def to_tensor_iterator():
            for batch in dataset.iter_tf_batches(batch_size=batch_size, dtypes=tf.float32):
                yield batch["x"], batch["y"]

        output_signature = (
            tf.TensorSpec(shape=(None), dtype=tf.float32),
            tf.TensorSpec(shape=(None), dtype=tf.float32),
        )
        tf_dataset = tf.data.Dataset.from_generator(
            to_tensor_iterator, output_signature=output_signature
        )
        return prepare_dataset_shard(tf_dataset)

    for epoch in range(start_epoch, lstm_params['epochs']):
        tf_dataset = to_tf_dataset(dataset=dataset_shard, batch_size=lstm_params['batch_size'])
        model.fit(tf_dataset, shuffle=lstm_params['shuffle'])

        checkpoint = Checkpoint.from_dict(dict(epoch=epoch, model_weights=model.get_weights()))
        session.report({}, checkpoint=checkpoint)
config = {...}  # hyperparameters for the LSTM model
checkpoint_config = CheckpointConfig(checkpoint_score_attribute="accuracy",
                                     checkpoint_score_order="max")

X_train, y_train = read_input_data()  # returns the training data as numpy arrays
dataset = ray.data.from_items(
    [{"x": X_train[index, :, :], "y": y_train[index]} for index in range(y_train.shape[0])]
)

trainer = TensorflowTrainer(train_func,
                            train_loop_config=config,
                            scaling_config=ScalingConfig(num_workers=2),
                            run_config=RunConfig(checkpoint_config=checkpoint_config),
                            datasets={"train": dataset})
result = trainer.fit()
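For clarity, based on how train_func unpacks and indexes config, its structure is roughly as follows. The values shown here are placeholders for illustration only, not my actual hyperparameters (which are elided above):

# Hypothetical shape of `config`, inferred from how train_func uses it;
# every value below is a placeholder.
config = {
    "CONF": {
        "lstm_params": {
            "loss": "mse",            # placeholder
            "optimizer": "adam",      # placeholder
            "metrics": ["accuracy"],  # placeholder
        },
    },
    "lstm_params": {
        "epochs": 10,        # placeholder
        "batch_size": 32,    # placeholder
        "shuffle": True,     # placeholder
    },
}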
I am running it on Google Colab, where I have 2 CPUs available, and I am initializing the TensorflowTrainer with only 2 workers.
Still, the Ray trainer asks for 3 CPUs and throws the warning below when trainer.fit() is executed.
WARNING insufficient_resources_manager.py:128 -- Ignore this message if the cluster is autoscaling. You asked for 3.0 cpu and 0 gpu per trial, but the cluster only has 2.0 cpu and 0 gpu. Stop the tuning job and adjust the resources requested per trial (possibly via `resources_per_trial` or via `num_workers` for rllib) and/or add more resources to your Ray runtime.
It throws the above warning multiple times, and after a few retries the execution fails.
Can you please help me understand and resolve the issue here, and let me know if I am missing something?
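For reference, the resources that Ray actually sees on the Colab runtime can be checked with something like this (a minimal sketch; ray.init() may need ignore_reinit_error=True if a Ray instance is already running in the session):

import ray

# Connect to (or start) the local Ray runtime and print what it sees.
ray.init(ignore_reinit_error=True)
print(ray.cluster_resources())    # total resources, e.g. {'CPU': 2.0, ...}
print(ray.available_resources())  # resources not currently reserved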
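My guess is that the extra CPU is reserved for the Trainer's coordinator actor on top of 1 CPU per worker (2 workers + 1 coordinator = 3 CPUs), but I am not sure about that. If that is the case, would something like the sketch below be the right way to fit the job into 2 CPUs? This is only a guess at the API usage, not something I have verified:

# Sketch: free the coordinator's CPU reservation so only the 2 workers
# need CPUs. trainer_resources and resources_per_worker are ScalingConfig
# fields; whether {"CPU": 0} is appropriate here is what I am asking about.
scaling_config = ScalingConfig(
    num_workers=2,
    trainer_resources={"CPU": 0},
    resources_per_worker={"CPU": 1},
)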