Hi, I'm comparing the performance of ray.util.sgd.tf.TFTrainer and ray.train.Trainer for our project; the backend is TensorFlow. When using ray.train.Trainer, all cores are used by default, and I also learned how to set the number of cores thanks to @xwjiang2010 (how to set the number of cores in Raytrain). When using TFTrainer, however, only two or three cores are used by default. I tried setting OMP_NUM_THREADS, TF_NUM_INTEROP_THREADS, and TF_NUM_INTRAOP_THREADS, but none of them had any effect. How can I set the number of cores for TFTrainer?
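For reference, here is roughly what I did in both cases. The train_func below is only a placeholder, and the CPU/thread counts are just examples:

# Working case: ray.train.Trainer, where resources_per_worker controls
# the cores per worker (as suggested in the linked answer).
from ray.train import Trainer

def train_func():
    ...  # placeholder for the actual training function

trainer = Trainer(backend="tensorflow", num_workers=1,
                  resources_per_worker={"CPU": 8})
trainer.start()
trainer.run(train_func)
trainer.shutdown()

# Failing case: environment variables set in the driver process,
# before importing tensorflow and before ray.init.
import os
os.environ["OMP_NUM_THREADS"] = "8"
os.environ["TF_NUM_INTEROP_THREADS"] = "2"
os.environ["TF_NUM_INTRAOP_THREADS"] = "8"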
Sample code:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import numpy as np
import ray
from ray.util.sgd.tf import TFTrainer
NUM_TRAIN_SAMPLES = 1000
NUM_TEST_SAMPLES = 400
def create_config(batch_size):
    return {
        # todo: batch size needs to scale with # of workers
        "batch_size": batch_size,
        "fit_config": {
            "steps_per_epoch": NUM_TRAIN_SAMPLES // batch_size
        },
        "evaluate_config": {
            "steps": NUM_TEST_SAMPLES // batch_size,
        }
    }
def linear_dataset(a=2, size=1000):
    x = np.random.rand(size)
    y = x / 2
    x = x.reshape((-1, 1))
    y = y.reshape((-1, 1))
    return x, y
def simple_dataset(config):
    batch_size = config["batch_size"]
    x_train, y_train = linear_dataset(size=NUM_TRAIN_SAMPLES)
    x_test, y_test = linear_dataset(size=NUM_TEST_SAMPLES)
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    train_dataset = train_dataset.shuffle(NUM_TRAIN_SAMPLES).repeat().batch(
        batch_size)
    test_dataset = test_dataset.repeat().batch(batch_size)
    return train_dataset, test_dataset
def simple_model(config):
    model = Sequential([Dense(10, input_shape=(1, )), Dense(1)])
    model.compile(
        optimizer="sgd",
        loss="mean_squared_error",
        metrics=["mean_squared_error"])
    return model
def train_example(num_replicas=1, num_cpus_per_worker=1, batch_size=128,
                  use_gpu=False):
    trainer = TFTrainer(
        model_creator=simple_model,
        data_creator=simple_dataset,
        num_replicas=num_replicas,
        use_gpu=use_gpu,
        num_cpus_per_worker=num_cpus_per_worker,
        verbose=True,
        config=create_config(batch_size))

    # These threading settings run in the driver process only, not in the
    # TFTrainer worker processes; they did not change the core usage.
    tf.config.threading.set_inter_op_parallelism_threads(2)
    tf.config.threading.set_intra_op_parallelism_threads(64)
    tf.config.set_soft_device_placement(True)

    # model baseline performance
    start_stats = trainer.validate()
    print(start_stats)

    # train for 1000 epochs
    for i in range(1000):
        trainer.train()

    # model performance after training (should improve)
    end_stats = trainer.validate()
    print(end_stats)
if __name__ == "__main__":
    ray.init(num_cpus=2)  # Ray is limited to 2 logical CPUs here
    train_example(num_replicas=1)
[Screenshot: core usage during execution]
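One more idea I have not verified: since the TFTrainer workers are separate Ray processes, the environment variables probably need to reach the workers rather than the driver, e.g. by setting os.environ at the top of data_creator/model_creator (which run on the workers) or by passing them through ray.init's runtime_env. A sketch of the runtime_env variant, untested on my side and with example thread counts:

import ray

# Untested: pass the TF threading environment variables to all Ray
# worker processes instead of setting them only in the driver.
ray.init(
    num_cpus=8,
    runtime_env={
        "env_vars": {
            "OMP_NUM_THREADS": "8",
            "TF_NUM_INTEROP_THREADS": "2",
            "TF_NUM_INTRAOP_THREADS": "8",
        }
    },
)

Is something like this the intended way, or is there a TFTrainer setting I am missing?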