How to Keep Tensor Shape w/Ray Datasets?

How severe does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hi there,

I have a dataset stored as a set of parquet files in S3 under the same prefix:

# my dataset
s3://my-bucket/my-prefix/file-1.pq
s3://my-bucket/my-prefix/file-2.pq
s3://my-bucket/my-prefix/file-3.pq
...
s3://my-bucket/my-prefix/file-10.pq

(Let’s assume for now that I only have 10 files).

Each file contains the time-series data for a length-5401 sequence with 15 feature values, so the data in each file has shape (5401, 15).

When I load this into a Ray Dataset, Ray treats each row as its own element and gives me back a dataset with num_rows=54010:

>>> import ray
>>> ds = ray.data.read_parquet("s3://my-bucket/my-prefix/")
>>> ds
Dataset(num_blocks=10, num_rows=54010, schema={# my columns #})

The problem is that I’m trying to train a sequence model (think Transformer / LSTM) that performs classification on the length 5401 sequences. So my question is: what would be the most idiomatic way for me to group / batch my rows such that I can iterate over batches of (5401, 15) sized tensors when feeding my Dataset Pipeline into Ray Train?

For full disclosure, I’m ultimately trying to do something like this:

from torch.utils.data import DataLoader
from ray import train
from ray.train import Trainer
import ray

def train_func(config):
    # get shard of the dataset containing data for N sequences
    # --- shape: [N x 5401 x 15]
    dataset_pipeline_shard = ray.train.get_dataset_shard()

    # wrap dataset into Torch DataLoader with given batch size
    data_loader = DataLoader(dataset_pipeline_shard, batch_size=config['batch_size'])
    data_loader = train.torch.prepare_data_loader(data_loader)
    
    # train
    for _ in range(config['epochs']):
        for batch in data_loader:
            # ... train script goes here ...
            # batch is [batch_size x 5401 x 15]


if __name__ == "__main__":
    # create connection to Ray K8s cluster
    ray.init(...)
    
    # create Dataset Pipeline
    dataset_pipeline = # do magic here

    # run Ray Train
    trainer = Trainer(num_workers=2, backend="torch")
    trainer.start()
    trainer.run(train_func, config={"batch_size": 64, "epochs": 10}, dataset=dataset_pipeline)
    trainer.shutdown()

Any help would be greatly appreciated, and I would be more than happy to follow up with additional details!

Hi @mrusso, thanks for posting this question!

We don’t recommend using Datasets with the Torch DataLoader, since the Torch DataLoader will unbatch your data before yielding tensor batches (which is inefficient), and the Datasets APIs are designed to provide the same functionality as the Torch DataLoader but in a distributed data setting.

Would ds.iter_batches(batch_size=5401) (see the API reference) work? For tabular (e.g. Parquet) data, that should return each batch as a (5401, 15) Pandas DataFrame, which can then be converted into Torch tensors.
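
Something along these lines, for example (a rough sketch, assuming all 15 columns are numeric feature columns; adapt the conversion to your actual schema):

import ray
import torch

ds = ray.data.read_parquet("s3://my-bucket/my-prefix/")

# Each batch is one full sequence: a (5401, 15) Pandas DataFrame.
for batch_df in ds.iter_batches(batch_size=5401, batch_format="pandas"):
    # Convert the DataFrame into a (5401, 15) float tensor.
    seq_tensor = torch.as_tensor(batch_df.values, dtype=torch.float32)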

You also might want to look at the ds.to_torch(batch_size=5401) API, which does this Torch tensor conversion for you and returns a Torch IterableDataset that yields tensor batches. E.g.:

def train_func(config):
    # get shard of the dataset containing data for N sequences
    # --- shape: [N x 5401 x 15]
    pipe = ray.train.get_dataset_shard()
    
    # train
    for epoch, epoch_pipe in zip(range(config['epochs']), pipe.iter_epochs()):
        for batch in epoch_pipe.to_torch(batch_size=5401):
            # ... train script goes here ...
            # batch is [batch_size x 5401 x 15]

Note that we iterate directly over the torch.utils.data.IterableDataset returned by .to_torch(); we do not give it to a Torch DataLoader.


Hi Clark,

Thanks so much for the quick reply! This definitely looks like the approach I want to take; I just have a couple more questions about your proposed solution.

Combining your example with my trainer code above, I assume I would be setting:

dataset_pipeline = ray.data.read_parquet("...")

And then passing that into trainer.run(). My first question is:

  1. Do I need to worry about the last sequence in some shards not having the full 5401 rows? My understanding is that Ray reads and partitions the dataset into physical blocks, which are spread across the worker nodes. I don't know whether shards and blocks map one-to-one (or if, e.g., one shard could consist of many blocks), so I want to know whether I need to handle the corner case where the last group of 5401 rows doesn't fully fit into a shard (see the sketch below for what I mean).
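
For example, I'm wondering whether I'd need to guard against a truncated final sequence with something like this (just a sketch of what I mean, inside the epoch loop from your example; I'm assuming drop_last here works like the Torch DataLoader's flag and simply discards a trailing partial batch):

for batch_df in epoch_pipe.iter_batches(batch_size=5401, batch_format="pandas", drop_last=True):
    # With drop_last=True, a trailing group of fewer than 5401 rows in this shard
    # would be discarded instead of yielding a truncated sequence.
    assert len(batch_df) == 5401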

Next, I notice that you have:

for epoch, epoch_pipe in zip(range(config['epochs']), pipe.iter_epochs()):

My second question is:

  2. Where is pipe (the shard) getting the number of epochs? Do I need to modify my code above to be:

dataset_pipeline = ray.data.read_parquet("...").repeat(config['epochs'])

to get it to match?

Finally, my last question is related to the inner for loop:

for batch in epoch_pipe.to_torch(batch_size=5401):
    # ... train script goes here ...
    # batch is [batch_size x 5401 x 15]

  3. If batch is [batch_size x 5401 x 15], where does the batch_size dimension come from?

Sorry for all of these questions; I'm drinking from the firehose of Ray Datasets / Ray Train documentation, which is really amazing, but I just need to refine my understanding of some of these finer details.