Migrating from TFRecords to ray.Data

I’m working on migrating my training pipelines to ray.Data. I haven’t tried to distribute the dataset yet; my first attempts are with a single worker.

My current solution to train a Keras model uses TFRecords to produce a dataset like this:

def get_dataset(filenames, batch_size=32):
    def decode_tfrec_fn(record_bytes):
        # Each record holds two serialized-tensor byte strings.
        return tf.io.parse_single_example(
            record_bytes,
            {
                "X": tf.io.FixedLenFeature([], tf.string),
                "Y": tf.io.FixedLenFeature([], tf.string),
            },
        )

    def prepare_sample(rec):
        return rec["X"], rec["Y"]

    dataset = (
        tf.data.TFRecordDataset(filenames)
        .map(decode_tfrec_fn)
        .map(prepare_sample)
        .batch(batch_size)
    )
    return dataset

To implement ray.Data I have produced Parquet folders to match each TFRecord file.

My goal is to create ray.Data pipelines that can be used in:

  1. Classification problems using Torch
  2. Classification problems using Tensorflow

Starting with #1, I’d reference the Ray example Training a Torch Classifier

So far I have patched together:

import ray
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data.datasource.file_meta_provider import DefaultFileMetadataProvider


def create_shuffle_pipeline(
    training_data_dirs: list, num_epochs: int, num_shards: int
) -> DatasetPipeline:
    # split() returns one pipeline per shard; with a single worker,
    # keep only the first shard.
    return (
        ray.data.read_parquet_bulk(
            training_data_dirs, meta_provider=DefaultFileMetadataProvider()
        )
        .repeat(num_epochs)
        .random_shuffle_each_window()
        .split(num_shards, equal=True)
    )[0]

So far it’s not working; more investigation to come. I managed to train a Torch model from the Parquet files one file at a time, but attempting any shuffling was dreadfully slow, and worse still with ray.data.context.DatasetContext.get_current().use_push_based_shuffle = True, though perhaps that’s only beneficial in a multi-node setup.

for f in train_filenames:
    train_loader = ray.data.read_parquet(f).iter_torch_batches(batch_size=32)
    for data in train_loader:
        ...  # training step

For #2, reviewing the ray.Data code, it seems the ray.data.dataset_pipeline.DatasetPipeline is converted back into a tf.data pipeline to feed a tf.keras model: a Keras generator is created under the hood. I understand from the Keras documentation that the generator method is the slowest, but perhaps that is offset by passing the use_multiprocessing=True argument to fit() or evaluate()?

Hey @Gregory, thanks for posting your question.

To implement ray.Data I have produced Parquet folders to match each TFRecord file.

Any reason you can’t read the TFRecord files directly with read_tfrecords?

I managed to use the Parquet files while training a Torch model one file but attempting any shuffling was dreadfully slow

random_shuffle_each_window is slow. It performs a global shuffle. If you’re using a Trainer, could you try performing a local shuffle instead? You can learn more in Shuffling Data.

Reviewing the code in ray.Data, it seems to convert back to TFRecords for feeding the ray.data.dataset_pipeline.DatasetPipeline to a tf.keras.model, a Keras Generator is created under the hood.

Are you referring to the implementation of to_tf?

Hi @bveeramani, thanks for looking at my thread and for your reply.

The example seen in the read_tfrecords link is:

features = tf.train.Features(
    feature={
        "length": tf.train.Feature(float_list=tf.train.FloatList(value=[5.1])),
        "width": tf.train.Feature(float_list=tf.train.FloatList(value=[3.5])),
        "species": tf.train.Feature(bytes_list=tf.train.BytesList(value=[b"setosa"])),
    }
)
example = tf.train.Example(features=features)

I believe I wasn’t able to use that method because my observations are multi-dimensional lists of lists.

The way I found to store my observations as TFRecords was to iterate through each record, as in the following example, but the resulting files aren’t recognized by the protobuf decoder as tf.train.Example messages:

with tf.io.TFRecordWriter(tfrec_file, options=tf_data_options) as file_writer:
    for i in range(1000):
        # Serialize each multi-dimensional observation into a bytes feature.
        record_bytes = tf.train.Example(
            features=tf.train.Features(
                feature={
                    "x": tf.train.Feature(
                        bytes_list=tf.train.BytesList(
                            value=[tf.io.serialize_tensor(tf.convert_to_tensor(x[i])).numpy()]
                        )
                    ),
                    "y": tf.train.Feature(
                        bytes_list=tf.train.BytesList(
                            value=[tf.io.serialize_tensor(tf.convert_to_tensor(y[i])).numpy()]
                        )
                    ),
                }
            )
        ).SerializeToString()

        file_writer.write(record_bytes)
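Note that tensors stored this way also need tf.io.parse_tensor on the way back, with a dtype matching what was serialized; a minimal round trip, assuming float32 tensors:

```python
import tensorflow as tf

# serialize_tensor -> bytes -> parse_tensor round trip; the out_type
# passed to parse_tensor must match the serialized dtype (float32 here).
x = tf.constant([[1.0, 2.0], [3.0, 4.0]])
blob = tf.io.serialize_tensor(x).numpy()

restored = tf.io.parse_tensor(blob, out_type=tf.float32)
```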

I also tried the following example through ray, but ran into the same issue with the limitation of multi-dimensional lists:

ds = ray.data.from_items([
    {"name": 0, "score": [[42, 43], [42, 43]]},
    {"name": 1, "score": [[43, 44], [43, 44]]},
])
ds.write_tfrecords("/tmp/tfrec_example")

Also, yes, you are correct: in the initial post I was referring to to_tf when mentioning the generator method (tf.data.Dataset.from_generator).

Regarding Parquet files and Torch, thank you for suggesting the local shuffle. Parquet files are new to me, but they seem to be a preferred format for ray.Data. I’ll edit this message shortly to follow up with specifics on the question I encountered with ray.data.read_parquet_bulk.