[Ray Train] Memory overloading rapidly while training TensorFlow model

How severe does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hi, I have been experimenting with Ray for some time now, first with Datasets, then for distributing Scikit-Learn training, and now for distributing a TensorFlow model. However, I am running into trouble with memory growing rapidly while fitting this TF model. I am aware that deep-learning models do not distribute in the same manner as Scikit-Learn models and that they should be treated as two separate problems.

My problem is that I am trying to scale the training of a TensorFlow model built and compiled with Keras. Training works fine with a small dataset of ~30 observations, but when scaling up to 1,000 observations (which should not be that much), my cluster’s memory fills up and the training job crashes.

The dataset I am using consists of a features column containing tensors of length ~1,000,000 and multiple label columns, which are encoded with ray.data.preprocessors.LabelEncoder and then one-hot encoded into a tensor using a custom subclass of ray.data.preprocessors.OneHotEncoder.
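
For context, the preprocessing chain looks roughly like this (a simplified sketch: my real Min-Max scaling and One-Hot encoding steps are custom tensor-aware subclasses, and the column names are only illustrative):

from ray.data.preprocessors import Chain, LabelEncoder, MinMaxScaler, OneHotEncoder

# Scale the feature tensors, encode the label column, then one-hot encode it.
# The stock preprocessors stand in here for my custom tensor-aware subclasses.
preprocessor = Chain(
    MinMaxScaler(columns=["__value__"]),
    LabelEncoder(label_column="labels"),
    OneHotEncoder(columns=["labels"]),
)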

I have tried to reduce the memory usage of this training, with partial success. To do so, I used the streaming ingest and auxiliary dataset splitting approaches described in the documentation on configuring training datasets. I also used the DummyTrainer to debug data ingest for the original distributed training setup with Ray.

Here is the code for the trainer definition:
The fit_model() method is part of a class used to preprocess data and to build, train, and predict with a TensorFlow model. The datasets used for training and validation are stored in Ray Dataset objects but differ slightly: both contain a __value__ column of feature tensors, and the training dataset also contains a labels column of label tensors. The dataset used for prediction contains only the __value__ column of feature tensors.

import tensorflow as tf
from ray.air import session
from ray.air.integrations.keras import Callback
from ray.air.config import ScalingConfig, DatasetConfig, RunConfig
from ray.train.tensorflow import TensorflowTrainer, TensorflowCheckpoint

def fit_model(self, datasets):
    # Transform the datasets using a pre-fitted preprocessor
    # Chained Min-Max scaling, Label Encoding and One-Hot Encoding
    # Min-Max scaling of features and One-Hot Encoding of labels are custom classes to work on tensors directly
    for name, ds in datasets.items():
        ds = self._preprocessor.transform(ds)
        datasets[name] = ds

    # Training parameters
    self._train_params = {
        'batch_size': self.batch_size, # 64
        'epochs': self._training_epochs, # 10
        'size': self._nb_kmers, # Number of features: ~1,000,000
        'nb_cls': self._nb_classes, # Number of classes: ~48,000
        'model': self.classifier # Model name
    }

    # Define TF trainer
    self._trainer = TensorflowTrainer(
        train_loop_per_worker = train_func, # Training function defined below
        train_loop_config = self._train_params, # Training parameters defined above
        scaling_config = ScalingConfig(
            trainer_resources={'CPU': 1}, # Default ray training resources https://docs.ray.io/en/latest/ray-air/package-ref.html#ray.air.config.ScalingConfig
            num_workers = self._n_workers, # 3
            use_gpu = self._use_gpu, # Default for testing is False
            resources_per_worker={'CPU': self._nb_CPU_per_worker} # 17
        ),
        dataset_config = {
            'train': DatasetConfig(
                fit = False, # Don't fit a preprocessor since none is passed and it is already fitted
                transform = False, # Don't transform the dataset since it is already transformed
                split = True, # Split the dataset across training workers
                use_stream_api = True # Use the stream API to use DatasetPipeline in training function
            ),
            'validation': DatasetConfig(
                fit = False, # Don't fit a preprocessor since this is the validation dataset
                transform = False, # Don't transform the dataset since it is already transformed
                split = True, # Split the dataset across training workers
                use_stream_api = True # Use the stream API to use DatasetPipeline in training function
            )
        },
        run_config = RunConfig(
            name = self.classifier, # Name of the model
            local_dir = self._workdir, # Local directory where training results and checkpoints are written
        ),
        datasets = datasets, # {train: Ray dataset, validation: Ray dataset}
    )

    training_result = self._trainer.fit() # Train the model

The training function is defined outside of the class, as mentioned in this Ray discussion thread.

import pandas as pd

def train_func(config):
    # Get parameters from config dict
    epochs = config.get('epochs', 10)
    size = config.get('size')
    nb_cls = config.get('nb_cls')
    model = config.get('model')

    # Build/compile model in a distributed manner
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = build_model(model, nb_cls, size)

    # Get dataset shard equivalent to a window from DatasetPipeline passed to the trainer
    train_data = session.get_dataset_shard('train')
    val_data = session.get_dataset_shard('validation')

    # Function to convert a dataset shard into a tf dataset
    def to_tf_dataset(data):
        ds = tf.data.Dataset.from_tensors((
            tf.convert_to_tensor(list(data['__value__'])),
            tf.convert_to_tensor(list(data['labels']))
        ))
        return ds

    results = []
    # Convert the validation dataset into a tf dataset
    # This must be iterated over a single epoch of batches because it is a DatasetPipeline shard
    batch_val = pd.DataFrame(columns=['__value__', 'labels'])
    for epoch in val_data.iter_epochs(1):
        for batch in epoch.iter_batches():
            batch_val = pd.concat([batch_val, batch])
    batch_val = to_tf_dataset(batch_val)

    # Fit the model using training DatasetPipeline shard
    for epoch_train in train_data.iter_epochs(epochs): # Iterate over epochs
        for batch_train in epoch_train.iter_batches(): # Iterate over batches
            batch_train = to_tf_dataset(batch_train) # Convert the batch into a tf dataset
            history = model.fit(
                batch_train,
                validation_data=batch_val,
                callbacks=[Callback()], # Default ray.air.integrations.keras.Callback
                verbose=0
            )
            results.append(history.history)
            # Report metrics and checkpoint to the trainer
            session.report({
                'accuracy': history.history['accuracy'][0],
                'loss': history.history['loss'][0],
                'val_accuracy': history.history['val_accuracy'][0],
                'val_loss': history.history['val_loss'][0],
            },
                checkpoint=TensorflowCheckpoint.from_model(model)
            )

I am using a cluster of 64 CPU cores with 249 GB of RAM.
My Python 3.8.10 environment lives in a Singularity container and uses Ray 2.2.0.

Is there a way to reduce the memory usage of this training function that I missed? Or should I put certain objects into shared memory using ray.put()?
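
To clarify what I mean by ray.put(): putting a large object into the shared object store once and passing the reference around, along these lines (the array here is just a placeholder for my real data):

import numpy as np
import ray

# Put the big array into the object store once; workers only receive the ObjectRef
# and read the data from shared memory instead of each holding their own copy.
features_array = np.zeros((1000, 100), dtype=np.float32)  # placeholder
features_ref = ray.put(features_array)

# Inside a task or actor, the data is retrieved with ray.get(features_ref)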

@nicdemon thanks for submitting

Do you know if the memory issue is related to the node or the GPU memory? Have you tried reducing the number of trainers that are running in parallel, by setting num_workers to a smaller value than 3?

Hi @ClarenceNg , thank you for your answer.
I am not using a GPU right now, so I doubt this has anything to do with GPU memory.
I have tried reducing the number of trainers running in parallel on my local computer (which worked well with a small dataset), but not yet on the cluster; I will try this and get back to you.

I have tried reducing the number of trainers running in parallel, but nothing changed.
I have seen from a forum discussion post that the number of parallel trainers as well as the number of workers may influence how many times the data is copied (one copy per parallel worker), so a 1 GB window for DatasetConfig(use_stream_api = True) would be multiplied by the number of parallel workers. I then tried using fewer trainers/workers and a smaller window size, but nothing changed.
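
For reference, shrinking the streaming window looks roughly like this (a sketch, assuming the stream_window_size field is what controls the window size in this Ray version; the name and its ~1 GiB default may differ in other releases):

from ray.air.config import DatasetConfig

# Keep streaming ingest but use a smaller per-worker window so that less data is
# materialized at once (stream_window_size is an assumption for the window knob).
dataset_config = {
    'train': DatasetConfig(
        fit=False,
        transform=False,
        split=True,
        use_stream_api=True,
        stream_window_size=250 * 1024 * 1024,  # ~250 MiB instead of the ~1 GiB default
    ),
    'validation': DatasetConfig(
        fit=False,
        transform=False,
        split=True,
        use_stream_api=True,
        stream_window_size=250 * 1024 * 1024,
    ),
}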

cc: @matthewdeng for ideas too

I have run some experiments since my last post, and even with Scikit-Learn models the memory fills up rapidly on the same dataset, so maybe this has to do with how the data are represented and distributed?

This seems to be a recurring, well-known issue in the TensorFlow community…

These GitHub issues describe the problem well: #15887 and #33030.

There are two main possible causes identified when searching the internet:

  • Keras’ high-level API copies data somewhere, saturating memory
  • Python’s garbage collector is not good at collecting leftover objects from TensorFlow training

The TensorFlow core team is aware of this memory issue but has not provided a fix yet. In the meantime, users have found some workarounds to be effective:

  • Installing and using the tcmalloc memory allocator
  • Calling Python’s gc.collect() and/or Keras’ tf.keras.backend.clear_session() after each epoch (see the sketch after this list)
  • Running each call to model.fit() in a separate process and killing that process once it has finished computing
  • Writing/overriding custom training functions to make sure every training step is accounted for and does not leak memory
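
As an illustration of the gc.collect() / clear_session() workaround, here is a minimal sketch (the callback class name is mine):

import gc
import tensorflow as tf

# Force Python's garbage collector to run at the end of every epoch. clear_session()
# resets the Keras backend state, so it is typically only called between independent
# fit() calls (e.g. between cross-validation folds), not inside a running fit().
class MemoryCleanupCallback(tf.keras.callbacks.Callback):
    def on_epoch_end(self, epoch, logs=None):
        gc.collect()

# Usage (hypothetical names):
# model.fit(train_ds, callbacks=[MemoryCleanupCallback()])
# tf.keras.backend.clear_session()  # between separate fit() calls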

I will try these workarounds for my current project, but I guess the optimal solution would be to wait for the TensorFlow core team to address this problem.

Using Prometheus as described in the Ray documentation, I have been able to profile the memory usage of my Scikit-Learn models as well as my TensorFlow models, and I can confirm that the Scikit-Learn models use more memory than the TensorFlow ones (~7 GB vs ~5 GB peak memory usage on a test dataset with the same batch_size).

I have tried some of the aforementioned possible solutions and am still getting an error saying that one of my workers died of OOM… Here is the output:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/ray/tune/execution/ray_trial_executor.py", line 1070, in get_next_executor_event
    future_result = ray.get(ready_future)
  File "/usr/local/lib/python3.8/dist-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/ray/_private/worker.py", line 2309, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError: ray::_Inner.train() (pid=1061244, ip=10.80.43.151, repr=TensorflowTrainer)
  File "/usr/local/lib/python3.8/dist-packages/ray/tune/trainable/trainable.py", line 367, in train
    raise skipped from exception_cause(skipped)
  File "/usr/local/lib/python3.8/dist-packages/ray/tune/trainable/function_trainable.py", line 335, in entrypoint
    return self._trainable_func(
  File "/usr/local/lib/python3.8/dist-packages/ray/train/base_trainer.py", line 480, in _trainable_func
    super()._trainable_func(self._merged_config, reporter, checkpoint_dir)
  File "/usr/local/lib/python3.8/dist-packages/ray/tune/trainable/function_trainable.py", line 652, in _trainable_func
    output = fn()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/base_trainer.py", line 390, in train_func
    trainer.training_loop()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/data_parallel_trainer.py", line 376, in training_loop
    self._report(training_iterator)
  File "/usr/local/lib/python3.8/dist-packages/ray/train/data_parallel_trainer.py", line 324, in _report
    for results in training_iterator:
  File "/usr/local/lib/python3.8/dist-packages/ray/train/trainer.py", line 134, in __next__
    next_results = self._run_with_error_handling(self._fetch_next_result)
  File "/usr/local/lib/python3.8/dist-packages/ray/train/trainer.py", line 97, in _run_with_error_handling
    return func()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/trainer.py", line 168, in _fetch_next_result
    results = self._backend_executor.get_next_results()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/_internal/backend_executor.py", line 439, in get_next_results
    results = self.get_with_failure_handling(futures)
  File "/usr/local/lib/python3.8/dist-packages/ray/train/_internal/backend_executor.py", line 528, in get_with_failure_handling
    self._increment_failures()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/_internal/backend_executor.py", line 590, in _increment_failures
    raise failure
  File "/usr/local/lib/python3.8/dist-packages/ray/train/_internal/utils.py", line 54, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task.
	class_name: RayTrainWorker
	actor_id: dbe01899fc5669af98016acd01000000
	pid: 1063802
	namespace: 728b8df5-9d52-4908-89e4-2e1fb39a4ebe
	ip: 10.80.43.151
The actor is dead because its worker process has died. Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
2023-02-09 04:28:33,652	ERROR tune.py:758 -- Trials did not complete: [TensorflowTrainer_e50fe_00000]
Traceback (most recent call last):
  File "/usr/local/bin/Caribou_classification_train_cv.py", line 86, in <module>
    bacteria_classification_train_cv(opt)
  File "/usr/local/bin/Caribou_classification_train_cv.py", line 52, in bacteria_classification_train_cv
    ClassificationMethods(
  File "/usr/local/lib/python3.8/dist-packages/models/classification.py", line 131, in execute_training
    self._train_model(taxa)
  File "/usr/local/lib/python3.8/dist-packages/models/classification.py", line 139, in _train_model
    self._multiclass_training(taxa)
  File "/usr/local/lib/python3.8/dist-packages/models/classification.py", line 230, in _multiclass_training
    self.models[taxa].train(self._training_datasets, self._database_data, self._cv)
  File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 173, in train
    self._cross_validation(df, df_test, kmers_ds)
  File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 215, in _cross_validation
    self._fit_model(datasets)
  File "/usr/local/lib/python3.8/dist-packages/models/kerasTF/ray_keras_tf.py", line 288, in _fit_model
    training_result = self._trainer.fit()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/base_trainer.py", line 360, in fit
    raise result.error
  File "/usr/local/lib/python3.8/dist-packages/ray/tune/execution/ray_trial_executor.py", line 1070, in get_next_executor_event
    future_result = ray.get(ready_future)
  File "/usr/local/lib/python3.8/dist-packages/ray/_private/client_mode_hook.py", line 105, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/ray/_private/worker.py", line 2309, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError: ray::_Inner.train() (pid=1061244, ip=10.80.43.151, repr=TensorflowTrainer)
  File "/usr/local/lib/python3.8/dist-packages/ray/tune/trainable/trainable.py", line 367, in train
    raise skipped from exception_cause(skipped)
  File "/usr/local/lib/python3.8/dist-packages/ray/tune/trainable/function_trainable.py", line 335, in entrypoint
    return self._trainable_func(
  File "/usr/local/lib/python3.8/dist-packages/ray/train/base_trainer.py", line 480, in _trainable_func
    super()._trainable_func(self._merged_config, reporter, checkpoint_dir)
  File "/usr/local/lib/python3.8/dist-packages/ray/tune/trainable/function_trainable.py", line 652, in _trainable_func
    output = fn()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/base_trainer.py", line 390, in train_func
    trainer.training_loop()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/data_parallel_trainer.py", line 376, in training_loop
    self._report(training_iterator)
  File "/usr/local/lib/python3.8/dist-packages/ray/train/data_parallel_trainer.py", line 324, in _report
    for results in training_iterator:
  File "/usr/local/lib/python3.8/dist-packages/ray/train/trainer.py", line 134, in __next__
    next_results = self._run_with_error_handling(self._fetch_next_result)
  File "/usr/local/lib/python3.8/dist-packages/ray/train/trainer.py", line 97, in _run_with_error_handling
    return func()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/trainer.py", line 168, in _fetch_next_result
    results = self._backend_executor.get_next_results()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/_internal/backend_executor.py", line 439, in get_next_results
    results = self.get_with_failure_handling(futures)
  File "/usr/local/lib/python3.8/dist-packages/ray/train/_internal/backend_executor.py", line 528, in get_with_failure_handling
    self._increment_failures()
  File "/usr/local/lib/python3.8/dist-packages/ray/train/_internal/backend_executor.py", line 590, in _increment_failures
    raise failure
  File "/usr/local/lib/python3.8/dist-packages/ray/train/_internal/utils.py", line 54, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayActorError: The actor died unexpectedly before finishing this task.
	class_name: RayTrainWorker
	actor_id: dbe01899fc5669af98016acd01000000
	pid: 1063802
	namespace: 728b8df5-9d52-4908-89e4-2e1fb39a4ebe
	ip: 10.80.43.151
The actor is dead because its worker process has died. Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
slurmstepd: error: Detected 2 oom-kill event(s) in StepId=13557350.batch. Some of your processes may have been killed by the cgroup out-of-memory handler.

For now, the solutions I have tried are:

I guess I’ll have to try:

Even though I felt like that was already what Ray was doing…

Hi,
Thanks for putting together all these notes. Before jumping into “moving each call to model.fit() into a different process”, I wonder if we could dig more into the cause of the OOM?

If both TF and Scikit-Learn are experiencing the same thing, I wonder if it’s something related to Ray Data (i.e., not specific to the training framework the training function uses).
Could we then maybe use the same data ingestion pattern, but with a dummy trainer, to see if the same issue appears?
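
Something along these lines (untested sketch; it reuses your datasets and dataset_config, and the DummyTrainer lives in ray.air.util.check_ingest in 2.2 if I remember correctly):

from ray.air.config import ScalingConfig
from ray.air.util.check_ingest import DummyTrainer

# Same datasets and DatasetConfig as the TensorflowTrainer, but with a no-op training
# loop, to isolate data ingestion from the TF training itself.
dummy = DummyTrainer(
    scaling_config=ScalingConfig(num_workers=3),
    datasets=datasets,              # your {'train': ..., 'validation': ...} dict
    dataset_config=dataset_config,  # your DatasetConfig settings from above
)
dummy.fit()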

@ClarenceNg For the OOM killer, is there a way to see which actor(s)/task(s) were consuming the most memory at the time the killer kicked in? Maybe their names?

@Huaiwei_Sun If we find the cause of this, it sounds like a good debuggability and observability story.


@nicdemon

The log indicates that the actor died, possibly due to OOM. It is not certain, since Ray only noticed that the process is dead. There are several ways to verify this:

  • Check the memory usage on the node to see if it was close to the limit at any point in time; if so, OOM was likely triggered
  • Check the kernel logs to see if there are OOM events (e.g. via dmesg)
  • Check the raylet logs and search for “running low on memory” to see whether Ray triggered killing of any tasks; example logs can be found here: Out-Of-Memory Prevention — Ray 2.2.0

cc: @sangcho to think about what the dashboard could do to help debug this

Sorry for the delay, I had to test the debugging options.

I have the output from the DummyTrainer that @xwjiang2010 mentioned:

Result for DummyTrainer_e2f56_00000:
  _time_this_iter_s: 4.656460523605347
  _timestamp: 1677185284
  _training_iteration: 99
  batch_delay: 0.0028449230012483895
  batches_read: 99
  bytes_read: 3213524
  date: 2023-02-23_15-48-07
  done: false
  epochs_read: 10
  experiment_id: 7d230f16d69a40888a5e3de3b2e66840
  experiment_tag: '0'
  hostname: nl20401.narval.calcul.quebec
  iterations_since_restore: 99
  node_ip: 10.80.44.151
  pid: 2979320
  time_since_restore: 203.61900353431702
  time_this_iter_s: 0.012787342071533203
  time_total_s: 203.61900353431702
  timestamp: 1677185287
  timesteps_since_restore: 0
  training_iteration: 99
  trial_id: e2f56_00000
  warmup_time: 5.473980903625488

It crashed on the last iteration, but I think the results are fine anyway:

RuntimeError: Some workers returned results while others didn't. Make sure that `session.report()` are called the same number of times on all workers.

I have also rerun a simpler version of the program to access the logs as @ClarenceNg mentioned, but the task didn’t crash due to OOM.

After talking with people from my lab, I learned that I had too many features to train on. I therefore reduced the number of features from 1,000,000 to 10,000, and the memory used for training is now around 400 GB, which the cluster I am using can handle.