How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
Hi there,
I have a dataset stored as a set of parquet files in S3 under the same prefix:
# my dataset
s3://my-bucket/my-prefix/file-1.pq
s3://my-bucket/my-prefix/file-2.pq
s3://my-bucket/my-prefix/file-3.pq
...
s3://my-bucket/my-prefix/file-10.pq
(Let’s assume for now that I only have 10 files).
Each file contains the time-series data for a length-5401 sequence with 15 feature values, so the shape of the data in each file is (5401, 15).
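(As a sanity check, reading a single file directly with pandas gives the shape I expect; the snippet below is just illustrative and assumes s3fs is installed for the s3:// path.)

import pandas as pd

# read one file directly to confirm the per-file shape
df = pd.read_parquet("s3://my-bucket/my-prefix/file-1.pq")
print(df.shape)  # -> (5401, 15): one row per time step, one column per feature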
When I load this into a Ray Dataset, Ray treats each row as its own element and gives me back a dataset with num_rows=54010:
>>> import ray
>>> ds = ray.data.read_parquet("s3://my-bucket/my-prefix/")
>>> ds
Dataset(num_blocks=10, num_rows=54010, schema={# my columns #})
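To show what I mean by "each row as its own element": peeking at the dataset gives back single time steps, not whole sequences. (The output below is schematic; the column names are made up.)

>>> ds.take(1)
[{'feature_0': 0.12, 'feature_1': -3.4, ...}]  # one time step with 15 feature columns, no sequence grouping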
The problem is that I’m trying to train a sequence model (think Transformer / LSTM) that performs classification on the length-5401 sequences. So my question is: what would be the most idiomatic way to group / batch my rows so that I can iterate over batches of (5401, 15)-sized tensors when feeding my DatasetPipeline into Ray Train?
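To make the target concrete, here is a brute-force version that produces the element shape I want, but it reads every file with pandas on the driver and bypasses read_parquet entirely, so I assume it is not the idiomatic answer (the "sequence" column name and the explicit file list are just for illustration):

import pandas as pd
import ray

paths = [f"s3://my-bucket/my-prefix/file-{i}.pq" for i in range(1, 11)]

# one item per file, each carrying a full (5401, 15) array
items = [{"sequence": pd.read_parquet(p).to_numpy()} for p in paths]
ds = ray.data.from_items(items)  # num_rows == 10, one element per sequence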
For full disclosure, I’m ultimately trying to do something like this:
from torch.utils.data import DataLoader
from ray import train
from ray.train import Trainer
import ray

def train_func(config):
    # get shard of the dataset containing data for N sequences
    # --- shape: [N x 5401 x 15]
    dataset_pipeline_shard = ray.train.get_dataset_shard()

    # wrap dataset shard into a Torch DataLoader with the given batch size
    data_loader = DataLoader(dataset_pipeline_shard, batch_size=config['batch_size'])
    data_loader = train.torch.prepare_data_loader(data_loader)

    # train
    for _ in range(config['epochs']):
        for batch in data_loader:
            # batch is [batch_size x 5401 x 15]
            # ... train script goes here ...
            pass

if __name__ == "__main__":
    # create connection to Ray K8s cluster
    ray.init(...)

    # create Dataset Pipeline
    dataset_pipeline = ...  # do magic here

    # run Ray Train
    trainer = Trainer(num_workers=2, backend="torch")
    trainer.start()
    trainer.run(train_func, config={"batch_size": 64, "epochs": 10}, dataset=dataset_pipeline)
    trainer.shutdown()
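(For completeness: if I already had a dataset where each element is a full (5401, 15) sequence, I assume the "do magic here" step would reduce to something like the sketch below; using repeat() to re-emit the data once per epoch is my guess at the intended usage, not something I have verified end to end.)

# hypothetical: ds_sequences has one element per (5401, 15) sequence
ds_sequences = ray.data.read_parquet("s3://my-bucket/my-prefix/")  # + whatever grouping step I'm missing
dataset_pipeline = ds_sequences.repeat(10)  # repeat the dataset 10 times, i.e. one pass per epoch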
Any help would be greatly appreciated, and I would be more than happy to follow up with additional details!