Why don't inter_op_parallelism_threads and intra_op_parallelism_threads work when using Ray Train?

Hi, I’m testing the TensorFlow MNIST example with Ray Train, and I’d like the code to use fewer cores while executing, so I set inter_op_parallelism_threads and intra_op_parallelism_threads for TensorFlow, but it doesn’t produce the desired result.
When I test the same TensorFlow code without Ray, it works, so I suspect the way I’m setting it up may be incorrect. Could you give me some advice? Thanks~

  1. The machine

    Thread(s) per core:              2
    Core(s) per socket:              32
    Socket(s):                       2
    
  2. TensorFlow MNIST example without Ray

    import numpy as np
    import tensorflow as tf
    
    # set inter_op & intra_op
    # tf.config.threading.set_inter_op_parallelism_threads(1) 
    # tf.config.threading.set_intra_op_parallelism_threads(1)
    # tf.config.set_soft_device_placement(True)
    
    def mnist_dataset(batch_size):
        (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
        x_train = x_train / np.float32(255)
        y_train = y_train.astype(np.int64)
        train_dataset = (
            tf.data.Dataset.from_tensor_slices((x_train, y_train))
            .shuffle(60000)
            .repeat()
            .batch(batch_size)
        )
        return train_dataset
    
    def train_and_save_model():
        train_dataset = mnist_dataset(batch_size=10240)
    
        model = tf.keras.Sequential(
            [
                tf.keras.Input(shape=(28, 28)),
                tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
                tf.keras.layers.Conv2D(32, 3, activation="relu"),
                tf.keras.layers.Flatten(),
                tf.keras.layers.Dense(128, activation="relu"),
                tf.keras.layers.Dense(10),
            ]
        )
        model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer=tf.keras.optimizers.SGD(learning_rate=1e-3),
            metrics=["accuracy"],
        )
        train_result = model.fit(train_dataset, steps_per_epoch=70, epochs=30)
        print("result: ", train_result.history)
    
    if __name__ == "__main__":
        train_and_save_model()
    
  3. TensorFlow MNIST example with Ray

    import json
    import os
    
    import numpy as np
    import tensorflow as tf
    from tensorflow.keras.callbacks import Callback
    
    import ray
    import ray.train as train
    from ray.train import Trainer
    
    # set inter_op & intra_op like the former
    # tf.config.threading.set_inter_op_parallelism_threads(1) 
    # tf.config.threading.set_intra_op_parallelism_threads(1)
    # tf.config.set_soft_device_placement(True)
    
    class TrainReportCallback(Callback):
        def on_epoch_end(self, epoch, logs=None):
            train.report(**logs)
    
    def mnist_dataset(batch_size):
        (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
        x_train = x_train / np.float32(255)
        y_train = y_train.astype(np.int64)
        train_dataset = (
            tf.data.Dataset.from_tensor_slices((x_train, y_train))
            .shuffle(60000)
            .repeat()
            .batch(batch_size)
        )
        return train_dataset
    
    def build_and_compile_cnn_model(config):
        learning_rate = config.get("lr", 0.001)
        model = tf.keras.Sequential(
            [
                tf.keras.Input(shape=(28, 28)),
                tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
                tf.keras.layers.Conv2D(32, 3, activation="relu"),
                tf.keras.layers.Flatten(),
                tf.keras.layers.Dense(128, activation="relu"),
                tf.keras.layers.Dense(10),
            ]
        )
        model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate),
            metrics=["accuracy"],
        )
        return model
    
    def train_func(config):
        per_worker_batch_size = config.get("batch_size", 64)
        epochs = config.get("epochs", 3)
        steps_per_epoch = config.get("steps_per_epoch", 70)
    
        tf_config = json.loads(os.environ["TF_CONFIG"])
        num_workers = len(tf_config["cluster"]["worker"])
    
        strategy = tf.distribute.MultiWorkerMirroredStrategy()
    
        global_batch_size = per_worker_batch_size * num_workers
        multi_worker_dataset = mnist_dataset(global_batch_size)
    
        with strategy.scope():
            multi_worker_model = build_and_compile_cnn_model(config)
    
        history = multi_worker_model.fit(
            multi_worker_dataset,
            epochs=epochs,
            steps_per_epoch=steps_per_epoch,
            callbacks=[TrainReportCallback()],
        )
        results = history.history
        return results
    
    def train_tensorflow_mnist(num_workers=1, use_gpu=False, epochs=30):
        trainer = Trainer(backend="tensorflow", num_workers=num_workers, use_gpu=use_gpu)
        trainer.start()
        results = trainer.run(
            train_func=train_func, config={"lr": 1e-3, "batch_size": 10240, "epochs": epochs}
        )
        trainer.shutdown()
        print(f"Results: {results[0]}")
    
    if __name__ == "__main__":
        ray.init()
        train_tensorflow_mnist(num_workers=1, use_gpu=False, epochs=30)
    
    
    • Core usage during execution
      Besides setting them in the code as above, I also tried setting the environment variables via ‘export’,
      and I also tried setting ‘runtime_env’ in trainer.py:
      runtime_env = {
                  'env_vars': {
                      "OMP_NUM_THREADS": "1",
                      "inter_op_parallelism_threads": "1",
                      "intra_op_parallelism_threads": "1"
                  },
                  "local_tf_session_args": {
                      "intra_op_parallelism_threads": "1",
                      "inter_op_parallelism_threads": "1",
                  },
              }
      
      But the core usage didn’t change as expected.

Hi @KepingYan,
Thanks for putting up this question!
It seems that Ray automatically sets OMP_NUM_THREADS=1 by default, and this may affect TF’s threading behavior.
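
Just to double-check what the worker processes actually see, a quick sketch like this can confirm it (the check_omp task name here is only for illustration):

    import os

    import ray

    ray.init()

    @ray.remote
    def check_omp():
        # Ray sets OMP_NUM_THREADS=1 for its worker processes by default,
        # unless you override it (e.g. via runtime_env env_vars).
        return os.environ.get("OMP_NUM_THREADS")

    print(ray.get(check_omp.remote()))  # typically prints "1"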

Hi, thanks for your reply. I tried modifying OMP_NUM_THREADS according to configuring-parallelism-with-omp-num-threads. That works for PyTorch, but it doesn’t work for TensorFlow. So I additionally set inter_op & intra_op for TensorFlow, but that doesn’t work for TensorFlow with Ray either.

Ah, got it! I tried everything you provided and observed the same thing you reported. @KepingYan
I think it has something to do with how TensorFlow passes this setting around. It probably doesn’t go through environment variables, whereas PyTorch probably does.
Just for an FYI, the actors that Ray starts automatically inherit runtime_env.env_vars from their parents, all the way up to the top-level ray.init(runtime_env=runtime_env). That is why your threading setting takes effect for PyTorch but not for TensorFlow.
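
To make that inheritance concrete, here is a rough sketch (assuming the same Trainer API you are already using; OMP_NUM_THREADS is just the example variable). Env vars set at ray.init are visible inside the Ray Train workers:

    import os

    import ray
    from ray.train import Trainer

    # env_vars set here are inherited by the child tasks and actors that
    # Ray starts, including the Ray Train worker processes.
    ray.init(runtime_env={"env_vars": {"OMP_NUM_THREADS": "1"}})

    def train_func(config):
        # The worker process sees the inherited value.
        print("OMP_NUM_THREADS =", os.environ.get("OMP_NUM_THREADS"))

    trainer = Trainer(backend="tensorflow", num_workers=1, use_gpu=False)
    trainer.start()
    trainer.run(train_func=train_func, config={})
    trainer.shutdown()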

So I tried to put the following

tf.config.threading.set_inter_op_parallelism_threads(1)
tf.config.threading.set_intra_op_parallelism_threads(1)
tf.config.set_soft_device_placement(True)

in train_func in your last snippet and ran it on Ray, and that gave me the expected results.
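
Roughly, the placement looks like this (a trimmed sketch based on your snippet above, keeping only the relevant parts):

    def train_func(config):
        import tensorflow as tf

        # These must run inside the worker process itself, before TF
        # creates its thread pools (i.e. before building the model).
        tf.config.threading.set_inter_op_parallelism_threads(1)
        tf.config.threading.set_intra_op_parallelism_threads(1)
        tf.config.set_soft_device_placement(True)

        strategy = tf.distribute.MultiWorkerMirroredStrategy()
        with strategy.scope():
            multi_worker_model = build_and_compile_cnn_model(config)
        # ... rest of train_func unchanged (fit with TrainReportCallback, etc.)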

Also, as I think about it, Ray Train is usually used when people want to fully leverage multiple cores on multiple machines for distributed data-parallel training. In this case, though, you want to limit core usage, which is a somewhat different requirement.

Also cc @amogkam


Thank you very much @xwjiang2010, your method also works for me. I just made a new discovery as well: based on your hint that this may affect TF’s threading, I set TF_NUM_INTEROP_THREADS & TF_NUM_INTRAOP_THREADS in runtime_env, and that also works. Thanks again~
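
For reference, that variant looks roughly like this (a sketch; the values assume you still want one thread per worker):

    import ray

    ray.init(
        runtime_env={
            "env_vars": {
                "OMP_NUM_THREADS": "1",
                # TensorFlow reads these at startup to size its thread pools.
                "TF_NUM_INTEROP_THREADS": "1",
                "TF_NUM_INTRAOP_THREADS": "1",
            }
        }
    )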
