How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
Hi, I have been experimenting with Ray for some time now: first with Datasets, then for distributing Scikit-Learn training, and now for distributing a TensorFlow model. However, I am running into trouble with memory growing rapidly while fitting this TF model. I am aware that deep-learning models do not distribute in the same manner as Scikit-Learn models and that the two should be treated as separate problems.
My problem is that I am trying to scale the training of a TensorFlow model built and compiled with Keras. Training works fine on a small dataset of ~30 observations, but when scaling to 1,000 observations (which should not be that much) my cluster's memory fills up and the training job crashes.
The dataset I am using consists of a features column containing tensors of length ~1,000,000 and multiple label columns, which are label-encoded using ray.data.preprocessors.LabelEncoder and then one-hot encoded into a tensor using a custom subclass of ray.data.preprocessors.OneHotEncoder.
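To make the data layout concrete, here is a minimal, self-contained sketch of the schema and the label-encoding step (toy shapes instead of ~1,000,000 features; the one-hot step is a custom subclass in my real code, so only the built-in LabelEncoder is shown, and exact tensor-column handling may differ slightly across Ray versions):

import numpy as np
import ray
from ray.data.preprocessors import LabelEncoder

# Toy dataset mimicking the real schema: a tensor column of features
# and a string label column (real feature vectors are ~1,000,000 long).
ds = ray.data.from_items([
    {'__value__': np.random.rand(8).astype(np.float32), 'labels': 'species_a'},
    {'__value__': np.random.rand(8).astype(np.float32), 'labels': 'species_b'},
])

# Encode the string labels into integer ids.
encoder = LabelEncoder(label_column='labels')
ds = encoder.fit_transform(ds)

# In the real pipeline, a custom subclass of OneHotEncoder then turns
# these integer ids into one-hot tensors of length ~48,000.
print(ds.take(2))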
I have tried to reduce the memory usage of this training, with partial success. To do so, I used the Streaming Ingest and Splitting Auxiliary Datasets patterns described in the documentation on configuring training datasets. I have also used the DummyTrainer to debug data ingest for the original distributed version of this training with Ray.
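For reference, this is roughly how I used the DummyTrainer to check ingest in isolation (a minimal sketch; if I remember the import path correctly, it lives in ray.air.util.check_ingest in Ray 2.2, and the toy dataset below just stands in for the real one):

import ray
from ray.air.config import ScalingConfig
from ray.air.util.check_ingest import DummyTrainer

# Any dataset with a schema similar to the real one works for this check.
ds = ray.data.range_tensor(1000, shape=(80,))

# The DummyTrainer reads the dataset shards without doing any training,
# which isolates ingest throughput and memory from the model itself.
trainer = DummyTrainer(
    scaling_config=ScalingConfig(num_workers=3),
    datasets={'train': ds},
)
trainer.fit()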
The fit_model() method below is part of a class used to preprocess data and to build, train, and predict with a TensorFlow model. The datasets used for training and validation are stored in Ray Dataset objects with slightly different schemas: both contain a __value__ column of feature tensors, but the training dataset also contains a labels column of label tensors. The dataset used for prediction contains only the __value__ column of feature tensors. Here is the code for the trainer definition:
import tensorflow as tf
from ray.air import session
from ray.air.integrations.keras import Callback
from ray.air.config import ScalingConfig, DatasetConfig, RunConfig
from ray.train.tensorflow import TensorflowTrainer, TensorflowCheckpoint
def fit_model(self, datasets):
    # Transform the datasets using a pre-fitted preprocessor:
    # chained Min-Max scaling, label encoding and one-hot encoding.
    # The Min-Max scaler for features and the one-hot encoder for labels
    # are custom classes that work on tensors directly.
    for name, ds in datasets.items():
        ds = self._preprocessor.transform(ds)
        datasets[name] = ds
    # Training parameters
    self._train_params = {
        'batch_size': self.batch_size,    # 64
        'epochs': self._training_epochs,  # 10
        'size': self._nb_kmers,           # Number of features: ~1,000,000
        'nb_cls': self._nb_classes,       # Number of classes: ~48,000
        'model': self.classifier          # Model name
    }
    # Define the TF trainer
    self._trainer = TensorflowTrainer(
        train_loop_per_worker=train_func,      # Training function defined below
        train_loop_config=self._train_params,  # Training parameters defined above
        scaling_config=ScalingConfig(
            trainer_resources={'CPU': 1},  # Default Ray training resources:
            # https://docs.ray.io/en/latest/ray-air/package-ref.html#ray.air.config.ScalingConfig
            num_workers=self._n_workers,   # 3
            use_gpu=self._use_gpu,         # Default for testing is False
            resources_per_worker={'CPU': self._nb_CPU_per_worker}  # 17
        ),
        dataset_config={
            'train': DatasetConfig(
                fit=False,            # Don't fit a preprocessor: none is passed and it is already fitted
                transform=False,      # Don't transform the dataset: it is already transformed
                split=True,           # Split the dataset across training workers
                use_stream_api=True   # Use the stream API to get a DatasetPipeline in the training function
            ),
            'validation': DatasetConfig(
                fit=False,            # Don't fit a preprocessor: this is the validation dataset
                transform=False,      # Don't transform the dataset: it is already transformed
                split=True,           # Split the dataset across training workers
                use_stream_api=True   # Use the stream API to get a DatasetPipeline in the training function
            )
        },
        run_config=RunConfig(
            name=self.classifier,     # Name of the model
            local_dir=self._workdir,  # Path to a directory for spilling data
        ),
        datasets=datasets,  # {'train': Ray dataset, 'validation': Ray dataset}
    )
    training_result = self._trainer.fit()  # Train the model
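For completeness, the Result object returned by fit() is what exposes the reported metrics and the last checkpoint afterwards (a minimal sketch, assuming the standard ray.air.result.Result fields; the usage below is illustrative, not my actual code):

# training_result is a ray.air.result.Result
print(training_result.metrics)     # Last metrics dict reported via session.report()
print(training_result.checkpoint)  # Last TensorflowCheckpoint reported via session.report()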
The training function lives outside the class, as suggested in this Ray discussion thread:
import pandas as pd

def train_func(config):
    # Get parameters from the config dict
    epochs = config.get('epochs', 10)
    size = config.get('size')
    nb_cls = config.get('nb_cls')
    model = config.get('model')

    # Build/compile the model in a distributed manner
    strategy = tf.distribute.MultiWorkerMirroredStrategy()
    with strategy.scope():
        model = build_model(model, nb_cls, size)

    # Get dataset shards, i.e. windows of the DatasetPipeline passed to the trainer
    train_data = session.get_dataset_shard('train')
    val_data = session.get_dataset_shard('validation')

    # Convert a pandas batch from a dataset shard into a tf dataset
    def to_tf_dataset(data):
        ds = tf.data.Dataset.from_tensors((
            tf.convert_to_tensor(list(data['__value__'])),
            tf.convert_to_tensor(list(data['labels']))
        ))
        return ds

    results = []

    # Materialize the validation dataset into a single tf dataset.
    # This must iterate over one epoch of batches because it is a DatasetPipeline shard.
    batch_val = pd.DataFrame(columns=['__value__', 'labels'])
    for epoch in val_data.iter_epochs(1):
        for batch in epoch.iter_batches():
            batch_val = pd.concat([batch_val, batch])
    batch_val = to_tf_dataset(batch_val)

    # Fit the model using the training DatasetPipeline shard
    for epoch_train in train_data.iter_epochs(epochs):  # Iterate over epochs
        for batch_train in epoch_train.iter_batches():  # Iterate over batches
            batch_train = to_tf_dataset(batch_train)    # Convert the batch into a tf dataset
            history = model.fit(
                batch_train,
                validation_data=batch_val,
                callbacks=[Callback()],  # Default ray.air.integrations.keras.Callback
                verbose=0
            )
            results.append(history.history)

    # Report metrics and a checkpoint to the trainer
    session.report(
        {
            'accuracy': history.history['accuracy'][0],
            'loss': history.history['loss'][0],
            'val_accuracy': history.history['val_accuracy'][0],
            'val_loss': history.history['val_loss'][0],
        },
        checkpoint=TensorflowCheckpoint.from_model(model)
    )
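For clarity on what to_tf_dataset() does: tf.data.Dataset.from_tensors wraps its input in a single element, so each call materializes one big (rows, ~1,000,000) tensor in memory. A standalone illustration of the difference from the slicing variant (toy shapes and NumPy stand-ins, not my actual helper):

import numpy as np
import tensorflow as tf

features = np.random.rand(4, 8).astype(np.float32)  # Toy (rows, features)
labels = np.random.rand(4, 3).astype(np.float32)    # Toy one-hot labels

# from_tensors: ONE element holding the full arrays (what my helper builds)
ds_whole = tf.data.Dataset.from_tensors((features, labels))
print(ds_whole.cardinality().numpy())  # 1

# from_tensor_slices: one element PER ROW, so TF can batch and stream lazily
ds_rows = tf.data.Dataset.from_tensor_slices((features, labels)).batch(2)
print(ds_rows.cardinality().numpy())   # 2 batches of 2 rows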
I am using a cluster of 64 CPU cores with 249 GB of RAM. My Python 3.8.10 environment lives in a Singularity container and uses Ray 2.2.0.
Is there a way to reduce the memory usage of this training function that I missed? Or should I put certain objects into shared memory using ray.put()?
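For clarity, this is the kind of thing I mean by ray.put(): pinning a large, read-only object in the object store once and passing only the reference around (a minimal sketch with made-up names, not my actual code):

import numpy as np
import ray

big_array = np.zeros((1000, 1000), dtype=np.float32)
ref = ray.put(big_array)  # Stored once in shared memory

@ray.remote
def uses_it(array):
    # Ray resolves the ObjectRef argument to the stored array
    return array.sum()

# Passing the ObjectRef avoids re-serializing big_array for every task.
print(ray.get(uses_it.remote(ref)))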