Ray data throughput

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Currently, I’m considering switching my training pipeline to Ray Train + Ray Data.
I’ve done some benchmarking.
My model consumes data at a rate of 1000-2000 MB/s.

Ray Data + Ray Train showed the following throughput:
workers=1 elapsed 7.5s speed_mb:1359.0MB/s total_speed_mb:1359.0MB/s
workers=4 elapsed 7.5s speed_mb:339.8MB/s total_speed_mb:1359.0MB/s

  1. Is it true that ~1200 MB/s is the maximum throughput for Ray Data, or can it be improved? As I understand it, RAM throughput is > 10 GB/s. Even though that’s a theoretical limit, there should be room for improvement.

  2. After enabling local_shuffle_buffer_size I got the following results:
    workers=1 elapsed 57.4s speed_mb:178.0MB/s total_speed_mb:178.0MB/s
    workers=4 elapsed 31.0s speed_mb:82.5MB/s total_speed_mb:330.0MB/s
    That looks like a ~7x degradation. Is there any option to fix it, or am I doing something wrong?

Thank you

Benchmark code:

import os
os.environ["RAY_DEDUP_LOGS"] = "0"

import torch
import time
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")


ray.data.DataContext.get_current().execution_options.verbose_progress = True
ray.init(num_cpus=16, dashboard_host="0.0.0.0", logging_level="WARN")

GB = 1024 * 1024 * 1024
MB = 1024 * 1024
SIZE_GB = 10
COLS = 100
ROWS = SIZE_GB * GB // 4 // COLS  # row count assuming 4 bytes per element

ds = ray.data.range_tensor(ROWS, shape=(COLS,))

BATCH_SIZE = 80000
DEBUG = False
WORKERS = 1 if DEBUG else 4
logger.info(f"SIZE_GB:{SIZE_GB}GB ROWS:{ROWS} COLS:{COLS}")
ds = ds.materialize()  # materialize() returns the cached dataset; assign it so the cached copy is what gets iterated


def benchmark_iter(ds, workers):
    logger.warning("start")
    start_ts = time.time()
    for batch in ds.iter_torch_batches(batch_size=BATCH_SIZE, local_shuffle_buffer_size=BATCH_SIZE * 3):
        pass
    elapsed = time.time() - start_ts
    total_speed_mb = SIZE_GB * GB // MB // elapsed
    logger.warning(f"{workers=} end elapsed {elapsed:.1f}s speed_mb:{total_speed_mb/workers:.1f}MB/s total_speed_mb:{total_speed_mb:.1f}MB/s")
    
def train_loop_per_worker():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    
    train_ds = train.get_dataset_shard("train")
    benchmark_iter(train_ds, workers=WORKERS)
    benchmark_iter(train_ds, workers=WORKERS)

benchmark_iter(ds, workers=1)
my_trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=WORKERS),
    datasets={"train": ds},
)

my_trainer.fit()

Hey @Aleksei, thanks for your interest in Ray Train/Data.

  1. In the code example shared above, I see you are materializing the dataset (ds.materialize()) before it is consumed by the Trainer. This has the effect of caching the dataset in memory prior to iterating, which can improve performance for repeated iterations over the same dataset, at the cost of extra memory. It’s possible that removing this caching could help free up extra memory to improve throughput. However, if your workload is GPU-bound, this may not have much impact (this becomes more relevant later on, once you add the model training code).

  2. The local shuffle buffer reducing throughput with the Ray Trainer is a known issue, and we plan to fix it in the near future: [Data] Enabling local shuffle buffer reduces throughput when iterating with Ray Trainer · Issue #42317 · ray-project/ray · GitHub
    For now, we recommend disabling the local shuffle buffer for maximum throughput if your model training workflow does not require it. Another possible option (depending on your dataset) could be to use file-level shuffling (see the shuffle="files" parameter in read_images for an example). There is a sketch of both suggestions after this list.
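
Here’s a minimal sketch of both suggestions (the dataset size, batch size, worker count, and the S3 path in the trailing comment are placeholders, not values from your setup):

import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

# 1. Pass the dataset to the Trainer without calling materialize(), so blocks are
#    streamed to the training workers instead of being cached up front.
ds = ray.data.range_tensor(1_000_000, shape=(100,))  # placeholder dataset

def train_loop_per_worker():
    shard = train.get_dataset_shard("train")
    # 2. Iterate without local_shuffle_buffer_size for maximum throughput.
    for batch in shard.iter_torch_batches(batch_size=80_000):
        pass  # model training step would go here

trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=4),
    datasets={"train": ds},
)
trainer.fit()

# If your real dataset is file-based and you still want some randomness, file-level
# shuffling avoids the per-row shuffle buffer, e.g. (hypothetical path):
# ds = ray.data.read_images("s3://my-bucket/train/", shuffle="files")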

Thank you for your answer, sjl.
Sorry, I don’t get your point: “It’s possible that removing this caching could help free up extra memory to improve throughput.”
I agree that it will free memory, but how can that improve throughput? As I understand it, memory has the fastest throughput compared to S3/storage.

Yes, you’re correct. Usually the bottleneck is the network when using cloud storage. One related case: if materializing the dataset causes object spilling, throughput can be adversely affected, because the spilled data has to be serialized to disk and deserialized again when it is read back from the object store.
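
If you want to check whether spilling is happening in this benchmark, one option is to give the object store enough room for the materialized dataset and watch for spill messages. A minimal sketch, where the 20 GB figure is an arbitrary example and not a recommendation:

import ray

# Give the object store enough room for the ~10 GB materialized dataset plus headroom;
# 20 GB here is an arbitrary example value.
ray.init(num_cpus=16, object_store_memory=20 * 1024**3)

# When the object store overflows, Ray logs messages about spilled objects, and the
# Ray Dashboard shows object store usage over time.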

Another potential option for increasing throughput is to increase the batch size to the largest value that doesn’t cause an OOM or run out of GPU memory, so that Ray can process as many rows per second as possible. You can use the Ray Dashboard for both of these checks (monitoring host memory / GPU memory).
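
For example, one simple way to probe this is to sweep batch sizes in the benchmark loop and keep the largest one that doesn’t run out of memory. A rough sketch, where the sizes are arbitrary and ds is assumed to be the same dataset used in the benchmark above:

import time

# Hypothetical sweep over batch sizes; stop increasing once memory usage in the
# Ray Dashboard gets close to the limit or the run fails with an OOM.
for batch_size in (20_000, 40_000, 80_000, 160_000):
    start = time.time()
    for batch in ds.iter_torch_batches(batch_size=batch_size):
        pass
    elapsed = time.time() - start
    print(f"batch_size={batch_size} elapsed={elapsed:.1f}s")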