Hi, I'm testing the TensorFlow MNIST example with Ray Train. I want the code to use fewer cores when it runs, so I set TensorFlow's inter_op_parallelism_threads and intra_op_parallelism_threads, but it doesn't have the desired effect.
When I test the same settings in plain TensorFlow code without Ray, they work, so I suspect the way I set them up under Ray is incorrect. Could you give me some advice? Thanks~
-
The machine:
Thread(s) per core: 2
Core(s) per socket: 32
Socket(s): 2
(2 sockets × 32 cores × 2 threads = 128 logical CPUs in total)
-
TensorFlow MNIST example without Ray
"""MNIST CNN training with plain TensorFlow (no Ray).

Baseline for comparing CPU-core usage with and without limits on
TensorFlow's inter-op / intra-op thread pools.
"""
import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import Callback


def configure_tf_threading(inter_op_threads=None, intra_op_threads=None):
    """Set the sizes of TensorFlow's CPU thread pools for this process.

    Call this before TensorFlow's runtime is initialized in the process
    (i.e. before the first dataset, op, or model is created); TensorFlow
    does not accept a different value once the runtime is up.

    Args:
        inter_op_threads: Threads used to run independent ops concurrently,
            or None to keep TensorFlow's default.
        intra_op_threads: Threads used inside a single op, or None to keep
            TensorFlow's default.
    """
    if inter_op_threads is not None:
        tf.config.threading.set_inter_op_parallelism_threads(inter_op_threads)
    if intra_op_threads is not None:
        tf.config.threading.set_intra_op_parallelism_threads(intra_op_threads)


def mnist_dataset(batch_size):
    """Return a shuffled, endlessly repeated, batched MNIST training pipeline.

    Pixel values are divided by 255 (as float32) and labels cast to int64.

    Args:
        batch_size: Number of examples per batch.
    """
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    return (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()  # no count: repeats indefinitely; fit() bounds it via steps_per_epoch
        .batch(batch_size)
    )


def train_and_save_model(inter_op_threads=None, intra_op_threads=None):
    """Build, compile and fit a small CNN on MNIST, then print its history.

    Despite its name, this function does not save the model.

    Args:
        inter_op_threads: Optional inter-op thread-pool size (None = TF default).
        intra_op_threads: Optional intra-op thread-pool size (None = TF default).
    """
    # Configure thread pools before any TensorFlow work below (dataset/model
    # creation), since TF only honours these settings before initialization.
    configure_tf_threading(inter_op_threads, intra_op_threads)

    train_dataset = mnist_dataset(batch_size=10240)
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=1e-3),
        metrics=["accuracy"],
    )
    train_result = model.fit(train_dataset, steps_per_epoch=70, epochs=30)
    print("result: ", train_result.history)


if __name__ == "__main__":
    # To limit core usage: train_and_save_model(inter_op_threads=1, intra_op_threads=1)
    train_and_save_model()
-
Core usage during execution (without the thread settings):
-
Core usage during execution (with inter_op & intra_op set):
-
-
TensorFlow MNIST example with Ray
"""MNIST CNN training with TensorFlow's MultiWorkerMirroredStrategy on Ray Train."""
import json
import os

import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import Callback

import ray
import ray.train as train
from ray.train import Trainer


def configure_tf_threading(inter_op_threads=None, intra_op_threads=None):
    """Set the sizes of TensorFlow's CPU thread pools for this process.

    Call this before TensorFlow's runtime is initialized in the process
    (i.e. before the first strategy, dataset, op, or model is created);
    TensorFlow does not accept a different value once the runtime is up.

    Args:
        inter_op_threads: Threads used to run independent ops concurrently,
            or None to keep TensorFlow's default.
        intra_op_threads: Threads used inside a single op, or None to keep
            TensorFlow's default.
    """
    if inter_op_threads is not None:
        tf.config.threading.set_inter_op_parallelism_threads(inter_op_threads)
    if intra_op_threads is not None:
        tf.config.threading.set_intra_op_parallelism_threads(intra_op_threads)


class TrainReportCallback(Callback):
    """Keras callback that forwards each epoch's logged metrics to Ray Train."""

    def on_epoch_end(self, epoch, logs=None):
        # `logs` defaults to None; unpacking None would raise TypeError.
        train.report(**(logs or {}))


def mnist_dataset(batch_size):
    """Return a shuffled, endlessly repeated, batched MNIST training pipeline.

    Pixel values are divided by 255 (as float32) and labels cast to int64.

    Args:
        batch_size: Number of examples per batch.
    """
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    return (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()  # no count: repeats indefinitely; fit() bounds it via steps_per_epoch
        .batch(batch_size)
    )


def build_and_compile_cnn_model(config):
    """Build and compile the CNN.

    Args:
        config: Dict; reads optional "lr" (SGD learning rate, default 0.001).
    """
    learning_rate = config.get("lr", 0.001)
    model = tf.keras.Sequential(
        [
            tf.keras.Input(shape=(28, 28)),
            tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
            tf.keras.layers.Conv2D(32, 3, activation="relu"),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
        metrics=["accuracy"],
    )
    return model


def train_func(config):
    """Per-worker training loop run by Ray Train.

    Process-wide TensorFlow settings such as thread-pool sizes must be applied
    here. Calls made at module level in the driver script configure only the
    driver process, not the Ray worker processes that run this function.

    Args:
        config: Dict with optional keys "lr", "batch_size" (per worker,
            default 64), "epochs" (default 3), "steps_per_epoch" (default 70),
            "inter_op_threads" and "intra_op_threads" (default: TF defaults).

    Returns:
        The Keras ``History.history`` dict of per-epoch metrics.
    """
    # Before the strategy/dataset/model below touch the TF runtime.
    configure_tf_threading(
        config.get("inter_op_threads"), config.get("intra_op_threads")
    )

    per_worker_batch_size = config.get("batch_size", 64)
    epochs = config.get("epochs", 3)
    steps_per_epoch = config.get("steps_per_epoch", 70)

    # TF_CONFIG in the worker's environment lists the workers in the cluster.
    tf_config = json.loads(os.environ["TF_CONFIG"])
    num_workers = len(tf_config["cluster"]["worker"])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)
    with strategy.scope():
        multi_worker_model = build_and_compile_cnn_model(config)

    history = multi_worker_model.fit(
        multi_worker_dataset,
        epochs=epochs,
        steps_per_epoch=steps_per_epoch,
        callbacks=[TrainReportCallback()],
    )
    return history.history


def train_tensorflow_mnist(
    num_workers=1,
    use_gpu=False,
    epochs=30,
    inter_op_threads=None,
    intra_op_threads=None,
):
    """Run train_func on Ray Train workers, print and return the results.

    Args:
        num_workers: Number of Ray Train workers.
        use_gpu: Whether workers use GPUs.
        epochs: Training epochs.
        inter_op_threads: Optional TF inter-op pool size applied in each worker.
        intra_op_threads: Optional TF intra-op pool size applied in each worker.

    Returns:
        The list returned by ``Trainer.run``.
    """
    trainer = Trainer(backend="tensorflow", num_workers=num_workers, use_gpu=use_gpu)
    trainer.start()
    try:
        results = trainer.run(
            train_func=train_func,
            config={
                "lr": 1e-3,
                "batch_size": 10240,
                "epochs": epochs,
                "inter_op_threads": inter_op_threads,
                "intra_op_threads": intra_op_threads,
            },
        )
    finally:
        # Release the workers even if training fails.
        trainer.shutdown()
    print(f"Results: {results[0]}")
    return results


if __name__ == "__main__":
    ray.init()
    # To limit TF core usage inside the worker, also pass
    # inter_op_threads=1, intra_op_threads=1.
    train_tensorflow_mnist(num_workers=1, use_gpu=False, epochs=30)
- Core usage during execution
Besides setting them as in the code above, I also tried setting the environment variables via `export`.
I also tried setting `runtime_env` in trainer.py:
But the core usage didn't change as expected.
runtime_env = { 'env_vars': { "OMP_NUM_THREADS": "1", "inter_op_parallelism_threads": "1", "intra_op_parallelism_threads": "1" }, "local_tf_session_args": { "intra_op_parallelism_threads": "1", "inter_op_parallelism_threads": "1", }, }
- Core usage during execution