Training issues with MultiWorkerMirroredStrategy

1. Severity of the issue: (select one)
None: I’m just curious or want clarification.
Low: Annoying but doesn’t hinder my work.
Medium: Significantly affects my productivity but can find a workaround.
High: Completely blocks me.

2. Environment:

  • Ray version: 2.51.1
  • Python version: 3.13.5
  • OS: Linux Mint and OpenSUSE
  • Cloud/Infrastructure: Head on server, two workers on desktops with GPUs
  • Other libs/tools (if relevant): TensorFlow

3. What happened vs. what you expected:

  • Expected: Able to run TensorFlow training with MultiWorkerMirroredStrategy
  • Actual: Always crashes with “shape=(64, 32, 32, 3), dtype=float32)>
    }) with an unsupported type (<class ‘tensorflow.python.distribute.values.PerReplica’>) to a Tensor"

I have been trying to run distributed training with Ray Train and TensorFlow, using MultiWorkerMirroredStrategy so that the work is distributed efficiently across two workers.

I have used these as examples:

Neither one runs on my cluster. I am not able to get either of them, or any other example using this strategy, to run successfully on two nodes. On a single node it does run.

Is this somehow deprecated, or why don’t even the examples work? I’m also trying to understand whether I even need it, but based on my testing a single node is as fast or faster without it.

I am using a simple CNN on CIFAR-10 to learn Ray Train and, as mentioned, I can’t even get the examples to run without crashing.

Any help would be extremely appreciated.

Ray Train’s distributed TensorFlow integration with MultiWorkerMirroredStrategy is not deprecated, but users have reported issues running official examples across multiple nodes, especially with recent TensorFlow and Keras versions. The error you encountered—“unsupported type (<class ‘tensorflow.python.distribute.values.PerReplica’>) to a Tensor”—is a known TensorFlow issue when handling distributed datasets, often related to how data is sharded or how the dataset is constructed in a distributed context. This problem does not occur on a single node, which matches your experience. According to the Ray distributed TensorFlow guide, Ray should handle TF_CONFIG and worker setup, but compatibility issues with newer TensorFlow/Keras versions (e.g., Keras 3.x) can break these examples, as discussed in community threads.
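As a first sanity check, it can help to print what Ray puts into TF_CONFIG from inside the training loop; every worker should see the full cluster spec and its own task index. This is only a diagnostic sketch (the helper name is made up), not a fix:

import json
import os

def debug_tf_config():
    # Ray Train sets TF_CONFIG on each TensorFlow worker before the training
    # loop runs; MultiWorkerMirroredStrategy reads it to discover its peers.
    tf_config = json.loads(os.environ.get("TF_CONFIG", "{}"))
    print("cluster spec:", tf_config.get("cluster"))
    print("this worker's task:", tf_config.get("task"))

Call it at the top of your train_loop_per_worker on both workers and compare the output.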

Additionally, distributed training with Ray Train and TensorFlow is only beneficial for large datasets or when scaling across multiple GPUs/nodes outweighs the communication overhead. For small datasets or single-node setups, distributed training can be slower or unnecessary, as noted in user discussions. If you are using recent TensorFlow/Keras versions, you may need to set os.environ["TF_USE_LEGACY_KERAS"] = "1" as a workaround, which pins you to the legacy Keras 2 code path (the tf-keras package) and so is not ideal for everyone. Would you like a step-by-step breakdown of how to debug or adapt your code for multi-node distributed training with Ray and TensorFlow?
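In the meantime, one practical note: TF_USE_LEGACY_KERAS has to be set in every worker process, not only in the driver script, and setting it only at the top of the driver may not reach the workers. A minimal sketch of one way to propagate it, assuming the tf-keras package is installed on all nodes, is to pass it through Ray's runtime environment:

import ray

# Propagate the legacy-Keras switch to every Ray worker process in this job.
# tf-keras must be installed alongside TensorFlow on all nodes for this to work.
ray.init(
    address="auto",
    runtime_env={"env_vars": {"TF_USE_LEGACY_KERAS": "1"}},
)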

Hint: Mention @RunLLM in the post for followups.

@RunLLM

Thank you for the reply and the help; we did just find similar mentions of newer TensorFlow and Keras issues relating to MultiWorkerMirroredStrategy.

We are just trying a simple dataset to get Ray Train running before using it for more demanding transformer training.

If you would be so kind as to give us an example of how to get our distributed training running, I would be very grateful.

Good bot, I presume?

Thank you! To get distributed TensorFlow training running with Ray Train, you should use MultiWorkerMirroredStrategy and make sure the model is created and compiled inside the strategy’s scope. However, with recent TensorFlow/Keras versions (especially Keras 3.x), you may encounter compatibility issues. The official Ray example below is known to work with TensorFlow 2.x and Keras 2.x, but may require setting os.environ["TF_USE_LEGACY_KERAS"] = "1" before importing TensorFlow if you are using Keras 3.x (see discussion).

Here is a minimal working example for distributed training on CIFAR-10 using Ray Train and TensorFlow:

import os
os.environ["TF_USE_LEGACY_KERAS"] = "1"  # Only needed with Keras 3.x (TF >= 2.16); requires the tf-keras package

import ray
import tensorflow as tf
from ray.train import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

def build_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(32, 32, 3)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(10, activation='softmax')
    ])
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy']
    )
    return model

def train_func(config):
    # Ray Train sets TF_CONFIG on each worker so the strategy can find its peers.
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    (x_train, y_train), _ = tf.keras.datasets.cifar10.load_data()
    x_train = x_train.astype("float32") / 255.0
    y_train = y_train.astype("int64")
    batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 2)
    with strategy.scope():
        # Model creation and compilation must happen inside the strategy scope.
        model = build_model()
    # With tf.distribute, batch_size in model.fit is the global batch size.
    model.fit(x_train, y_train, batch_size=batch_size, epochs=epochs)

trainer = TensorflowTrainer(
    train_loop_per_worker=train_func,
    train_loop_config={"batch_size": 64, "epochs": 2},
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True)
)

if __name__ == "__main__":
    ray.init(address="auto")  # or ray.init() for local
    trainer.fit()

Make sure all nodes run the same TensorFlow and Ray versions, and that Ray has been started on every node before you run the script. If you still encounter the PerReplica error, try an older TensorFlow 2.x release that still defaults to Keras 2 (pre-2.16), or install tf-keras, as this is a known compatibility issue (see Ray docs).
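One quick way to confirm the versions actually match is to ask the nodes directly with a small Ray task. This is just a sketch to run as a separate script; the probe count is arbitrary and the tasks are not pinned to specific nodes:

import ray

ray.init(address="auto")

@ray.remote
def report_versions():
    import socket
    import tensorflow as tf
    return socket.gethostname(), ray.__version__, tf.__version__

# Launch several probes; with free resources on both workers they will
# typically spread across the nodes. Compare hostnames and versions.
print(ray.get([report_versions.remote() for _ in range(4)]))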

Would you like more detail on debugging multi-node setup or dataset sharding?
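On the dataset sharding point, if you end up building a tf.data.Dataset yourself inside the training loop, you can set the auto-shard policy explicitly. This is a sketch of that option, not a guaranteed fix for the PerReplica error:

import tensorflow as tf

def make_dataset(x, y, global_batch_size):
    ds = tf.data.Dataset.from_tensor_slices((x, y))
    ds = ds.shuffle(10_000).batch(global_batch_size)
    options = tf.data.Options()
    # DATA sharding: every worker iterates the full dataset but keeps only its
    # own slice; an in-memory dataset has no files, so FILE sharding cannot apply.
    options.experimental_distribute.auto_shard_policy = (
        tf.data.experimental.AutoShardPolicy.DATA
    )
    return ds.with_options(options)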

Hint: Mention @RunLLM in the post for followups.

@RunLLM

It seems to require a very old version and to be poorly compatible with anything modern.

Maybe we are supposed to use PyTorch instead nowadays? How are other people doing distributed training with Ray today?

Can you give us a very simple example of how to get a working distributed test with PyTorch, or whatever approach is used today?

Yes, PyTorch is currently the most widely used and best-supported framework for distributed training with Ray Train. Most recent Ray documentation and community examples focus on PyTorch, as it offers smoother integration and fewer compatibility issues than TensorFlow/Keras, especially with modern versions. According to the Ray PyTorch distributed training guide, you can easily run distributed training across multiple nodes and GPUs.

Here is a minimal, modern example for distributed training on CIFAR-10 using Ray Train and PyTorch:

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import ray.train.torch
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def train_func():
    transform = transforms.Compose([transforms.ToTensor()])
    dataset = datasets.CIFAR10(root="/tmp/data", train=True, download=True, transform=transform)
    dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
    # Adds a DistributedSampler and moves each batch to this worker's device.
    dataloader = ray.train.torch.prepare_data_loader(dataloader)

    model = nn.Sequential(
        nn.Flatten(),
        nn.Linear(32*32*3, 128),
        nn.ReLU(),
        nn.Linear(128, 10)
    )
    # Wraps the model in DistributedDataParallel and moves it to the right device.
    model = ray.train.torch.prepare_model(model)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    for epoch in range(2):
        # Re-shuffle per epoch when running with more than one worker.
        if ray.train.get_context().get_world_size() > 1:
            dataloader.sampler.set_epoch(epoch)
        for X, y in dataloader:
            optimizer.zero_grad()
            pred = model(X)
            loss = criterion(pred, y)
            loss.backward()
            optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2, use_gpu=True)
)

if __name__ == "__main__":
    import ray
    ray.init(address="auto")  # or ray.init() for local
    trainer.fit()

This example is directly based on the official Ray PyTorch quickstart. Would you like a step-by-step explanation or more details on multi-node setup?
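If you want to verify the multi-node setup itself, a quick diagnostic is to print each worker's rank and GPU visibility from inside train_func. This is a sketch using Ray Train's context API; the function name is made up:

import ray.train
import torch

def debug_worker():
    ctx = ray.train.get_context()
    # Each worker should report a distinct world rank; cuda should be
    # available on the GPU nodes when use_gpu=True.
    print(
        f"world_rank={ctx.get_world_rank()} "
        f"world_size={ctx.get_world_size()} "
        f"cuda_available={torch.cuda.is_available()}"
    )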

Hint: Mention @RunLLM in the post for followups.