I’m working on migrating my training pipelines to ray.Data. There has been no effort to distribute the dataset yet; my first attempts are with 1 worker.
My current solution to train a Keras model uses TFRecords to produce a dataset like this:
import tensorflow as tf

def get_dataset(filenames, batch_size=32):
    def decode_tfrec_fn(record_bytes):
        # Parse one serialized Example into its X/Y byte-string features.
        return tf.io.parse_single_example(
            record_bytes,
            {"X": tf.io.FixedLenFeature([], tf.string),
             "Y": tf.io.FixedLenFeature([], tf.string)},
        )

    def fetch_dataset(filename):  # currently unused helper
        return tf.data.TFRecordDataset(filename)

    def prepare_sample(rec):
        return rec["X"], rec["Y"]

    return (
        tf.data.TFRecordDataset(filenames)
        .map(decode_tfrec_fn)
        .map(prepare_sample)
        .batch(batch_size)
    )
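For context, this feeds Keras directly (illustrative; train_filenames and model are defined elsewhere):

model.fit(get_dataset(train_filenames), epochs=10)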
To implement ray.Data I have produced Parquet folders to match each TFRecord file.
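The conversion script isn't shown here, but roughly it decodes each TFRecord file with the same X/Y schema and writes the rows back out as Parquet. A minimal sketch (tfrecord_to_parquet and its paths are hypothetical; the real layout writes a folder per file):

import pandas as pd
import tensorflow as tf

def tfrecord_to_parquet(tfrec_path, parquet_path):
    # Same X/Y byte-string schema as decode_tfrec_fn above.
    feature_spec = {
        "X": tf.io.FixedLenFeature([], tf.string),
        "Y": tf.io.FixedLenFeature([], tf.string),
    }
    rows = []
    for raw in tf.data.TFRecordDataset(tfrec_path):
        rec = tf.io.parse_single_example(raw, feature_spec)
        rows.append({"X": rec["X"].numpy(), "Y": rec["Y"].numpy()})
    pd.DataFrame(rows).to_parquet(parquet_path)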
My goal is to create ray.Data pipelines that can be used in:
1. Classification problems using Torch
2. Classification problems using TensorFlow
Starting with #1, I’m referencing the Ray example Training a Torch Classifier.
So far I have patched together:
import ray
from ray.data.dataset_pipeline import DatasetPipeline
from ray.data.datasource.file_meta_provider import DefaultFileMetadataProvider

def create_shuffle_pipeline(
    training_data_dirs: list, num_epochs: int, num_shards: int
) -> DatasetPipeline:
    # Read all Parquet files, re-shuffle each epoch window, split into
    # num_shards equal shards, and return the first shard (1 worker for now).
    return (
        ray.data.read_parquet_bulk(
            training_data_dirs, meta_provider=DefaultFileMetadataProvider()
        )
        .repeat(num_epochs)
        .random_shuffle_each_window()
        .split(num_shards, equal=True)
    )[0]
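The intended consumption would look roughly like this (a sketch, assuming a Ray 2.x release where DatasetPipeline exposes iter_epochs() and iter_torch_batches(); train_dirs stands in for my directory list):

pipe = create_shuffle_pipeline(train_dirs, num_epochs=10, num_shards=1)
for epoch_ds in pipe.iter_epochs():
    # Each epoch window was already re-shuffled by the pipeline.
    for batch in epoch_ds.iter_torch_batches(batch_size=32):
        ...  # train_step(batch["X"], batch["Y"])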
So far it’s not working yet; more investigation to come. I did manage to train a Torch model from the Parquet files one file at a time, but attempting any shuffling was dreadfully slow, and worse still after setting ray.data.context.DatasetContext.get_current().use_push_based_shuffle = True, though perhaps push-based shuffle is only beneficial in a multi-node setup. The one-file-at-a-time loop:
for f in train_filenames:
    train_loader = ray.data.read_parquet(f).iter_torch_batches(batch_size=32)
    for data in train_loader:
        ...  # training step
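One alternative I plan to try is skipping the full shuffle in favor of the local_shuffle_buffer_size argument of iter_torch_batches (available in recent Ray releases), which shuffles within a bounded in-memory buffer rather than across the whole dataset. A sketch:

# Approximate shuffling with a bounded buffer; far cheaper than
# random_shuffle_each_window() at the cost of shuffle quality.
train_loader = ray.data.read_parquet(train_filenames).iter_torch_batches(
    batch_size=32, local_shuffle_buffer_size=10_000
)
for data in train_loader:
    ...  # training step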
For #2: reviewing the code in ray.Data, feeding a ray.data.dataset_pipeline.DatasetPipeline to a tf.keras model appears to go back through tf.data, with a generator created under the hood. I understand from the Keras documentation that generator input is the slowest option, but perhaps that is offset by passing use_multiprocessing=True to fit() or evaluate()?
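For reference, here is how I expect to hand a Ray dataset to Keras via to_tf() (a sketch; the keyword form below follows newer Ray releases, older ones take an output_signature instead, and model is a compiled tf.keras.Model defined elsewhere):

ds = ray.data.read_parquet(train_filenames)
# to_tf() builds a tf.data.Dataset from a generator under the hood.
tf_ds = ds.to_tf(feature_columns="X", label_columns="Y", batch_size=32)
model.fit(tf_ds, epochs=10)

One wrinkle: the Keras docs scope use_multiprocessing to Python generator and keras.utils.Sequence inputs, so it may be ignored once fit() receives a tf.data.Dataset.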