How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
Hi there,
I’ve been working with Ray Datasets + Ray Train for a few weeks now and have generally been struggling to get them to work together for a large-scale data ingest + training job. I was initially using the two separately in Ray 1.13, but recently upgraded to Ray 3.0.0.dev0 because I thought some of the improvements in AIR would help me overcome my issues. At the moment I’m using the DummyTrainer to debug my ingest pipeline, but I’m confused by some of the logs (shared below) and would greatly appreciate a second pair of eyes or advice on how to tweak my setup.
As a TL;DR, my main question is: can anyone tell whether I’m failing to specify that my 32 GPU workers should fetch and preprocess blocks into the object store on their respective nodes, using their own CPU allocations? And if so, how can I modify my Trainer code to make this happen?
Ray Version
3.0.0.dev0 (I’m using the nightly build)
Ray Cluster Setup
I’m trying to parallelize the training of a deep learning model across 4 p3.16xlarge instances. Each node has:
- 8 GPUs
- 32 physical CPUs
- 488 GB memory
I also have my head node running on an r5dn.24xlarge with 700 GB of memory.
Currently my cluster is running on Kubernetes, with one pod for the head node and 4 pods (one per node) for the GPU workers. For now I’m fixing both the min and max number of GPU workers at 4 so that I don’t have to worry about autoscaling issues. Finally, I’m allocating 256 GB to the object store on each node.
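For reference, the per-node arithmetic implied by this setup can be sketched as below. It only restates the numbers above (4 GPU nodes, 8 GPUs and 32 physical CPUs each, 256 GB of object store per node) and assumes one Train worker per GPU spread evenly over the GPU nodes, as with num_workers=32 in the Trainer code further down; the even spread is an assumption, not something Ray guarantees.

```python
# Cluster arithmetic for the setup described above (numbers from the post).
GPU_NODES = 4
GPUS_PER_NODE = 8
PHYSICAL_CPUS_PER_NODE = 32
OBJECT_STORE_GB_PER_NODE = 256

total_gpus = GPU_NODES * GPUS_PER_NODE  # 32, matching num_workers=32
# Assumption: one Train worker per GPU, spread evenly over the GPU nodes.
workers_per_node = total_gpus // GPU_NODES  # 8
cpus_per_worker = PHYSICAL_CPUS_PER_NODE / workers_per_node  # 4.0
object_store_gb_on_gpu_nodes = GPU_NODES * OBJECT_STORE_GB_PER_NODE  # 1024

print(total_gpus, workers_per_node, cpus_per_worker, object_store_gb_on_gpu_nodes)
```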
Dataset
I have a dataset of 4,000 Parquet files with an average file size of ~100 MB (i.e. ~400 GB total). When these files are pulled into memory I’ve seen them use ~3-5x as much memory, so the total in-memory dataset size is probably in the ballpark of 1-2 TB.
However, for the purpose of debugging my ingest pipeline I’ve limited my dataset to just 40 files in the experiment below.
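The size estimates above, and what they imply for the 40-file debugging subset, can be written out as a quick back-of-the-envelope calculation. The 3-5x expansion factor is the rough observation from the post, so these are ballpark figures only.

```python
# Back-of-the-envelope dataset size, using the figures above.
NUM_FILES = 4_000
AVG_FILE_MB = 100  # approximate average Parquet file size
EXPANSION_RANGE = (3, 5)  # observed in-memory blow-up vs. on-disk size

on_disk_gb = NUM_FILES * AVG_FILE_MB / 1_000  # 400.0 GB
in_memory_tb = tuple(on_disk_gb * x / 1_000 for x in EXPANSION_RANGE)  # (1.2, 2.0)

# The 40-file debugging subset is 100x smaller.
DEBUG_FILES = 40
debug_on_disk_gb = DEBUG_FILES * AVG_FILE_MB / 1_000  # 4.0 GB
debug_in_memory_gb = tuple(debug_on_disk_gb * x for x in EXPANSION_RANGE)  # (12.0, 20.0)

print(on_disk_gb, in_memory_tb, debug_on_disk_gb, debug_in_memory_gb)
```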
My parquet files have the following schema (when read into a pandas dataframe):
{
  "uuid": dtype('O'),  # these are strings
  "event_ts": dtype('int64'),
  "data": <ray.data.extensions.tensor_extension.TensorDtype>,
  "label": dtype('bool'),
}
Each row of the data column is a float tensor of shape (1125, 15).
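Since the float width of the tensor is not stated, the raw size of a single (1125, 15) cell can be computed for both common float dtypes; which one applies here is an assumption left open.

```python
import numpy as np

ROW_SHAPE = (1125, 15)  # shape of one "data" cell, from the post

# The dtype is not stated, so compute the raw size for both common float widths.
for dtype in (np.float32, np.float64):
    nbytes = np.zeros(ROW_SHAPE, dtype=dtype).nbytes
    print(np.dtype(dtype).name, nbytes)
# float32 67500
# float64 135000
```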
Dataset + Trainer Setup
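At a high level, my code for loading the dataset(s) and running the trainer is the following:
```python
import ray
from ray.air.config import DatasetConfig, RunConfig, ScalingConfig
from ray.air.util.check_ingest import DummyTrainer
from ray.data.preprocessors import BatchMapper
from ray.tune.logger import TBXLoggerCallback

# TRAIN_CORPUS_S3_PREFIX, VAL_CORPUS_S3_PREFIX, TRAIN_READ_PARALLELISM,
# VAL_READ_PARALLELISM, NUM_EPOCHS and dataset_preprocessing are defined
# elsewhere in my script.

# Read the datasets.
train_dataset = ray.data.read_parquet(
    TRAIN_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
)
val_dataset = ray.data.read_parquet(
    VAL_CORPUS_S3_PREFIX,
    parallelism=VAL_READ_PARALLELISM,
)

# Create the preprocessor (wraps my pandas preprocessing function).
preprocessor = BatchMapper(dataset_preprocessing)

# Set the train loop config.
config = {"epochs": NUM_EPOCHS}  # ... (other settings elided)

trainer = DummyTrainer(
    train_loop_config=config,
    scaling_config=ScalingConfig(
        num_workers=32,  # one worker per GPU: 4 nodes x 8 GPUs
        use_gpu=True,
        # resources_per_worker={"gpu-node": 0.125},
        _max_cpu_fraction_per_node=0.8,
    ),
    datasets={
        "train": train_dataset,
        "val": val_dataset,
    },
    dataset_config={
        "train": DatasetConfig(fit=True, split=True, global_shuffle=False),
        "val": DatasetConfig(fit=False, split=False, global_shuffle=False),
    },
    run_config=RunConfig(callbacks=[TBXLoggerCallback()]),
    preprocessor=preprocessor,
)
result = trainer.fit()
```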
The dataset_preprocessing function mostly contains vectorized pandas operations, such as normalizing the data column with a precomputed vector of means and standard deviations. The function also has to read a few large dictionaries from the object store (these are used to add columns to the dataframe; I will likely push this step into my dataset generation in the future).
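A minimal sketch of the normalization step described above, assuming the means and standard deviations are per-feature vectors of length 15 (one value per column of each (1125, 15) tensor). The names FEATURE_MEANS, FEATURE_STDS and normalize_data are hypothetical, the statistics are placeholder values, and to keep the example self-contained the "data" column holds plain NumPy arrays rather than Ray's TensorDtype extension column. The dictionary-lookup columns are omitted.

```python
import numpy as np
import pandas as pd

NUM_FEATURES = 15
# Hypothetical precomputed statistics, one value per feature column.
FEATURE_MEANS = np.full(NUM_FEATURES, 1.0, dtype=np.float32)
FEATURE_STDS = np.full(NUM_FEATURES, 0.5, dtype=np.float32)


def normalize_data(df: pd.DataFrame) -> pd.DataFrame:
    """Vectorized per-feature standardization of the "data" column."""
    # Stack the per-row (1125, 15) arrays into one (num_rows, 1125, 15) array.
    stacked = np.stack(df["data"].to_numpy())
    # Broadcasting applies the length-15 vectors along the last axis.
    normalized = (stacked - FEATURE_MEANS) / FEATURE_STDS
    out = df.copy()
    out["data"] = list(normalized)  # back to one array per row
    return out


# Usage with two synthetic rows whose values are all 2.0.
df = pd.DataFrame(
    {
        "uuid": ["a", "b"],
        "event_ts": [0, 1],
        "data": [np.full((1125, 15), 2.0, dtype=np.float32) for _ in range(2)],
        "label": [True, False],
    }
)
normalized_df = normalize_data(df)
```

Processing the whole batch as one stacked array keeps the work inside NumPy instead of a per-row Python loop, which is the point of making the function vectorized.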
DummyTrainer Logs
When ingest + training starts I see the following:
(TunerInternal pid=2535) 2022-07-24 15:02:22,495	WARNING trial_runner.py:328 -- The maximum number of pending trials has been automatically set to the number of available cluster CPUs, which is high (211 CPUs/pending trials). If you're running an experiment with a large number of trials, this could lead to scheduling overhead. In this case, consider setting the `TUNE_MAX_PENDING_TRIALS_PG` environment variable to the desired maximum number of concurrent trials.
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Starting dataset preprocessing
(TunerInternal pid=2535) == Status ==
(TunerInternal pid=2535) Current time: 2022-07-24 15:02:25 (running for 00:00:03.20)
(TunerInternal pid=2535) Memory usage on this node: 16.5/747.8 GiB
(TunerInternal pid=2535) Using FIFO scheduling algorithm.
(TunerInternal pid=2535) Resources requested: 1.0/192 CPUs, 32.0/32 GPUs, 0.0/1820.0 GiB heap, 0.0/1192.09 GiB objects (0.0/4.0 accelerator_type:V100, 0.0/4.0 gpu-node)
(TunerInternal pid=2535) Result logdir: /home/ray/ray_results/DummyTrainer_2022-07-24_15-02-22
(TunerInternal pid=2535) Number of trials: 1/1 (1 RUNNING)
(TunerInternal pid=2535) +--------------------------+----------+---------------------+
(TunerInternal pid=2535) | Trial name               | status   | loc                 |
(TunerInternal pid=2535) |--------------------------+----------+---------------------|
(TunerInternal pid=2535) | DummyTrainer_4b53b_00000 | RUNNING  | 192.168.36.111:7820 |
(TunerInternal pid=2535) +--------------------------+----------+---------------------+
(TunerInternal pid=2535) 
I don’t think the warning matters since I’m only running 1 trial, and if I understand correctly, "Memory usage on this node: 16.5/747.8 GiB" most likely refers to the head node. My first real concern is with the resources requested:
Resources requested: 1.0/192 CPUs, 32.0/32 GPUs, 0.0/1820.0 GiB heap, 0.0/1192.09 GiB objects (0.0/4.0 accelerator_type:V100, 0.0/4.0 gpu-node)
Is it normal, or a bad sign, that only 1 CPU and 0 GiB of memory are requested?
After ~4-5 mins the dataset finishes preprocessing and I see a variety of logs returned from different nodes. I’ve tried to distill the highlights into the following:
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Preprocessed datasets in 255.53620773599687 seconds
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Preprocessor BatchMapper(fn=<lambda>)
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Preprocessor transform stats:
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) 
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Stage 1 read->map_batches: 20/20 blocks executed in 122.81s
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Remote wall time: 116.7s min, 122.76s max, 119.37s mean, 2387.49s total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Remote cpu time: 115.19s min, 120.64s max, 117.66s mean, 2353.18s total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 11217568000.0 min, 11880920000.0 max, 11479703200 mean
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Output num rows: 98 min, 239 max, 148 mean, 2975 total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Output size bytes: 12393270 min, 30224069 max, 18811156 mean, 376223129 total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) 
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Stage 2 randomize_block_order: 20/20 blocks executed in 0s
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Remote wall time: 116.7s min, 122.76s max, 119.37s mean, 2387.49s total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Remote cpu time: 115.19s min, 120.64s max, 117.66s mean, 2353.18s total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 11217568000.0 min, 11880920000.0 max, 11479703200 mean
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Output num rows: 98 min, 239 max, 148 mean, 2975 total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Output size bytes: 12393270 min, 30224069 max, 18811156 mean, 376223129 total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used
....
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) Stage 1 read->map_batches: 20/20 blocks executed in 132.7s
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote wall time: 123.21s min, 130.3s max, 125.29s mean, 2505.77s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote cpu time: 118.78s min, 127.4s max, 121.3s mean, 2426.01s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 11217568000.0 min, 11880920000.0 max, 11479703200 mean
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output num rows: 533 min, 880 max, 670 mean, 13406 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output size bytes: 67407505 min, 111292096 max, 84771411 mean, 1695428238 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) 
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) Stage 2 randomize_block_order: 20/20 blocks executed in 0s
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote wall time: 123.21s min, 130.3s max, 125.29s mean, 2505.77s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote cpu time: 118.78s min, 127.4s max, 121.3s mean, 2426.01s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 11217568000.0 min, 11880920000.0 max, 11479703200 mean
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output num rows: 533 min, 880 max, 670 mean, 13406 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output size bytes: 67407505 min, 111292096 max, 84771411 mean, 1695428238 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) 
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) Stage 3 split: 1/1 blocks executed in 2.96s
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote wall time: 29.32ms min, 29.32ms max, 29.32ms mean, 29.32ms total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote cpu time: 29.32ms min, 29.32ms max, 29.32ms mean, 29.32ms total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 1027008000.0 min, 1027008000.0 max, 1027008000 mean
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output num rows: 418 min, 418 max, 418 mean, 418 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output size bytes: 52863546 min, 52863546 max, 52863546 mean, 52863546 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Tasks per node: 1 min, 1 max, 1 mean; 1 nodes used
....
....
(BaseWorkerMixin pid=7968, ip=xxx.xxx.xx.111) Starting train loop on worker [0-31]  # repeated for all 32 workers
....
....
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) Time to read all data 0.03665820999958669 seconds
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) P50/P95/Max batch delay (s) 0.03427452100004302 0.03427452100004302 0.03427452100004302
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) Num epochs read 1
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) Num batches read 1
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) Num bytes read 50.41 MiB
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) Mean throughput 1375.27 MiB/s
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Starting train loop on worker 31
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Time to read all data 0.03883258899804787 seconds
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) P50/P95/Max batch delay (s) 0.035741562001931015 0.035741562001931015 0.035741562001931015
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Num epochs read 1
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Num batches read 1
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Num bytes read 50.41 MiB
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Mean throughput 1298.26 MiB/s
...
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) Time to read all data 0.04808187500020722 seconds
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) P50/P95/Max batch delay (s) 0.04587110400098027 0.04587110400098027 0.04587110400098027
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) Num epochs read 1
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) Num batches read 1
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) Num bytes read 50.41 MiB
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) Mean throughput 1048.51 MiB/s
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) Time to read all data 0.1946309669983748 seconds
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) P50/P95/Max batch delay (s) 0.1920619730008184 0.1920619730008184 0.1920619730008184
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) Num epochs read 1
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) Num batches read 1
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) Num bytes read 50.41 MiB
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) Mean throughput 259.01 MiB/s
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) Time to read all data 0.18603510400134837 seconds
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) P50/P95/Max batch delay (s) 0.1831144299976586 0.1831144299976586 0.1831144299976586
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) Num epochs read 1
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) Num batches read 1
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) Num bytes read 50.41 MiB
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) Mean throughput 270.98 MiB/s
....
....
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) Dataset iterator time breakdown:
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * In ray.wait(): 264.24us
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * In ray.get(): 245.47ms
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * In next_batch(): 2.83ms
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * In format_batch(): 38.39us
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * In user code: 1.78ms
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Total time: 271.48ms
....
....
(TunerInternal pid=2535) Result for DummyTrainer_4b53b_00000:
(TunerInternal pid=2535)   _time_this_iter_s: 0.6157190799713135
(TunerInternal pid=2535)   _timestamp: 1658700408
(TunerInternal pid=2535)   _training_iteration: 1
(TunerInternal pid=2535)   batch_delay: 0.26968138799929875
(TunerInternal pid=2535)   batches_read: 1
(TunerInternal pid=2535)   bytes_read: 52863546
(TunerInternal pid=2535)   date: 2022-07-24_15-06-48
(TunerInternal pid=2535)   done: false
(TunerInternal pid=2535)   epochs_read: 1
(TunerInternal pid=2535)   experiment_id: e15cc259f2df4c56a5cca416b26562ac
(TunerInternal pid=2535)   hostname: my-release-ray-gpu-worker-type-bwrcr
(TunerInternal pid=2535)   iterations_since_restore: 1
(TunerInternal pid=2535)   node_ip: 192.168.36.111
(TunerInternal pid=2535)   pid: 7820
(TunerInternal pid=2535)   time_since_restore: 263.05760073661804
(TunerInternal pid=2535)   time_this_iter_s: 263.05760073661804
(TunerInternal pid=2535)   time_total_s: 263.05760073661804
(TunerInternal pid=2535)   timestamp: 1658700408
(TunerInternal pid=2535)   timesteps_since_restore: 0
(TunerInternal pid=2535)   training_iteration: 1
(TunerInternal pid=2535)   trial_id: 4b53b_00000
(TunerInternal pid=2535)   warmup_time: 0.010242223739624023
....
....
(TunerInternal pid=2535) 2022-07-24 15:06:54,171	WARNING util.py:216 -- The `process_trial_save` operation took 3.805 s, which may be a performance bottleneck.
(TunerInternal pid=2535) 2022-07-24 15:06:54,171	WARNING trial_runner.py:948 -- Consider turning off forced head-worker trial checkpoint syncs by setting sync_on_checkpoint=False. Note that this may result in faulty trial restoration if a failure occurs while the checkpoint is being synced from the worker to the head node.
(TunerInternal pid=2535) == Status ==
(TunerInternal pid=2535) Current time: 2022-07-24 15:06:55 (running for 00:04:33.02)
(TunerInternal pid=2535) Memory usage on this node: 16.7/747.8 GiB
(TunerInternal pid=2535) Using FIFO scheduling algorithm.
(TunerInternal pid=2535) Resources requested: 0/192 CPUs, 0/32 GPUs, 0.0/1820.0 GiB heap, 0.0/1192.09 GiB objects (0.0/4.0 accelerator_type:V100, 0.0/4.0 gpu-node)
(TunerInternal pid=2535) Result logdir: /home/ray/ray_results/DummyTrainer_2022-07-24_15-02-22
(TunerInternal pid=2535) Number of trials: 1/1 (1 TERMINATED)
(TunerInternal pid=2535) +--------------------------+------------+---------------------+--------+------------------+--------------+----------------+---------------+
(TunerInternal pid=2535) | Trial name               | status     | loc                 |   iter |   total time (s) |   bytes_read |   batches_read |   epochs_read |
(TunerInternal pid=2535) |--------------------------+------------+---------------------+--------+------------------+--------------+----------------+---------------|
(TunerInternal pid=2535) | DummyTrainer_4b53b_00000 | TERMINATED | 192.168.36.111:7820 |      1 |          263.058 |     52863546 |              1 |             1 |
(TunerInternal pid=2535) +--------------------------+------------+---------------------+--------+------------------+--------------+----------------+---------------+
(TunerInternal pid=2535) 
(TunerInternal pid=2535) 
2022-07-24 15:06:55,682	ERROR checkpoint_manager.py:137 -- The requested checkpoint is not available on this node, most likely because you are using Ray client or disabled checkpoint synchronization. To avoid this, enable checkpoint synchronization to cloud storage by specifying a `SyncConfig`. The checkpoint may be available on a different node - please check this location on worker nodes: /home/ray/ray_results/DummyTrainer_2022-07-24_15-02-22/DummyTrainer_4b53b_00000_0_2022-07-24_15-02-22/checkpoint_-00001
INFO:root:all done
(TunerInternal pid=2535) 2022-07-24 15:06:55,667	INFO tune.py:745 -- Total run time: 273.19 seconds (273.02 seconds for the tuning loop).
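It seems that dataset preprocessing is a major bottleneck, and if I’m reading these logs correctly, a single node (xxx.xxx.xx.111) was preprocessing all of the blocks. I initially assumed this was the head node, but the trial result above reports node_ip 192.168.36.111 and hostname my-release-ray-gpu-worker-type-bwrcr for the DummyTrainer process (pid 7820) on that IP, so it appears to be one of the GPU worker nodes: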
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) Stage 1 read->map_batches: 20/20 blocks executed in 132.7s
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote wall time: 123.21s min, 130.3s max, 125.29s mean, 2505.77s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote cpu time: 118.78s min, 127.4s max, 121.3s mean, 2426.01s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 11217568000.0 min, 11880920000.0 max, 11479703200 mean
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output num rows: 533 min, 880 max, 670 mean, 13406 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output size bytes: 67407505 min, 111292096 max, 84771411 mean, 1695428238 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used
Also, based on the resources requested, I’m wondering whether it was using only a single CPU to do so.
My main question is: can anyone tell whether I’m failing to specify that my 32 GPU workers should fetch and preprocess blocks into the object store on their respective nodes, using their own CPU allocations? If so, how can I modify my Trainer code to make this happen?
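One rough way to sanity-check the single-CPU question from the stage stats is to divide a stage's total remote wall time by the stage's end-to-end time; the ratio approximates how many of its tasks were running at once. The sketch below uses the Stage 1 numbers copied from the two logs above; the helper name effective_concurrency is hypothetical and this is a heuristic, not a Ray API.

```python
def effective_concurrency(total_remote_wall_s: float, stage_wall_s: float) -> float:
    """Approximate number of tasks running in parallel during a stage."""
    return total_remote_wall_s / stage_wall_s


# Stage 1 read->map_batches, values copied from the logs above.
trainer_stage1 = effective_concurrency(2387.49, 122.81)  # DummyTrainer pid=7820
worker_stage1 = effective_concurrency(2505.77, 132.7)  # BaseWorkerMixin pid=8874

print(f"{trainer_stage1:.1f} {worker_stage1:.1f}")  # 19.4 18.9
```

If this heuristic holds, a ratio near 19-20 for 20 blocks of roughly 120 s each would be consistent with the 20 tasks running mostly in parallel on that one node, rather than one after another on a single CPU.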
As a proposed solution, would changing my ray.data.read_parquet calls to the following be sufficient to ensure that one CPU per GPU worker is reading a block at a time?
```python
train_dataset = ray.data.read_parquet(
    TRAIN_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
    # Request 1/32 of the custom "gpu-node" resource per read task.
    ray_remote_args={"resources": {"gpu-node": 1 / 32}},
)
```
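The arithmetic behind this fractional request can be sketched as follows. It assumes each GPU node exposes exactly 1.0 unit of the custom gpu-node resource (the status line shows 4.0 gpu-node in total across the 4 GPU nodes), that each read task also takes 1 CPU, and it uses the 32 physical CPUs per node from the post even though Ray's logical CPU count per node may differ. The helper max_concurrent_tasks is hypothetical.

```python
import math


def max_concurrent_tasks(node_capacity: dict, task_request: dict) -> int:
    """Upper bound on how many identical tasks fit on one node at once."""
    return min(
        math.floor(node_capacity[name] / amount)
        for name, amount in task_request.items()
    )


# Assumed per-node capacity: 1.0 "gpu-node" and 32 CPUs (from the post).
GPU_NODE = {"gpu-node": 1.0, "CPU": 32}

# Proposed request: 1/32 of "gpu-node" plus 1 CPU per read task.
proposed = max_concurrent_tasks(GPU_NODE, {"gpu-node": 1 / 32, "CPU": 1})
# For comparison: a request matching one read per GPU worker (8 per node).
one_per_worker = max_concurrent_tasks(GPU_NODE, {"gpu-node": 1 / 8, "CPU": 1})

print(proposed, 4 * proposed, one_per_worker)  # 32 128 8
```

Under these assumptions, 1/32 would allow up to 32 concurrent read tasks per GPU node (128 across the four nodes), while 1/8 would correspond to one per GPU worker.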




