Severity: High — it blocks me from completing my task.
I am trying to train a TensorFlow model using Ray Train, and I want to use Ray Dataset sharding to implement distributed data ingest with Ray Datasets.
My training data is in the following format:
X_train is a 3-dimensional NumPy array of shape (512, 10, 200), and y_train is the target-variable NumPy array of shape (512,).
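For clarity, here is a minimal sketch of stand-in data with the same shapes (random placeholder values, not my real data):

import numpy as np

# Stand-in arrays matching the shapes described above (values are random placeholders)
X_train = np.random.rand(512, 10, 200).astype(np.float32)  # (samples, timesteps, features)
y_train = np.random.rand(512).astype(np.float32)           # one target per sample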
I have referred to the Ray documentation (link1, link2 (TensorFlow)) and refactored the code for Ray Dataset sharding as below, to generate a dataset generator object suitable for my use case.
# imports inferred for completeness (Ray 2.x AIR-style API)
import ray
import tensorflow as tf
from ray.air import session, Checkpoint
from ray.air.config import ScalingConfig, RunConfig, CheckpointConfig
from ray.train.tensorflow import TensorflowTrainer, prepare_dataset_shard

def train_func(config):
    CONF, lstm_params = config.values()
    start_epoch = 0
    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

    with strategy.scope():
        model = lstm_model()  # returns a sequential model
        checkpoint = session.get_checkpoint()
        if checkpoint:
            # assume that we have run the session.report() example
            # and successfully saved some model weights
            checkpoint_dict = checkpoint.to_dict()
            model.set_weights(checkpoint_dict.get("model_weights"))
            start_epoch = checkpoint_dict.get("epoch", -1) + 1
        model.compile(loss=CONF['lstm_params']['loss'],
                      optimizer=CONF['lstm_params']['optimizer'],
                      metrics=list(CONF['lstm_params']['metrics']))

    dataset_shard = session.get_dataset_shard("train")

    def to_tf_dataset(dataset, batch_size):
        def to_tensor_iterator():
            for batch in dataset.iter_tf_batches(batch_size=batch_size, dtypes=tf.float32):
                yield tf.expand_dims(batch["x"], 10, 200), batch["y"]

        output_signature = (
            tf.TensorSpec(shape=(None, 10, 200), dtype=tf.float32),
            tf.TensorSpec(shape=(None,), dtype=tf.float32),
        )
        tf_dataset = tf.data.Dataset.from_generator(
            to_tensor_iterator, output_signature=output_signature
        )
        return prepare_dataset_shard(tf_dataset)

    for epoch in range(start_epoch, lstm_params['epochs']):
        tf_dataset = to_tf_dataset(dataset=dataset_shard, batch_size=64)
        model.fit(tf_dataset, shuffle=True)
        checkpoint = Checkpoint.from_dict(
            dict(epoch=epoch, model_weights=model.get_weights())
        )
        session.report({}, checkpoint=checkpoint)
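For reference, lstm_model() is along these lines — a simplified stand-in, not my exact model (the layer sizes here are placeholders):

def lstm_model():
    # Simplified placeholder: the key detail is that the input shape matches
    # one sample of X_train, i.e. (10 timesteps, 200 features).
    return tf.keras.Sequential([
        tf.keras.layers.LSTM(64, input_shape=(10, 200)),
        tf.keras.layers.Dense(1),
    ])

The driver code that sets everything up: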
config = {...}  # hyperparameters for LSTM model
checkpoint_config = CheckpointConfig(
    checkpoint_score_attribute="accuracy", checkpoint_score_order="max"
)

X_train, y_train = read_input_data()  # function returns training data numpy arrays
dataset = ray.data.from_items(
    [{"x": X_train[index, :, :], "y": y_train[index]} for index in range(y_train.shape[0])]
)

trainer = TensorflowTrainer(
    train_func,
    train_loop_config=config,
    scaling_config=ScalingConfig(num_workers=2),
    run_config=RunConfig(checkpoint_config=checkpoint_config),
    datasets={"train": dataset},
)
result = trainer.fit()
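For reference, a quick way one could sanity-check a single record of the dataset before calling trainer.fit() (my own illustrative snippet, not part of the failing run):

record = dataset.take(1)[0]  # take() returns a list of records (dicts here)
print(record["x"].shape, record["y"])  # expected: (10, 200) and a scalar target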
But when I pass this dataset object to model.fit(), I don't get the desired execution:
the model trains on None steps and then the run fails. Please refer to the output logs below.
(RayTrainWorker pid=8089) Train on None steps
(RayTrainWorker pid=8088) Train on None steps
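My understanding (which may be wrong) is that a tf.data.Dataset built via from_generator has unknown cardinality, so Keras cannot infer a step count; a standalone sketch illustrating this, with dummy data in the same shapes:

import tensorflow as tf

# A generator-backed dataset cannot report its length up front
ds = tf.data.Dataset.from_generator(
    lambda: iter([(tf.zeros((2, 10, 200)), tf.zeros((2,)))]),
    output_signature=(
        tf.TensorSpec(shape=(None, 10, 200), dtype=tf.float32),
        tf.TensorSpec(shape=(None,), dtype=tf.float32),
    ),
)
print(ds.cardinality() == tf.data.UNKNOWN_CARDINALITY)  # expected: True (unknown)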
Can you please help me understand what I am missing here and point out the mistake, if any?
It would also be helpful if you could share some documentation links I can refer to for this.