TensorflowTrainer trains way slower than plain pandas + TensorFlow

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.

I was benchmarking Ray using training time as the benchmarking factor. For this I created a TensorflowTrainer with Ray Train, using 4 workers with 1 CPU each. I created the Ray cluster on GCP with only a head node (8 CPUs) and 0 worker nodes, so that I could compare Ray distributed training against plain TensorFlow plus pandas, without any framework like Ray or Dask. I found Ray to be slower: on only 1.2 GB of training data, Ray took almost 883 s to train, while plain pandas with TensorFlow completed the task in around 400 s, both over a single epoch. The object store never filled up. So am I doing something wrong, or is this really expected?
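For context, the non-Ray baseline I am comparing against is roughly shaped like this. This is a simplified sketch, not the exact script: the file path and the label mapping are placeholders, and build_model() is the same model-building helper used below.

    import pandas as pd
    import tensorflow as tf

    # Placeholder path; the real file is ~1.2 GB
    df = pd.read_csv("breast_cancer.csv")

    # Plain pandas preprocessing: drop the id column and encode the label
    X = df.drop(columns=["id", "diagnosis"]).to_numpy()
    y = df["diagnosis"].map({"B": 0, "M": 1}).to_numpy()  # placeholder label encoding

    model = build_model()
    model.compile(
        loss=tf.keras.losses.binary_crossentropy,
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
        metrics=["accuracy"],
    )
    model.fit(X, y, batch_size=1000, epochs=1, verbose=0)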

My training loop code looks something like this:

import pandas as pd
import tensorflow as tf

from ray.air import session
from ray.air.integrations.keras import Callback  # AIR Keras callback (module path may differ by Ray version)
from ray.train.tensorflow import TensorflowCheckpoint, prepare_dataset_shard


def tran_func(config: dict):
    learning_rate = config.get('lr', 0.001)
    epochs = config.get('epochs', 4)
    batch_size = config.get('batch_size', 64)

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    train_data_shard = session.get_dataset_shard("train")
    # valid_data_shard = session.get_dataset_shard("valid")

    # Build and compile the model inside the strategy scope
    # (build_model() is defined elsewhere in my script)
    with strategy.scope():
        model = build_model()
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            metrics=["accuracy"]
        )

    def to_tf_dataset(data):
        # Convert a pandas batch into a tf.data.Dataset of (features, label) tensors
        ds = tf.data.Dataset.from_tensors((
            tf.convert_to_tensor(list(data['INPUT'])),
            tf.convert_to_tensor(list(data['diagnosis']))
        ))
        return prepare_dataset_shard(ds)

    results = []
    # Convert the validation dataset into a tf dataset.
    # This must be executed over one epoch/batches because it is a DatasetPipeline shard.
    # batch_val = pd.DataFrame(columns=['INPUT', 'diagnosis'])
    # for batch in valid_data_shard.iter_batches():
    #     batch_val = pd.concat([batch_val, batch])
    # batch_val = to_tf_dataset(batch_val)

    for epoch in range(epochs):
        # Batches are yielded in pandas format
        for batch_train in train_data_shard.iter_batches(batch_size=batch_size):
            batch_train_data = to_tf_dataset(batch_train)
            history = model.fit(
                batch_train_data,
                verbose=0,
                # validation_data=batch_val,
                callbacks=[Callback(frequency=0)],
            )
        results.append(history.history)
        model.save("my_model")
        checkpoint = TensorflowCheckpoint.from_saved_model("my_model")

        session.report({
            'accuracy': history.history['accuracy'][0],
            'loss': history.history['loss'][0],
            # 'val_accuracy': history.history['val_accuracy'][0],
            # 'val_loss': history.history['val_loss'][0],
        },
            checkpoint=checkpoint
        )
    return

The TensorFlow trainer:

import ray
from ray.air.config import CheckpointConfig, DatasetConfig, RunConfig, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
from ray.tune.syncer import SyncConfig  # module path may differ by Ray version


def train_tensorflow(num_workers: int, use_gpu: bool, epochs: int, training_data: ray.data.Dataset, steps_per_epoch: int, comet_project: str):
    # validation_data: ray.data.Dataset,
    # num_features = len(training_data.schema().names) - 2

    config = {'lr': 1e-3, "batch_size": 1000,
              "epochs": epochs, "steps_per_epoch": steps_per_epoch, "num_features": 1}

    trainer = TensorflowTrainer(
        train_loop_per_worker=tran_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(
            num_workers=num_workers, resources_per_worker={"CPU": 1}, use_gpu=use_gpu, _max_cpu_fraction_per_node=0.8),
        run_config=RunConfig(
            name="breast_cancer",
            local_dir="./output",
            sync_config=SyncConfig(
                # upload_dir="gs://drug_classification",
                # sync_period=2,
                # sync_on_checkpoint=True
            ),
            checkpoint_config=CheckpointConfig(
                num_to_keep=2,
                checkpoint_score_attribute="accuracy",
                checkpoint_score_order="max"
            ),
        ),
        datasets={"train": training_data},
        # "valid": validation_data
        dataset_config={
            "train": DatasetConfig(
                max_object_store_memory_fraction=0.8,
                split=True,
                fit=True,
                transform=True,
            ),
            # "valid": DatasetConfig(
            #     max_object_store_memory_fraction=0.2,
            #     split=True,
            #     fit=False,
            #     transform=True
            # )
        },
        preprocessor=get_preprocessors()
    )
    results = trainer.fit()
    return results
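For completeness, this is roughly how I invoke the trainer. The bucket path here is a placeholder, and the data is loaded with Ray Data:

    import ray

    ray.init(address="auto")

    # Placeholder path; the real dataset is ~1.2 GB
    train_ds = ray.data.read_csv("gs://my-bucket/breast_cancer.csv")

    results = train_tensorflow(
        num_workers=4,
        use_gpu=False,
        epochs=1,
        training_data=train_ds,
        steps_per_epoch=1,          # passed through in the config but not read inside tran_func
        comet_project="breast_cancer",
    )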

The preprocessing code looks something like this:

def UDF(df: pd.DataFrame):
    # Drop the id column from each pandas batch
    df = df.drop(columns=["id"])
    return df

The preprocessor:

from ray.data.preprocessors import BatchMapper, Chain, Concatenator, LabelEncoder, StandardScaler


def get_preprocessors():
    # train_column (the list of feature columns to scale) is defined elsewhere in my script
    preprocessor = Chain(
        StandardScaler(columns=train_column),
        LabelEncoder(label_column="diagnosis"),
        BatchMapper(UDF, batch_format="pandas"),
        Concatenator(output_column_name="INPUT", exclude=["diagnosis"])
    )
    return preprocessor
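As a sanity check, the chain can be run on a toy frame like this. The column names and train_column values here are made up; in my actual script train_column is defined elsewhere:

    import pandas as pd
    import ray

    train_column = ["radius_mean", "texture_mean"]  # placeholder feature columns

    toy = ray.data.from_pandas(pd.DataFrame({
        "id": [1, 2],
        "radius_mean": [14.1, 20.6],
        "texture_mean": [19.3, 25.8],
        "diagnosis": ["B", "M"],
    }))

    # fit_transform runs the whole chain: scale -> encode the label -> drop id -> concatenate into "INPUT"
    print(get_preprocessors().fit_transform(toy).to_pandas())
    # Expected: two columns, "diagnosis" (label-encoded) and "INPUT" (the concatenated feature vector)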

My trainers do complete, though, and they fully utilize the CPUs.

This is a duplicate of: Ray Train TensorflowTrainer looks slower than (normal pandas and TensorFlow), i.e. without using distributed training or any framework - #2 by xwjiang2010