How severe does this issue affect your experience of using Ray?
- High: It blocks me to complete my task.
Currently, I’m considering switching my training pipeline to Ray Train + Ray Data, and I’ve done some benchmarking.
My model consumes data at a rate of 1000-2000 MB/s.
Ray Data + Ray Train showed the following throughput:
workers=1 elapsed 7.5s speed_mb:1359.0MB/s total_speed_mb:1359.0MB/s
workers=4 elapsed 7.5s speed_mb:339.8MB/s total_speed_mb:1359.0MB/s
Is it true that ~1200 MB/s is the maximum throughput for Ray Data, or can it be improved? As I understand it, RAM throughput is > 10 GB/s; even though that is a theoretical limit, there should still be room for improvement.
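For reference, the only knob I have found so far is prefetch_batches in iter_torch_batches. Below is a minimal sketch of the variant I plan to try, on a scaled-down (~1 GB) copy of the same synthetic dataset; whether prefetching actually improves throughput here is just my assumption:

```python
import time

import ray

MB = 1024 * 1024

# Roughly 1 GB version of the same synthetic dataset (100-element tensors per row).
ds = ray.data.range_tensor(1_250_000, shape=(100,)).materialize()

start = time.time()
for _ in ds.iter_torch_batches(batch_size=80000, prefetch_batches=4):
    pass
elapsed = time.time() - start

print(f"{ds.size_bytes() / MB / elapsed:.1f} MB/s with prefetch_batches=4")
```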
After enabling local_shuffle_buffer_size, I got the following results:
workers=1 elapsed 57.4s speed_mb:178.0MB/s total_speed_mb:178.0MB/s
workers=4 elapsed 31.0s speed_mb:82.5MB/s total_speed_mb:330.0MB/s
That looks like a ~7x degradation. Is there any option to fix this? I’m probably doing something wrong.
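To make the comparison easy to reproduce, here is a stripped-down sketch that measures the same pass with and without the local shuffle buffer (again on a ~1 GB dataset; passing local_shuffle_buffer_size=None simply leaves shuffling disabled, which is the default):

```python
import time

import ray

MB = 1024 * 1024
BATCH_SIZE = 80000

ds = ray.data.range_tensor(1_250_000, shape=(100,)).materialize()

def throughput_mb_s(shuffle_buffer):
    # shuffle_buffer=None keeps the default behaviour, i.e. no local shuffling.
    start = time.time()
    for _ in ds.iter_torch_batches(batch_size=BATCH_SIZE,
                                   local_shuffle_buffer_size=shuffle_buffer):
        pass
    return ds.size_bytes() / MB / (time.time() - start)

print(f"no local shuffle:   {throughput_mb_s(None):.1f} MB/s")
print(f"with local shuffle: {throughput_mb_s(BATCH_SIZE * 3):.1f} MB/s")
```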
Thank you
Benchmark code:
```python
import os

os.environ["RAY_DEDUP_LOGS"] = "0"

import logging
import time

import torch
import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

ray.data.DataContext.get_current().execution_options.verbose_progress = True
ray.init(num_cpus=16, dashboard_host="0.0.0.0", logging_level="WARN")

GB = 1024 * 1024 * 1024
MB = 1024 * 1024
SIZE_GB = 10
COLS = 100
ROWS = SIZE_GB * GB // 4 // COLS  # row count assuming 4 bytes per element

# Synthetic dataset: ROWS rows of (COLS,) tensors.
ds = ray.data.range_tensor(ROWS, shape=(COLS,))

BATCH_SIZE = 80000
DEBUG = False
WORKERS = 1 if DEBUG else 4

logger.info(f"SIZE_GB:{SIZE_GB}GB ROWS:{ROWS} COLS:{COLS}")

# materialize() returns a new dataset object; reassign so the materialized
# copy is the one that gets iterated and passed to the trainer.
ds = ds.materialize()


def benchmark_iter(ds, workers):
    """Iterate over the dataset (or shard) once and log the throughput."""
    logger.warning("start")
    start_ts = time.time()
    # Local shuffling is enabled here; drop local_shuffle_buffer_size to
    # reproduce the non-shuffled numbers quoted above.
    for batch in ds.iter_torch_batches(batch_size=BATCH_SIZE, local_shuffle_buffer_size=BATCH_SIZE * 3):
        pass
    elapsed = time.time() - start_ts
    # Each worker iterates SIZE_GB / workers of data, so SIZE_GB / elapsed is
    # the aggregate rate and dividing by `workers` gives the per-worker rate.
    total_speed_mb = SIZE_GB * GB // MB // elapsed
    logger.warning(
        f"{workers=} end elapsed {elapsed:.1f}s "
        f"speed_mb:{total_speed_mb / workers:.1f}MB/s total_speed_mb:{total_speed_mb:.1f}MB/s"
    )


def train_loop_per_worker():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    train_ds = train.get_dataset_shard("train")
    # Two passes so the second one is free of warm-up effects.
    benchmark_iter(train_ds, workers=WORKERS)
    benchmark_iter(train_ds, workers=WORKERS)


# Driver-side baseline: a single consumer iterating the full dataset.
benchmark_iter(ds, workers=1)

my_trainer = TorchTrainer(
    train_loop_per_worker,
    scaling_config=ScalingConfig(num_workers=WORKERS),
    datasets={"train": ds},
)
my_trainer.fit()
```