How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I was benchmarking Ray using training time as the metric. I created a TensorflowTrainer with Ray Train, using 4 workers with 1 CPU each, on a GCP Ray cluster with only a head node (8 CPUs) and 0 worker nodes, so that I could compare Ray distributed training against plain TensorFlow with pandas (no framework like Ray or Dask). I found Ray to be slower: on only 1.2 GB of training data, Ray took almost 883 seconds to train over a single epoch, while plain pandas with TensorFlow completed the same task in around 400 seconds. The object store was never full. So, am I doing something wrong, or is Ray really slower here?
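For reference, the non-Ray baseline is essentially plain pandas plus Keras along these lines (a rough sketch: the file path and exact preprocessing are placeholders, and it reuses the same build_model() shown below):

import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder, StandardScaler

# Placeholder path; the real training set is ~1.2 GB
df = pd.read_csv("training_data.csv").drop(columns=["id"])
y = LabelEncoder().fit_transform(df.pop("diagnosis"))
X = StandardScaler().fit_transform(df)

model = build_model()
model.compile(
    loss=tf.keras.losses.binary_crossentropy,
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    metrics=["accuracy"],
)
model.fit(X, y, batch_size=1000, epochs=1, verbose=0)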
My training function looks something like this:
import tensorflow as tf
from ray.air import session
from ray.air.callbacks.keras import Callback  # import path may vary with Ray version
from ray.train.tensorflow import TensorflowCheckpoint, prepare_dataset_shard

def train_func(config: dict):
    learning_rate = config.get('lr', 0.001)
    epochs = config.get('epochs', 4)
    batch_size = config.get('batch_size', 64)

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    train_data_shard = session.get_dataset_shard("train")

    # build and compile the model inside the strategy scope
    with strategy.scope():
        model = build_model()  # build_model() is defined elsewhere
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            metrics=["accuracy"]
        )

    # convert one pandas-format batch into a tf.data.Dataset
    def to_tf_dataset(data):
        ds = tf.data.Dataset.from_tensors((
            tf.convert_to_tensor(list(data['INPUT'])),
            tf.convert_to_tensor(list(data['diagnosis']))
        ))
        return prepare_dataset_shard(ds)

    results = []
    for epoch in range(epochs):
        # iterate over the dataset shard in pandas-format batches
        for batch_train in train_data_shard.iter_batches(batch_size=batch_size):
            batch_train_data = to_tf_dataset(batch_train)
            history = model.fit(
                batch_train_data,
                verbose=0,
                callbacks=[Callback(frequency=0)],
            )
            results.append(history.history)

        # checkpoint and report metrics once per epoch
        model.save("my_model")
        checkpoint = TensorflowCheckpoint.from_saved_model("my_model")
        session.report(
            {
                'accuracy': history.history['accuracy'][0],
                'loss': history.history['loss'][0],
            },
            checkpoint=checkpoint
        )
    return
TensorFlow trainer:
import ray
from ray.air.config import CheckpointConfig, DatasetConfig, RunConfig, ScalingConfig
from ray.train.tensorflow import TensorflowTrainer
from ray.tune.syncer import SyncConfig

def train_tensorflow(num_workers: int, use_gpu: bool, epochs: int, training_data: ray.data.Dataset):
    config = {'lr': 1e-3, "batch_size": 1000, "epochs": epochs}
    trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,
        train_loop_config=config,
        scaling_config=ScalingConfig(
            num_workers=num_workers,
            resources_per_worker={"CPU": 1},
            use_gpu=use_gpu,
            _max_cpu_fraction_per_node=0.8,
        ),
        run_config=RunConfig(
            name="breast_cancer",
            local_dir="./output",
            sync_config=SyncConfig(
                # upload_dir="gs://drug_classification",
                # sync_period=2,
                # sync_on_checkpoint=True
            ),
            checkpoint_config=CheckpointConfig(
                num_to_keep=2,
                checkpoint_score_attribute="accuracy",
                checkpoint_score_order="max",
            ),
        ),
        datasets={"train": training_data},
        dataset_config={
            "train": DatasetConfig(
                max_object_store_memory_fraction=0.8,
                split=True,
                fit=True,
                transform=True,
            ),
        },
        preprocessor=get_preprocessors(),
    )
    results = trainer.fit()
    return results
import pandas as pd

def UDF(df: pd.DataFrame) -> pd.DataFrame:
    # drop the id column from each batch
    df = df.drop(columns=["id"])
    return df
Preprocessor:
from ray.data.preprocessors import (
    BatchMapper, Chain, Concatenator, LabelEncoder, StandardScaler
)

def get_preprocessors():
    preprocessor = Chain(
        # train_column (the list of feature columns) is defined elsewhere
        StandardScaler(columns=train_column),
        LabelEncoder(label_column="diagnosis"),
        BatchMapper(UDF, batch_format="pandas"),
        Concatenator(
            output_column_name="INPUT",
            exclude=["diagnosis"]
        )
    )
    return preprocessor
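
For completeness, the benchmark is driven roughly like this (the dataset path and bucket name are placeholders, not the real ones):

import ray

ray.init(address="auto")

# Placeholder path; the real dataset is ~1.2 GB of CSV data
train_dataset = ray.data.read_csv("gs://my-bucket/training_data.csv")

result = train_tensorflow(
    num_workers=4,
    use_gpu=False,
    epochs=1,
    training_data=train_dataset,
)
print(result.metrics)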