Loading Parquet files is very memory-consuming

High: It blocks me from completing my task.

We have two data pipelines feeding a TensorFlow model. Both pipelines aim to achieve the same thing.
The first pipeline: TFRecords → TensorFlow
This pipeline behaves as expected: it reads enough data to fill a shuffle buffer with the first 10,000 samples and then feeds data to the model with high throughput, without consuming much memory.
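For comparison, `tf.data.Dataset.shuffle(buffer_size)` only holds `buffer_size` elements at a time: it fills a buffer, then repeatedly emits a random element from it and puts the next incoming element in its place. The stdlib-only sketch below imitates that idea to show why memory stays bounded; `buffered_shuffle` is a hypothetical helper and a simplified model, not TensorFlow's actual implementation.

```python
import random
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def buffered_shuffle(items: Iterable[T], buffer_size: int, seed: int = 0) -> Iterator[T]:
    """Yield `items` in a locally shuffled order using a fixed-size buffer.

    At most `buffer_size` items are held at once, however long `items` is.
    """
    if buffer_size < 1:
        raise ValueError("buffer_size must be >= 1")
    rng = random.Random(seed)
    buffer: List[T] = []
    for item in items:
        if len(buffer) < buffer_size:
            # Still filling the buffer: nothing is emitted yet.
            buffer.append(item)
            continue
        # Buffer full: emit a random element and reuse its slot for the new item.
        idx = rng.randrange(buffer_size)
        yield buffer[idx]
        buffer[idx] = item
    # Input exhausted: drain the remaining items in random order.
    rng.shuffle(buffer)
    yield from buffer


print(list(buffered_shuffle(range(20), buffer_size=5)))
```

Because each output is drawn only from the current buffer, the shuffle is local: an element can move at most about `buffer_size` positions earlier than its input position.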

The second pipeline: Parquet files → Ray Dataset → TensorFlow
This pipeline seems to try to load all the data into memory/the object store. When the object store is full, Ray starts spilling data to disk, and if there is not enough disk space, the pipeline is no longer responsive.

In a “large-scale” scenario we would have 300 files of 100 MB each. Loading up to 5 of these files with the second pipeline causes no issue, but from 10 files onward the object store starts to fill up and Ray starts spilling to disk, which at some point also fills up (limited storage on /).
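A back-of-the-envelope estimate helps put these numbers in context. Assuming each sample is held uncompressed as float32 with the tensor shapes from the schema used in both pipelines (`x`: 128×128×2, `y`: 100), the sketch below computes the in-memory size of one sample, one batch of 32, and a full 10,000-sample shuffle buffer, next to the total on-disk size of the 300 files. It says nothing about how well the Parquet files compress.

```python
import math

FLOAT32_BYTES = 4        # size of one float32 value
X_SHAPE = (128, 128, 2)  # feature tensor shape from the schema
Y_SHAPE = (100,)         # label tensor shape from the schema

bytes_per_sample = (math.prod(X_SHAPE) + math.prod(Y_SHAPE)) * FLOAT32_BYTES
bytes_per_batch = 32 * bytes_per_sample                # batch(32)
bytes_per_shuffle_buffer = 10_000 * bytes_per_sample   # shuffle(10000)
parquet_bytes_on_disk = 300 * 100 * 10**6              # 300 files x 100 MB

for label, size in [
    ("one sample", bytes_per_sample),
    ("one batch of 32", bytes_per_batch),
    ("10,000-sample shuffle buffer", bytes_per_shuffle_buffer),
    ("all Parquet files on disk", parquet_bytes_on_disk),
]:
    print(f"{label:>30}: {size / 1e6:12,.1f} MB")
```

Under these assumptions a batch is about 4.2 MB and even the shuffle buffer is about 1.3 GB, whereas the input is about 30 GB on disk, so a streaming pipeline only needs a small fraction of the data resident at any one time.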

Note (1): the second pipeline uses Ray Tune and Ray Train to sweep hyperparameters, in case that affects how Ray consumes the data.

# Something like this
from ray import tune
from ray.train import Trainer

trainer = Trainer(backend="tensorflow", num_workers=1)
Trainable = trainer.to_tune_trainable(train_func, dataset={'data': load_parquet(list_of_files)})
analysis = tune.run(Trainable,....)

Note (2): both pipelines run on a single AWS machine, albeit one with a fair amount of RAM.

So the question is: how can we achieve the same behaviour with Ray Dataset and Parquet as with tf.data and TFRecords, i.e. reading and shuffling lazily, consuming only the memory needed to feed the model the current batch, and prefetching data to speed up ingestion to the GPU?
I tried adding experimental_lazy() to the pipeline, but it did not seem to have any impact.
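The behaviour asked for here (read lazily, keep only a few batches in flight, prefetch ahead of the consumer) can be modelled with a bounded queue filled by a background thread. The sketch below is a stdlib-only illustration of that idea; `prefetch` and `read_batches` are hypothetical helpers, not Ray or TensorFlow APIs, and a real reader would produce batches from Parquet data instead of integer ranges.

```python
import queue
import threading
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the producer's stream


def prefetch(source: Iterable[T], depth: int) -> Iterator[T]:
    """Iterate over `source` in a background thread, keeping at most `depth`
    items waiting in a bounded queue ahead of the consumer."""
    if depth < 1:
        # queue.Queue(maxsize=0) would be unbounded, defeating the purpose.
        raise ValueError("depth must be >= 1")
    buf: "queue.Queue[object]" = queue.Queue(maxsize=depth)

    def producer() -> None:
        try:
            for item in source:
                buf.put(item)  # blocks while the queue is full, bounding memory
        finally:
            buf.put(_DONE)  # always signal the end, even if `source` raises

    threading.Thread(target=producer, daemon=True).start()
    while True:
        item = buf.get()
        if item is _DONE:
            return
        yield item


def read_batches(num_batches: int, batch_size: int) -> Iterator[List[int]]:
    """Hypothetical lazy reader: each batch is built only when requested."""
    for b in range(num_batches):
        yield list(range(b * batch_size, (b + 1) * batch_size))


for batch in prefetch(read_batches(num_batches=3, batch_size=4), depth=2):
    print(batch)
```

The queue size plays the role of the prefetch depth: the reader can run ahead of training, but never by more than `depth` batches (plus the one it is currently producing).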

To illustrate, here are the two pipelines:

# First pipeline: TFRecords -> TensorFlow
import tensorflow as tf

def load_tfrecords(list_of_files):
  data = tf.data.TFRecordDataset(list_of_files)

  # Create a dictionary describing the features.
  feature_description = {
    'x': tf.io.FixedLenFeature([128,128,2], tf.float32),
    'y': tf.io.FixedLenFeature([100], tf.float32)
  }

  def _parse_function(example_proto):
    # Parse the input tf.train.Example proto using the dictionary above.
    sample = tf.io.parse_single_example(example_proto, feature_description)
    return sample['x'], \
           sample['y']

  parsed_dataset = data.map(_parse_function, num_parallel_calls=tf.data.AUTOTUNE)
  
  return parsed_dataset


def data_builder_tfrecords(list_of_files):
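  dataset_pipeline = (
    load_tfrecords(list_of_files)
    .cache()  # no filename given, so parsed samples are cached in memory
    .shuffle(10000)  # shuffle buffer holding 10,000 samples
    .batch(32)
    .prefetch(tf.data.AUTOTUNE)  # prepare upcoming batches while the model trains
  )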

  return dataset_pipeline

def train_model(list_of_files, model):
    dataset = data_builder_tfrecords(list_of_files)
    history = model.fit(dataset, epochs=10)


# Second pipeline: Parquet -> Ray Dataset -> TensorFlow
import ray

def load_parquet(list_of_files):
  data = (
      ray.data.read_parquet(
          paths=list_of_files,
          tensor_column_schema={"x": ("float32", (128, 128, 2)),
                                "y": ("float32", (100,))})
      .experimental_lazy()  # can't see any difference adding this
      .repeat()
  )

  return data

def prepare_dataset_shard(dataset_shard: tf.data.Dataset):
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = \
      tf.data.experimental.AutoShardPolicy.OFF
  dataset = dataset_shard.with_options(options)
  return dataset

def train_model(list_of_files, model):
  # create dataset iterator
  dataset = load_parquet(list_of_files)
  dataset_iterator = dataset.iter_epochs()

  for epoch in range(10):
    epoch_dataset = next(dataset_iterator)
    tf_dataset = prepare_dataset_shard(
      epoch_dataset.to_tf(
        feature_columns=["x"],
        label_column="y",
        output_signature=(
          tf.TensorSpec(shape=(None, 128, 128, 2), dtype=tf.float32),
          tf.TensorSpec(shape=(None, 100), dtype=tf.float32)),
        batch_size=32))
    history = model.fit(tf_dataset, epochs=1)

I think you might be looking for something like the .window() functionality in Ray Data.
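Conceptually, windowing splits the input into consecutive groups of blocks (here, whole files) and loads, shuffles and consumes one group at a time, so peak memory scales with the window size rather than the dataset size. The sketch below models this in plain Python; `iter_windows`, `files_per_window` and the `load_file` callback are hypothetical stand-ins. The actual Ray Data API is the one in the linked guide (in Ray 1.x, assuming that guide's API, `Dataset.window(blocks_per_window=...)` plus per-window operations such as `DatasetPipeline.random_shuffle_each_window()`).

```python
import random
from typing import Callable, Iterator, List, Sequence, TypeVar

T = TypeVar("T")


def iter_windows(
    files: Sequence[str],
    files_per_window: int,
    load_file: Callable[[str], List[T]],
    seed: int = 0,
) -> Iterator[List[T]]:
    """Yield one shuffled window of samples at a time.

    Files are loaded only when their window is requested, so at most
    `files_per_window` files' worth of samples is loaded per step.
    """
    if files_per_window < 1:
        raise ValueError("files_per_window must be >= 1")
    rng = random.Random(seed)
    for start in range(0, len(files), files_per_window):
        window_files = files[start:start + files_per_window]
        samples = [s for f in window_files for s in load_file(f)]
        rng.shuffle(samples)  # shuffle within this window only
        yield samples  # the caller can drop this window before asking for the next


# Hypothetical "files" that each contain three integer samples.
def fake_load(name: str) -> List[int]:
    i = int(name.split("-")[1].split(".")[0])
    return [i * 10, i * 10 + 1, i * 10 + 2]


fake_files = [f"part-{i}.parquet" for i in range(7)]
for window in iter_windows(fake_files, files_per_window=3, load_file=fake_load):
    print(len(window), sorted(window))
```

The trade-off is the same as with tf.data's shuffle buffer: samples are only mixed within a window, so a larger window gives a better shuffle at the cost of more memory.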

Here’s a guide that aims to capture the idea, though it uses some newer APIs as an example.

Here’s a guide to using windows in Ray data: Pipelining Compute — Ray 1.13.0

cc @ericl

Thanks for an excellent answer as always!
Great to see that this is explicitly mentioned and elaborated on in the upcoming 3.0 documentation.