How to use more cores when using TFTrainer?

Hi, I'm comparing the performance of ray.util.sgd.tf.TFTrainer and ray.train.Trainer for our project, with TensorFlow as the backend. When using ray.train.Trainer, all cores are used by default, and I also learned how to set the number of cores with help from @xwjiang2010 (how to set the number of cores in Ray Train). But when using TFTrainer, only two or three cores are used by default. I tried setting parameters such as OMP_NUM_THREADS, TF_NUM_INTEROP_THREADS, and TF_NUM_INTRAOP_THREADS, but this doesn't work. How can I set it?
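
For reference, this is roughly how the core count is set with ray.train.Trainer, together with the environment variables I tried for TFTrainer (a minimal sketch assuming the Ray Train v1 Trainer API with resources_per_worker; the worker, CPU, and thread counts are placeholders):

import os
import ray
from ray.train import Trainer

ray.init()

# ray.train.Trainer: reserve a fixed number of CPU cores per worker.
trainer = Trainer(
    backend="tensorflow",
    num_workers=1,
    resources_per_worker={"CPU": 64},
)
# ... then trainer.start() / trainer.run(train_func) / trainer.shutdown() as usual.

# TFTrainer: the environment variables I tried, which did not change core usage.
os.environ["OMP_NUM_THREADS"] = "64"
os.environ["TF_NUM_INTEROP_THREADS"] = "2"
os.environ["TF_NUM_INTRAOP_THREADS"] = "64"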

Sample code:

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
import numpy as np
import ray
from ray.util.sgd.tf import TFTrainer

NUM_TRAIN_SAMPLES = 1000
NUM_TEST_SAMPLES = 400

def create_config(batch_size):
    return {
        # todo: batch size needs to scale with # of workers
        "batch_size": batch_size,
        "fit_config": {
            "steps_per_epoch": NUM_TRAIN_SAMPLES // batch_size
        },
        "evaluate_config": {
            "steps": NUM_TEST_SAMPLES // batch_size,
        }
    }

def linear_dataset(a=2, size=1000):
    x = np.random.rand(size)
    y = x / 2

    x = x.reshape((-1, 1))
    y = y.reshape((-1, 1))

    return x, y

def simple_dataset(config):
    batch_size = config["batch_size"]
    x_train, y_train = linear_dataset(size=NUM_TRAIN_SAMPLES)
    x_test, y_test = linear_dataset(size=NUM_TEST_SAMPLES)

    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    test_dataset = tf.data.Dataset.from_tensor_slices((x_test, y_test))
    train_dataset = train_dataset.shuffle(NUM_TRAIN_SAMPLES).repeat().batch(
        batch_size)
    test_dataset = test_dataset.repeat().batch(batch_size)

    return train_dataset, test_dataset

def simple_model(config):
    model = Sequential([Dense(10, input_shape=(1, )), Dense(1)])

    model.compile(
        optimizer="sgd",
        loss="mean_squared_error",
        metrics=["mean_squared_error"])

    return model

def train_example(num_replicas=1, num_cpus_per_worker=1, batch_size=128, use_gpu=False):
    trainer = TFTrainer(
        model_creator=simple_model,
        data_creator=simple_dataset,
        num_replicas=num_replicas,
        use_gpu=use_gpu,
        num_cpus_per_worker=num_cpus_per_worker,
        verbose=True,
        config=create_config(batch_size))
    # Thread settings I tried; these run in the driver process,
    # and core usage did not change.
    tf.config.threading.set_inter_op_parallelism_threads(2)
    tf.config.threading.set_intra_op_parallelism_threads(64)
    tf.config.set_soft_device_placement(True)
    # model baseline performance
    start_stats = trainer.validate()
    print(start_stats)

    # train for 1000 iterations
    for i in range(1000):
        trainer.train()

    # model performance after training (should improve)
    end_stats = trainer.validate()
    print(end_stats)

if __name__ == "__main__":
    ray.init(num_cpus=2)
    train_example(num_replicas=1)

[Screenshot: core usage during execution]

Hi @KepingYan,
Can you explain why you need the SGD trainer instead of the Ray Train Trainer?
AFAIK, the SGD stuff is already deprecated. Would love to learn more about your use case.

Thanks!

Thanks for your reply @xwjiang2010. We previously provided users with an API based on the SGD trainer, and we have now migrated from the SGD trainer to the Ray Train Trainer because SGD is deprecated. So I need to compare the performance of our code using these two APIs. But the number of CPU cores used by default in these two cases confuses me.

Got it. @KepingYan, just FYI, the Ray ML libraries are now all consolidated under Ray AIR (training is part of it as well). You may want to take a look at the new Ray AIR training API. Under the hood, the new API is just a nicer wrapper around the Ray Train Trainer, so I wouldn't expect any performance change in that regard.
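
For reference, a minimal sketch of what that looks like (assuming Ray 2.x with TensorflowTrainer from ray.train.tensorflow; the worker, CPU, and thread counts below are just placeholders):

from ray.air.config import ScalingConfig
from ray.train.tensorflow import TensorflowTrainer

def train_loop_per_worker(config):
    import tensorflow as tf

    # This function runs inside each Ray worker process, so per-worker
    # TF thread settings can be applied here before TF is initialized.
    tf.config.threading.set_intra_op_parallelism_threads(config["intra_op"])
    tf.config.threading.set_inter_op_parallelism_threads(config["inter_op"])
    # ... build the dataset/model and call model.fit() as usual ...

trainer = TensorflowTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={"intra_op": 64, "inter_op": 2},
    scaling_config=ScalingConfig(
        num_workers=1,
        resources_per_worker={"CPU": 64},  # CPU cores reserved per worker
    ),
)
result = trainer.fit()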

I am not familiar with the SGD stuff - @amogkam, could you comment on the legacy SGD TF trainer's backend threading behavior?

OK, we are also following up on the Ray AIR training API. Thank you, @xwjiang2010!