LSTM model is not getting trained on all the input batches using Ray Train

High: It blocks me from completing my task.

I am trying to train a TensorFlow model using Ray Train. My input data has 24 batches (1536 samples with batch_size = 64) and I am running 2 epochs. Refer to the code below.
(Here I am using 2 CPUs for Ray distributed training.)

import os

import tensorflow as tf
from ray import train
from ray.train import Trainer, CheckpointStrategy

# read_input_data(), lstm_model(), CONF and PrintingCallback are defined elsewhere.

def train_func(config):
    CONF, data_path = config.values()

    X_train, y_train = read_input_data(data_path)

    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = lstm_model()  # returns a sequential model
        start_epoch = 0

        # Resume from the latest checkpoint if one was passed to trainer.run().
        checkpoint = train.load_checkpoint()
        if checkpoint:
            model.set_weights(checkpoint.get("model_weights"))
            start_epoch = checkpoint.get("epoch", -1) + 1

        model.compile()  # optimizer/loss/metrics omitted in this snippet

    epochs = 2
    for epoch in range(start_epoch, epochs):
        history = model.fit(X_train, y_train, batch_size=64)
        train.save_checkpoint(epoch=epoch, accuracy=history.history['acc'][0],
                              model_weights=model.get_weights())

config = CONF  ## model parameters
num_cpus = 2   ## 2 CPUs for distributed training


trainer = Trainer(backend="tensorflow", num_workers=num_cpus, logdir=os.getcwd())
trainer.start()
checkpoint_strategy = CheckpointStrategy(num_to_keep=1, checkpoint_score_attribute="accuracy", checkpoint_score_order="max")
trainer.run(train_func, config=config, callbacks=[PrintingCallback()], checkpoint_strategy=checkpoint_strategy,
            checkpoint=trainer.latest_checkpoint)
trainer.shutdown()

From the logs you can see that the model is getting trained on only a few of the batches (out of 24). Also, each time the model gets trained on random batches and a different number of batches. Please refer to the screenshot of the logs below.

Logs of 1st and 2nd epochs:

In the above screenshot you can see that in the 1st epoch the model is trained only on the 5th, 12th, 16th and 24th batches, and in the 2nd epoch on batches [1, 9, 16, 22, 24].

Can you please help me understand if my assumption is correct that some batches are getting skipped while training? If yes, why is this happening?
Also, please let me know if I am missing something.

Hi @suraj-gade, the output does not mean that your batches are getting skipped. Rather, processing is so fast (16ms/step!) that the progress bar can’t keep up. By the time the progress bar renders the next step, more than one step has already been processed, so the intermediate steps are skipped in the display.

Please also note that this progress bar is not rendered by Ray, but by TensorFlow/Keras.

If you want to confirm that it trains on every batch, you could try something like this:

print_callback = tf.keras.callbacks.LambdaCallback(
    on_batch_end=lambda batch, logs: print("Trained batch number", batch)
)
history = model.fit(X_train, y_train, batch_size=64, callbacks=[print_callback])
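
With 1536 samples and batch_size=64, this should print batch numbers 0 through 23 in every epoch (on each worker), confirming that no batch is skipped.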

Hi @kai, thanks for your response.
It solved my issue…

I have one follow-up question on this issue.

I am using 2 CPUs for Ray distributed training, so from the logs you can see that we get two progress bars for each batch.
Assumption 1: Each batch is processed fully on both cores.
Assumption 2: Ray Train distributes the batch samples equally across the cores, and since TensorFlow/Keras renders the progress bar, the logs get printed twice for each batch.

Please help me understand which assumption is correct.

Keras automatically distributes the data so that each worker only works on one shard. Thus, the workers see different data.

And yes, the progress bar is displayed twice - once for each worker.
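
If it helps to see that sharding spelled out, here is a minimal sketch of feeding the same arrays through tf.data with the auto-shard policy set explicitly. The make_dataset helper and the explicit AutoShardPolicy.DATA setting are my additions for illustration, not part of the original code (the AUTO default falls back to DATA anyway when the input is in-memory arrays rather than files):

import tensorflow as tf

def make_dataset(X_train, y_train, global_batch_size=64):
    ds = tf.data.Dataset.from_tensor_slices((X_train, y_train))
    ds = ds.batch(global_batch_size)
    options = tf.data.Options()
    # DATA sharding: every worker walks the full dataset and keeps only the
    # portion meant for itself, so the workers end up training on different data.
    options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.DATA
    )
    return ds.with_options(options)

# inside train_func: history = model.fit(make_dataset(X_train, y_train))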

Hi @kai, Thanks for your response.

Please help me understand here. As per your response, Keras is distributing the data batches across the workers; however, I am using Ray here to implement distributed training, not Keras multiprocessing.

As per my understanding, Ray should divide the different batches across the different workers (which I have also seen while using PyTorch with Ray). So, considering my earlier example with 24 batches in total and 2 CPUs for distributed training, Ray should distribute 12 batches to each worker, which is not evident in this case.

But from your response I conclude that each of the 24 batches is distributed by Keras across the workers, and each worker sees a shard of that batch. So, doesn't Ray play any part in this?

Please help me understand what I am missing here.

def train_func(config):
    CONF, data_path = config.values()
    
    X_train, y_train = read_input_data(data_path)

    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = lstm_model()  # returns a sequential model
        start_epoch = 0

        checkpoint = train.load_checkpoint()
        if checkpoint:
            model.set_weights(checkpoint.get("model_weights"))
            start_epoch = checkpoint.get("epoch", -1) + 1

        model.compile() 
    epochs = 2
    for epoch in range(start_epoch, epochs):
        history = model.fit(X_train, y_train, batch_size=64)
        train.save_checkpoint(epoch=epoch, accuracy=history.history['acc'][0],
                              model_weights=model.get_weights())

config = CONF ## model parameters 
num_cpus = 2

trainer = Trainer(backend="tensorflow", num_workers=num_cpus, logdir=os.getcwd())
trainer.start()
checkpoint_strategy = CheckpointStrategy(num_to_keep=1, checkpoint_score_attribute="accuracy", checkpoint_score_order="max")
trainer.run(train_func, config=config, callbacks=[PrintingCallback()], checkpoint_strategy=checkpoint_strategy,
            checkpoint=trainer.latest_checkpoint)
trainer.shutdown()

So both workers will see 24 batches, but each batch will only comprise half of the data. The batch_size you set in model.fit is the so-called global batch size: each step 64 samples are processed in total, so with 2 workers each worker handles 32 of those samples.
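
A quick sanity check with the numbers from this thread (a small sketch I added; the variable names are just for illustration):

num_samples = 1536
global_batch_size = 64      # the batch_size passed to model.fit
num_workers = 2

steps_per_epoch = num_samples // global_batch_size    # 24 steps shown by each worker's progress bar
per_worker_batch = global_batch_size // num_workers   # 32 samples handled by each worker per step
print(steps_per_epoch, per_worker_batch)               # -> 24 32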

You can refer to the official TensorFlow documentation for more details: Distributed input | TensorFlow Core

As for Ray, in your example Ray is used to kick off the different training processes on the workers and nodes of the cluster. Ray Train and Ray Datasets can also be used to distribute the data manually, which is beneficial if, for example, you're doing preprocessing. You can find more details on that here: Configuring Training Datasets — Ray 2.0.0

Generally, for your use case you can use Ray for training, for data processing/distribution, or both. In your example, you’re using Ray only for training, and the data distribution is taken care of by Keras.
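
For completeness, here is a rough sketch of what Ray-side sharding could look like with the same (older) ray.train Trainer API used above. This is my illustration, not code from the thread: it assumes read_input_data() returns NumPy arrays, uses train.world_rank()/train.world_size() from that API (the exact helpers may differ by Ray version), and turns off Keras' own auto-sharding so that only the Ray-side split applies:

import tensorflow as tf
from ray import train

def train_func(config):
    CONF, data_path = config.values()
    X_train, y_train = read_input_data(data_path)

    # Ray-side sharding: each worker keeps every world_size-th sample, so with
    # 2 workers each one gets ~768 of the 1536 samples.
    rank, size = train.world_rank(), train.world_size()
    X_shard, y_shard = X_train[rank::size], y_train[rank::size]

    strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = lstm_model()
        model.compile()  # optimizer/loss/metrics as in the original code

    # Disable Keras' auto-sharding, since the data is already per-worker.
    # 32 samples per worker per step keeps the global batch at 64, as before.
    ds = tf.data.Dataset.from_tensor_slices((X_shard, y_shard)).batch(32)
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.OFF
    )
    model.fit(ds.with_options(options), epochs=2)

The checkpointing logic from the original train_func is omitted here just to keep the sketch short.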