Ray Train TensorflowTrainer looks slower than plain pandas and TensorFlow, i.e. without distributed training or any framework

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

I was benchmarking Ray using training time as the metric. For this I created a TensorflowTrainer with Ray Train, using 4 workers with 1 CPU each. I set up a Ray cluster on GCP with only a head node (8 CPUs) and 0 worker nodes, so that I could compare Ray distributed training against plain TensorFlow with pandas, i.e. without any framework like Ray or Dask. I found Ray to be slower: on only 1.2 GB of training data, Ray took almost 883 seconds to train, while plain pandas with TensorFlow finished in around 400 seconds, both over a single epoch. There was no case of the object store being full. So am I doing something wrong, or is this really expected?
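For context, the non-Ray baseline being compared against is along these lines; this is only an illustrative sketch, since the actual baseline script, file path, feature columns, and model are not shown here:

# Illustrative, non-distributed baseline (no Ray); the path, columns, and
# model architecture are placeholders.
import pandas as pd
import tensorflow as tf

df = pd.read_csv("breast_cancer.csv")              # hypothetical path
y = (df.pop("diagnosis") == "M").astype("int32")   # encode the label
X = df.drop(columns=["id"]).astype("float32")      # numeric features only

model = tf.keras.Sequential([
    tf.keras.layers.Dense(64, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
model.compile(
    loss="binary_crossentropy",
    optimizer=tf.keras.optimizers.Adam(1e-3),
    metrics=["accuracy"],
)

# Single epoch on a single process, as in the ~400 second timing above.
model.fit(X.values, y.values, batch_size=1000, epochs=1)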
My trainer code looks something like this:

# Imports assumed for this snippet (Ray 2.x AIR-era APIs; the keras Callback
# import path differs slightly between Ray versions).
import tensorflow as tf
from ray.air import session
from ray.air.callbacks.keras import Callback
from ray.train.tensorflow import TensorflowCheckpoint, prepare_dataset_shard


def train_func(config: dict):
    learning_rate = config.get('lr', 0.001)
    epochs = config.get('epochs', 4)
    batch_size = config.get('batch_size', 64)
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    train_data_shard = session.get_dataset_shard("train")

    # Build and compile the model inside the strategy scope.
    # build_model() is defined elsewhere in the script.
    with strategy.scope():
        model = build_model()
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            metrics=["accuracy"]
        )

    def to_tf_dataset(data):
        # Convert one pandas batch into a single-element tf.data.Dataset.
        ds = tf.data.Dataset.from_tensors((
            tf.convert_to_tensor(list(data['INPUT'])),
            tf.convert_to_tensor(list(data['diagnosis']))
        ))
        return prepare_dataset_shard(ds)

    results = []

    for epoch in range(epochs):
        # Iterate over the dataset shard in pandas-format batches.
        # Note that model.fit() is called once per batch and a new
        # tf.data.Dataset is built for every batch.
        for batch_train in train_data_shard.iter_batches(batch_size=batch_size):
            batch_train_data = to_tf_dataset(batch_train)
            history = model.fit(
                batch_train_data,
                verbose=0,
                callbacks=[Callback(frequency=0)],
            )
        results.append(history.history)
        model.save("my_model")
        checkpoint = TensorflowCheckpoint.from_saved_model("my_model")

        session.report({
            'accuracy': history.history['accuracy'][0],
            'loss': history.history['loss'][0],
        },
            checkpoint=checkpoint
        )
    return

The TensorFlow trainer:

# Imports assumed for this snippet (Ray 2.x AIR-era APIs).
import ray
from ray.air.config import CheckpointConfig, DatasetConfig, RunConfig, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
from ray.tune.syncer import SyncConfig


def train_tensorflow(num_workers: int, use_gpu: bool, epochs: int, training_data: ray.data.Dataset):
    config = {'lr': 1e-3, "batch_size": 1000, "epochs": epochs}

    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(
            num_workers=num_workers,
            resources_per_worker={"CPU": 1},
            use_gpu=use_gpu,
            _max_cpu_fraction_per_node=0.8,
        ),
        run_config=RunConfig(
            name="breast_cancer",
            local_dir="./output",
            sync_config=SyncConfig(
                # upload_dir="gs://drug_classification",
                # sync_period=2,
                # sync_on_checkpoint=True
            ),
            checkpoint_config=CheckpointConfig(
                num_to_keep=2,
                checkpoint_score_attribute="accuracy",
                checkpoint_score_order="max"
            ),
        ),
        datasets={"train": training_data},
        dataset_config={
            "train": DatasetConfig(
                max_object_store_memory_fraction=0.8,
                split=True,
                fit=True,
                transform=True,
            ),
        },
        preprocessor=get_preprocessors()
    )
    results = trainer.fit()
    return results

import pandas as pd


def UDF(df: pd.DataFrame):
    # Drop the identifier column; the remaining columns are used as features.
    df = df.drop(columns=["id"])
    return df

The preprocessor:

# Imports assumed for this snippet (ray.data.preprocessors, Ray 2.x).
from ray.data.preprocessors import BatchMapper, Chain, Concatenator, LabelEncoder, StandardScaler


def get_preprocessors():
    # train_column is the list of feature column names, defined elsewhere.
    preprocessor = Chain(
        StandardScaler(columns=train_column),
        LabelEncoder(label_column="diagnosis"),
        BatchMapper(UDF, batch_format="pandas"),
        Concatenator(output_column_name="INPUT", exclude=["diagnosis"]),
    )
    return preprocessor
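For what it's worth, the chain can be sanity-checked on a toy dataset outside of training. A minimal sketch with made-up column names, assuming a Ray 2.x AIR-era install; the BatchMapper(UDF) step is replaced here by simply excluding "id" from the concatenation:

import pandas as pd
import ray
from ray.data.preprocessors import Chain, Concatenator, LabelEncoder, StandardScaler

toy = pd.DataFrame({
    "id": [1, 2, 3],
    "radius_mean": [14.0, 20.5, 11.2],   # hypothetical feature columns
    "texture_mean": [20.0, 17.8, 24.1],
    "diagnosis": ["B", "M", "B"],
})
ds = ray.data.from_pandas(toy)

chain = Chain(
    StandardScaler(columns=["radius_mean", "texture_mean"]),
    LabelEncoder(label_column="diagnosis"),
    Concatenator(output_column_name="INPUT", exclude=["diagnosis", "id"]),
)

# fit_transform() fits the scaler/encoder statistics and applies the chain,
# producing a concatenated "INPUT" column next to "id" and the encoded label.
print(chain.fit_transform(ds).take(3))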

Ray is best for scenarios where there is a need to scale up.
I don't think distributed training is necessarily faster than non-distributed training. If the dataset is small, there is only one machine, and you are training on CPU, non-distributed TensorFlow is more efficient and you probably don't need Ray.
With distributed training, you pay a network-communication cost for the ring all-reduce in order to spread the computation load across multiple nodes; it only makes sense if the latter outweighs the former. Ray AIR makes distributed training easier, but you have to assess whether you need distributed training to start with.
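To make that assessment concrete, one option is to time the same trainer with one worker and with several workers and compare wall-clock time. A minimal sketch that reuses the train_tensorflow helper from the question (the CSV path is a placeholder):

import time
import ray

ray.init()
data = ray.data.read_csv("breast_cancer.csv")  # placeholder path

# Baseline: a single Ray Train worker, so essentially no all-reduce traffic.
start = time.time()
train_tensorflow(num_workers=1, use_gpu=False, epochs=1, training_data=data)
single_worker_s = time.time() - start

# Distributed: 4 workers, which adds MultiWorkerMirroredStrategy communication.
start = time.time()
train_tensorflow(num_workers=4, use_gpu=False, epochs=1, training_data=data)
multi_worker_s = time.time() - start

print(f"1 worker: {single_worker_s:.1f}s, 4 workers: {multi_worker_s:.1f}s")

If the 4-worker run is not clearly faster, the communication and setup overhead is not being paid back at this data size.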

But initially the CPUs sit in ray::IDLE processes for a very long time before the training worker actors start executing, even after distributing the training across multiple nodes (2 workers, 1 head).
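One way to narrow that down is to check, while the ray::IDLE processes are sitting there, whether the training workers have actually reserved their CPUs, for example by polling cluster resources from the driver (a small diagnostic sketch, not specific to this trainer):

import time
import ray

ray.init(address="auto")  # attach to the running cluster

# Compare total vs. currently free resources to see whether the training
# workers (1 CPU each) have already been placed.
for _ in range(10):
    print("total:", ray.cluster_resources())
    print("free :", ray.available_resources())
    time.sleep(5)

Running ray status on the head node reports the same information per node.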