Slow Large-Scale Ingest w/Ray AIR (Ray Data + Ray Train)

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hi there,

I’ve been working with Ray Datasets + Ray Train for a few weeks now and have generally been struggling to get them to work together for a large-scale data ingest + training job. I was initially using the two separately in Ray 1.13 but recently upgraded to Ray 3.0.0.dev0 because I thought some of the improvements in AIR would help me overcome my issues. At the moment I’m trying to use the DummyTrainer to debug my ingest pipeline, but I’m confused by some of the logs (shared below) and would greatly appreciate a second pair of eyes / advice on how to tweak my setup.

As a TL;DR, my main question is: am I failing to specify that my 32 GPU workers should fetch and preprocess blocks into the object store on their respective nodes using their own CPU allocations? And if so, how can I modify my Trainer code to make that happen?

Ray Version
3.0.0.dev0 (I’m using the nightly build)

Ray Cluster Setup
I’m trying to parallelize the training of a deep learning model across 4 p3.16xlarge instances. Each node has:

  • 8 GPUs
  • 32 physical CPUs
  • 488 GB memory

I also have my head node running on a r5dn.24xlarge with 700 GB of memory.

Currently my cluster is running on Kubernetes with one pod for the head node and 4 pods (one per node) for the GPU workers. At the moment I’m fixing the min and max number of GPU workers to 4 so that I don’t have to worry about issues related to autoscaling. Finally, I’m allocating 256 GB to the object store on each node.
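
As a sanity check on this layout, the totals can be confirmed from the driver with something like the sketch below (ray.cluster_resources() is the real API; the expected values are just what the node specs above imply):

import ray

# Connect however the driver normally reaches the cluster
# (address="auto" works from inside a pod on the cluster).
ray.init(address="auto")

# Expect 32.0 GPUs, a 4.0 "gpu-node" custom resource (one per GPU pod), and
# CPU/memory totals covering the 4 GPU pods plus the head pod.
print(ray.cluster_resources())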

Dataset
I have a dataset of 4,000 parquet files with an average file size of ~100 MB (i.e. ~400 GB total). When these files are pulled into memory I’ve seen them use ~3-5x their on-disk size, so the total dataset size in memory is probably in the ballpark of 1-2 TB.

However, for the purpose of debugging my ingest pipeline I’ve limited my dataset to just 40 files in the experiment below.

My parquet files have the following schema (when read into a pandas dataframe):

{
  "uuid": dytpe('O'),  # these are strings
  "event_ts": dtype('int64'),
  "data": <ray.data.extensions.tensor_extension.TensorDtype>,
  "label": dtype('bool'),
}

Each row of the data column is a float tensor of shape (1125, 15).
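
For reference, here is a hedged sketch of what a couple of rows look like when built in pandas (purely illustrative, not my actual data-generation code):

import numpy as np
import pandas as pd
from ray.data.extensions.tensor_extension import TensorArray

# Two fake rows matching the schema above; the "data" column uses the tensor
# extension type so each row holds a (1125, 15) float array.
df = pd.DataFrame({
    "uuid": ["uuid-0", "uuid-1"],
    "event_ts": np.array([1658700000, 1658700001], dtype=np.int64),
    "data": TensorArray(np.zeros((2, 1125, 15), dtype=np.float32)),
    "label": np.array([True, False]),
})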

Dataset + Trainer Setup
At a high-level, my code for loading the dataset(s) and running the trainer is the following:

# read datasets
train_dataset = ray.data.read_parquet(
    TRAIN_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
)
val_dataset = ray.data.read_parquet(
    VAL_CORPUS_S3_PREFIX,
    parallelism=VAL_READ_PARALLELISM,
)

# create preprocessor
preprocessor = BatchMapper(lambda df: dataset_preprocessing(df))

# set train loop config
config = {'epochs': NUM_EPOCHS, ...}

trainer = DummyTrainer(
    train_loop_config=config,
    scaling_config=ScalingConfig(
        num_workers=32,  
        use_gpu=True,
        # resources_per_worker={"gpu-node": 0.125},
        _max_cpu_fraction_per_node=0.8,
    ),
    datasets={
        "train": train_dataset,
        "val": val_dataset,
    },
    dataset_config={
        "train": DatasetConfig(fit=True, split=True, global_shuffle=False),
        "val": DatasetConfig(fit=False, split=False, global_shuffle=False),
    },
    run_config=RunConfig(
        callbacks=[TBXLoggerCallback()]
    ),
    preprocessor=preprocessor,
)
result = trainer.fit()

The dataset_preprocessing function contains mostly vectorized pandas functions that perform actions such as normalizing the data column with a precomputed vector of means and standard deviations. The function also has to read a few large dictionaries from the object store (these are used to add columns to the dataframe; I will likely push this step into my dataset generation in the future).

DummyTrainer Logs
When ingest + training starts I see the following:

(TunerInternal pid=2535) 2022-07-24 15:02:22,495	WARNING trial_runner.py:328 -- The maximum number of pending trials has been automatically set to the number of available cluster CPUs, which is high (211 CPUs/pending trials). If you're running an experiment with a large number of trials, this could lead to scheduling overhead. In this case, consider setting the `TUNE_MAX_PENDING_TRIALS_PG` environment variable to the desired maximum number of concurrent trials.
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Starting dataset preprocessing
(TunerInternal pid=2535) == Status ==
(TunerInternal pid=2535) Current time: 2022-07-24 15:02:25 (running for 00:00:03.20)
(TunerInternal pid=2535) Memory usage on this node: 16.5/747.8 GiB
(TunerInternal pid=2535) Using FIFO scheduling algorithm.
(TunerInternal pid=2535) Resources requested: 1.0/192 CPUs, 32.0/32 GPUs, 0.0/1820.0 GiB heap, 0.0/1192.09 GiB objects (0.0/4.0 accelerator_type:V100, 0.0/4.0 gpu-node)
(TunerInternal pid=2535) Result logdir: /home/ray/ray_results/DummyTrainer_2022-07-24_15-02-22
(TunerInternal pid=2535) Number of trials: 1/1 (1 RUNNING)
(TunerInternal pid=2535) +--------------------------+----------+---------------------+
(TunerInternal pid=2535) | Trial name               | status   | loc                 |
(TunerInternal pid=2535) |--------------------------+----------+---------------------|
(TunerInternal pid=2535) | DummyTrainer_4b53b_00000 | RUNNING  | 192.168.36.111:7820 |
(TunerInternal pid=2535) +--------------------------+----------+---------------------+
(TunerInternal pid=2535) 

I don’t think the warning matters since I’m only running 1 trial, and if I understand correctly, the "Memory usage on this node: 16.5/747.8 GiB" line is likely referring to the head node. My first real concern is with the resources requested:

Resources requested: 1.0/192 CPUs, 32.0/32 GPUs, 0.0/1820.0 GiB heap, 0.0/1192.09 GiB objects (0.0/4.0 accelerator_type:V100, 0.0/4.0 gpu-node)

I’m not sure whether it’s normal (or a problem) that I’m only seeing 1 CPU and 0 memory requested.

After ~4-5 mins the dataset finishes preprocessing and I see a variety of logs returned from different nodes. I’ve tried to distill the highlights into the following:

(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Preprocessed datasets in 255.53620773599687 seconds
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Preprocessor BatchMapper(fn=<lambda>)
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Preprocessor transform stats:
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) 
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Stage 1 read->map_batches: 20/20 blocks executed in 122.81s
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Remote wall time: 116.7s min, 122.76s max, 119.37s mean, 2387.49s total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Remote cpu time: 115.19s min, 120.64s max, 117.66s mean, 2353.18s total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 11217568000.0 min, 11880920000.0 max, 11479703200 mean
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Output num rows: 98 min, 239 max, 148 mean, 2975 total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Output size bytes: 12393270 min, 30224069 max, 18811156 mean, 376223129 total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) 
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) Stage 2 randomize_block_order: 20/20 blocks executed in 0s
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Remote wall time: 116.7s min, 122.76s max, 119.37s mean, 2387.49s total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Remote cpu time: 115.19s min, 120.64s max, 117.66s mean, 2353.18s total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 11217568000.0 min, 11880920000.0 max, 11479703200 mean
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Output num rows: 98 min, 239 max, 148 mean, 2975 total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Output size bytes: 12393270 min, 30224069 max, 18811156 mean, 376223129 total
(DummyTrainer pid=7820, ip=xxx.xxx.xx.111) * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used
....
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) Stage 1 read->map_batches: 20/20 blocks executed in 132.7s
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote wall time: 123.21s min, 130.3s max, 125.29s mean, 2505.77s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote cpu time: 118.78s min, 127.4s max, 121.3s mean, 2426.01s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 11217568000.0 min, 11880920000.0 max, 11479703200 mean
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output num rows: 533 min, 880 max, 670 mean, 13406 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output size bytes: 67407505 min, 111292096 max, 84771411 mean, 1695428238 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) 
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) Stage 2 randomize_block_order: 20/20 blocks executed in 0s
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote wall time: 123.21s min, 130.3s max, 125.29s mean, 2505.77s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote cpu time: 118.78s min, 127.4s max, 121.3s mean, 2426.01s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 11217568000.0 min, 11880920000.0 max, 11479703200 mean
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output num rows: 533 min, 880 max, 670 mean, 13406 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output size bytes: 67407505 min, 111292096 max, 84771411 mean, 1695428238 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) 
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) Stage 3 split: 1/1 blocks executed in 2.96s
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote wall time: 29.32ms min, 29.32ms max, 29.32ms mean, 29.32ms total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote cpu time: 29.32ms min, 29.32ms max, 29.32ms mean, 29.32ms total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 1027008000.0 min, 1027008000.0 max, 1027008000 mean
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output num rows: 418 min, 418 max, 418 mean, 418 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output size bytes: 52863546 min, 52863546 max, 52863546 mean, 52863546 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Tasks per node: 1 min, 1 max, 1 mean; 1 nodes used
....
....
(BaseWorkerMixin pid=7968, ip=xxx.xxx.xx.111) Starting train loop on worker [0-31]  # repeated for all 32 workers
....
....
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) Time to read all data 0.03665820999958669 seconds
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) P50/P95/Max batch delay (s) 0.03427452100004302 0.03427452100004302 0.03427452100004302
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) Num epochs read 1
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) Num batches read 1
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) Num bytes read 50.41 MiB
(BaseWorkerMixin pid=6301, ip=xxx.xxx.xx.111) Mean throughput 1375.27 MiB/s
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Starting train loop on worker 31
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Time to read all data 0.03883258899804787 seconds
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) P50/P95/Max batch delay (s) 0.035741562001931015 0.035741562001931015 0.035741562001931015
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Num epochs read 1
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Num batches read 1
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Num bytes read 50.41 MiB
(BaseWorkerMixin pid=6308, ip=xxx.xxx.xx.111) Mean throughput 1298.26 MiB/s
...
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) Time to read all data 0.04808187500020722 seconds
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) P50/P95/Max batch delay (s) 0.04587110400098027 0.04587110400098027 0.04587110400098027
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) Num epochs read 1
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) Num batches read 1
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) Num bytes read 50.41 MiB
(BaseWorkerMixin pid=8296, ip=xxx.xxx.xx.222) Mean throughput 1048.51 MiB/s
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) Time to read all data 0.1946309669983748 seconds
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) P50/P95/Max batch delay (s) 0.1920619730008184 0.1920619730008184 0.1920619730008184
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) Num epochs read 1
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) Num batches read 1
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) Num bytes read 50.41 MiB
(BaseWorkerMixin pid=8298, ip=xxx.xxx.xx.222) Mean throughput 259.01 MiB/s
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) Time to read all data 0.18603510400134837 seconds
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) P50/P95/Max batch delay (s) 0.1831144299976586 0.1831144299976586 0.1831144299976586
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) Num epochs read 1
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) Num batches read 1
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) Num bytes read 50.41 MiB
(BaseWorkerMixin pid=8294, ip=xxx.xxx.xx.222) Mean throughput 270.98 MiB/s
....
....
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) Dataset iterator time breakdown:
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * In ray.wait(): 264.24us
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * In ray.get(): 245.47ms
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * In next_batch(): 2.83ms
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * In format_batch(): 38.39us
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * In user code: 1.78ms
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Total time: 271.48ms
....
....
(TunerInternal pid=2535) Result for DummyTrainer_4b53b_00000:
(TunerInternal pid=2535)   _time_this_iter_s: 0.6157190799713135
(TunerInternal pid=2535)   _timestamp: 1658700408
(TunerInternal pid=2535)   _training_iteration: 1
(TunerInternal pid=2535)   batch_delay: 0.26968138799929875
(TunerInternal pid=2535)   batches_read: 1
(TunerInternal pid=2535)   bytes_read: 52863546
(TunerInternal pid=2535)   date: 2022-07-24_15-06-48
(TunerInternal pid=2535)   done: false
(TunerInternal pid=2535)   epochs_read: 1
(TunerInternal pid=2535)   experiment_id: e15cc259f2df4c56a5cca416b26562ac
(TunerInternal pid=2535)   hostname: my-release-ray-gpu-worker-type-bwrcr
(TunerInternal pid=2535)   iterations_since_restore: 1
(TunerInternal pid=2535)   node_ip: 192.168.36.111
(TunerInternal pid=2535)   pid: 7820
(TunerInternal pid=2535)   time_since_restore: 263.05760073661804
(TunerInternal pid=2535)   time_this_iter_s: 263.05760073661804
(TunerInternal pid=2535)   time_total_s: 263.05760073661804
(TunerInternal pid=2535)   timestamp: 1658700408
(TunerInternal pid=2535)   timesteps_since_restore: 0
(TunerInternal pid=2535)   training_iteration: 1
(TunerInternal pid=2535)   trial_id: 4b53b_00000
(TunerInternal pid=2535)   warmup_time: 0.010242223739624023
....
....
(TunerInternal pid=2535) 2022-07-24 15:06:54,171	WARNING util.py:216 -- The `process_trial_save` operation took 3.805 s, which may be a performance bottleneck.
(TunerInternal pid=2535) 2022-07-24 15:06:54,171	WARNING trial_runner.py:948 -- Consider turning off forced head-worker trial checkpoint syncs by setting sync_on_checkpoint=False. Note that this may result in faulty trial restoration if a failure occurs while the checkpoint is being synced from the worker to the head node.
(TunerInternal pid=2535) == Status ==
(TunerInternal pid=2535) Current time: 2022-07-24 15:06:55 (running for 00:04:33.02)
(TunerInternal pid=2535) Memory usage on this node: 16.7/747.8 GiB
(TunerInternal pid=2535) Using FIFO scheduling algorithm.
(TunerInternal pid=2535) Resources requested: 0/192 CPUs, 0/32 GPUs, 0.0/1820.0 GiB heap, 0.0/1192.09 GiB objects (0.0/4.0 accelerator_type:V100, 0.0/4.0 gpu-node)
(TunerInternal pid=2535) Result logdir: /home/ray/ray_results/DummyTrainer_2022-07-24_15-02-22
(TunerInternal pid=2535) Number of trials: 1/1 (1 TERMINATED)
(TunerInternal pid=2535) +--------------------------+------------+---------------------+--------+------------------+--------------+----------------+---------------+
(TunerInternal pid=2535) | Trial name               | status     | loc                 |   iter |   total time (s) |   bytes_read |   batches_read |   epochs_read |
(TunerInternal pid=2535) |--------------------------+------------+---------------------+--------+------------------+--------------+----------------+---------------|
(TunerInternal pid=2535) | DummyTrainer_4b53b_00000 | TERMINATED | 192.168.36.111:7820 |      1 |          263.058 |     52863546 |              1 |             1 |
(TunerInternal pid=2535) +--------------------------+------------+---------------------+--------+------------------+--------------+----------------+---------------+
(TunerInternal pid=2535) 
(TunerInternal pid=2535) 
2022-07-24 15:06:55,682	ERROR checkpoint_manager.py:137 -- The requested checkpoint is not available on this node, most likely because you are using Ray client or disabled checkpoint synchronization. To avoid this, enable checkpoint synchronization to cloud storage by specifying a `SyncConfig`. The checkpoint may be available on a different node - please check this location on worker nodes: /home/ray/ray_results/DummyTrainer_2022-07-24_15-02-22/DummyTrainer_4b53b_00000_0_2022-07-24_15-02-22/checkpoint_-00001
INFO:root:all done
(TunerInternal pid=2535) 2022-07-24 15:06:55,667	INFO tune.py:745 -- Total run time: 273.19 seconds (273.02 seconds for the tuning loop).

It seems that the dataset preprocessing is a major bottleneck, and if I’m reading these logs correctly, a single node (which I believe to be the head node) was preprocessing all of the blocks:

(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) Stage 1 read->map_batches: 20/20 blocks executed in 132.7s
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote wall time: 123.21s min, 130.3s max, 125.29s mean, 2505.77s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Remote cpu time: 118.78s min, 127.4s max, 121.3s mean, 2426.01s total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Peak heap memory usage (MiB): 11217568000.0 min, 11880920000.0 max, 11479703200 mean
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output num rows: 533 min, 880 max, 670 mean, 13406 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Output size bytes: 67407505 min, 111292096 max, 84771411 mean, 1695428238 total
(BaseWorkerMixin pid=8874, ip=xxx.xxx.xx.111) * Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used

Also, based on the resources requested, I’m wondering whether it was using a single CPU to do so.

My main question is whether anyone can tell if I’m failing to specify that my 32 GPU workers should fetch and preprocess blocks into the object store on their respective nodes using their own CPU allocations. If I am failing to do this, how can I modify my Trainer code to make that happen?

As a proposed solution, would modifying my ray.data.read_parquet function calls to be:

train_dataset = ray.data.read_parquet(
    TRAIN_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
    ray_remote_args={"resources": {"gpu-node": 1/32}},
)

be sufficient to ensure that 1 CPU per GPU worker is reading a block at a time?

Hey @mrusso, thanks for the detailed post! It is definitely odd and unexpected that the tasks are only being scheduled on a single node.

To isolate the problem further, can you try to just read the training data, to see if it’s running into the same problem?

train_dataset = ray.data.read_parquet(
    TRAIN_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
).fully_executed()

print(train_dataset.stats())

cc @Chen_Shen

Hi @matthewdeng, thanks for the quick reply! I’ll do this now and report back in a few minutes.

I am getting the following:

(base) ray@test-python:~$ python test_ray_train_air.py 
Passing the following kwargs to ray.init() on the server: logging_format
2022-07-24 16:46:30,745	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 3.8x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
2022-07-24 16:46:31,857	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 4.0x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
2022-07-24 16:46:32,731	WARNING read_api.py:291 -- ⚠️  The number of blocks in this dataset (10) limits its parallelism to 10 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
2022-07-24 16:46:33,503	WARNING read_api.py:291 -- ⚠️  The number of blocks in this dataset (10) limits its parallelism to 10 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Read progress: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [00:07<00:00,  2.68it/s]
Read progress: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [00:01<00:00, 12.55it/s]
Stage 0 read: 1/10 blocks executed in 3.95s, 9/10 blocks split from parent
* Remote wall time: 3.58s min, 3.58s max, 3.58s mean, 3.58s total
* Remote cpu time: 2.19s min, 2.19s max, 2.19s mean, 2.19s total
* Peak heap memory usage (MiB): 1364496000.0 min, 1364496000.0 max, 1364496000 mean
* Output num rows: 572 min, 745 max, 651 mean, 6518 total
* Output size bytes: 357298676 min, 469676660 max, 411135838 mean, 4111358383 total
* Tasks per node: 1 min, 1 max, 1 mean; 1 nodes used

Stage 0 read: 1/10 blocks executed in 5.34s, 9/10 blocks split from parent
* Remote wall time: 4.12s min, 4.12s max, 4.12s mean, 4.12s total
* Remote cpu time: 2.38s min, 2.38s max, 2.38s mean, 2.38s total
* Peak heap memory usage (MiB): 1460064000.0 min, 1460064000.0 max, 1460064000 mean
* Output num rows: 533 min, 880 max, 688 mean, 6888 total
* Output size bytes: 330515420 min, 559249458 max, 436326284 mean, 4363262843 total
* Tasks per node: 1 min, 1 max, 1 mean; 1 nodes used

Stage 1 union: 

----------------------
Stage 0 read: 1/10 blocks executed in 2.78s, 9/10 blocks split from parent
* Remote wall time: 1.24s min, 1.24s max, 1.24s mean, 1.24s total
* Remote cpu time: 793.93ms min, 793.93ms max, 793.93ms mean, 793.93ms total
* Peak heap memory usage (MiB): 1398672000.0 min, 1398672000.0 max, 1398672000 mean
* Output num rows: 104 min, 249 max, 164 mean, 1646 total
* Output size bytes: 56542791 min, 147614706 max, 95880483 mean, 958804836 total
* Tasks per node: 1 min, 1 max, 1 mean; 1 nodes used

Stage 0 read: 1/10 blocks executed in 2.08s, 9/10 blocks split from parent
* Remote wall time: 892.6ms min, 892.6ms max, 892.6ms mean, 892.6ms total
* Remote cpu time: 490.46ms min, 490.46ms max, 490.46ms mean, 490.46ms total
* Peak heap memory usage (MiB): 827580000.0 min, 827580000.0 max, 827580000 mean
* Output num rows: 98 min, 164 max, 134 mean, 1340 total
* Output size bytes: 52984614 min, 94997536 max, 76140551 mean, 761405518 total
* Tasks per node: 1 min, 1 max, 1 mean; 1 nodes used

Stage 1 union: 

One minor detail that I left out before is that both my train and validation datasets are split across two prefixes, so I ultimately ran the following:

# read datasets
train_fst_dataset = ray.data.read_parquet(
    TRAIN_FST_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
)
train_snd_dataset = ray.data.read_parquet(
    TRAIN_SND_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
)
val_fst_dataset = ray.data.read_parquet(
    VAL_FST_CORPUS_S3_PREFIX,
    parallelism=VAL_READ_PARALLELISM,
)
val_snd_dataset = ray.data.read_parquet(
    VAL_SND_CORPUS_S3_PREFIX,
    parallelism=VAL_READ_PARALLELISM,
)

# merge datasets
train_full_dataset = train_fst_dataset.union(train_snd_dataset).fully_executed()
val_full_dataset = val_fst_dataset.union(val_snd_dataset).fully_executed()

print(train_full_dataset.stats())
print("----------------------")
print(val_full_dataset.stats())

Hmm, looks like the stats output is missing some information and could be improved for this case.

Can you try with just a single prefix for now?

train_fst_dataset = ray.data.read_parquet(
    TRAIN_FST_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
).fully_executed()

print(train_fst_dataset.stats())

Also, what is the value of TRAIN_READ_PARALLELISM?

Sure thing! Here’s the output in that case:

(base) ray@test-python:~$ python test_ray_train_air.py 
Passing the following kwargs to ray.init() on the server: logging_format
2022-07-24 16:59:19,602	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 3.8x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
Read progress: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 10/10 [00:05<00:00,  3.68it/s]2022-07-24 
Stage 0 read: 10/10 blocks executed in 4.83s
* Remote wall time: 2.93s min, 4.18s max, 3.56s mean, 35.65s total
* Remote cpu time: 1.76s min, 2.21s max, 1.97s mean, 19.72s total
* Peak heap memory usage (MiB): 1270620000.0 min, 1406144000.0 max, 1358628000 mean
* Output num rows: 572 min, 745 max, 651 mean, 6518 total
* Output size bytes: 376551685 min, 490438818 max, 429084594 mean, 4290845940 total
* Tasks per node: 2 min, 3 max, 2 mean; 4 nodes used

Oh also TRAIN_READ_PARALLELISM is currently set to 4000.

Nice, that one is indeed spreading the initial workload. As a next step, can you try this?

train_fst_dataset = ray.data.read_parquet(
    TRAIN_FST_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
)
train_snd_dataset = ray.data.read_parquet(
    TRAIN_SND_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
)
train_full_dataset = train_fst_dataset.union(train_snd_dataset)
train_full_dataset_mapped = train_full_dataset.map_batches(lambda x: x)
print(train_full_dataset_mapped.stats())

Interesting, it looks like it reverted to only using one node:

(base) ray@test-python:~$ python test_ray_train_air.py 
Passing the following kwargs to ray.init() on the server: logging_format
2022-07-24 17:07:46,803	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 3.8x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
2022-07-24 17:07:48,323	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 4.0x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
Read->Map_Batches: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [00:08<00:00,  2.49it/s]
Stage 1 read->map_batches: 20/20 blocks executed in 8.04s
* Remote wall time: 3.61s min, 6.33s max, 4.64s mean, 92.79s total
* Remote cpu time: 2.19s min, 3.93s max, 2.78s mean, 55.64s total
* Peak heap memory usage (MiB): 1449528000.0 min, 2229548000.0 max, 1588135400 mean
* Output num rows: 533 min, 880 max, 670 mean, 13406 total
* Output size bytes: 345506057 min, 570441520 max, 434507898 mean, 8690157978 total
* Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used

Ah, looks like there might be a bug(?).

Can you try the same thing but with the following change?

train_full_dataset_mapped = train_full_dataset.map_batches(lambda x: x, scheduling_strategy="SPREAD")

Also I need to go offline for now, so I apologize in advance for the delay in my next response!


This led to a failure that I think is a bug – although it might be specific to my use of TensorDtype in my parquet files:

(base) ray@test-python:~$ python test_ray_train_air.py 
Passing the following kwargs to ray.init() on the server: logging_format
2022-07-24 17:20:30,299	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 3.8x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
2022-07-24 17:20:31,745	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 4.0x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
Read: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [00:07<00:00,  2.51it/s]
Map_Batches:  35%|██████████████████████████████████████████████████████▎                                                                                                    | 7/20 [00:02<00:04,  2.81it/s](_map_block_nosplit pid=9040, ip=192.168.35.202) 2022-07-24 17:20:41,953	INFO worker.py:754 -- Task failed with retryable exception: TaskID(eb920e1fde22f682ffffffffffffffffffffffff07000000).
(_map_block_nosplit pid=9040, ip=192.168.35.202) Traceback (most recent call last):
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "python/ray/_raylet.pyx", line 670, in ray._raylet.execute_task
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "python/ray/_raylet.pyx", line 674, in ray._raylet.execute_task
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/data/_internal/compute.py", line 449, in _map_block_nosplit
(_map_block_nosplit pid=9040, ip=192.168.35.202)     for new_block in block_fn(block, *fn_args, **fn_kwargs):
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/data/dataset.py", line 460, in transform
(_map_block_nosplit pid=9040, ip=192.168.35.202)     view = BlockAccessor.for_block(view).to_batch_format(batch_format)
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/data/block.py", line 319, in to_batch_format
(_map_block_nosplit pid=9040, ip=192.168.35.202)     return self.to_native()
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/data/_internal/table_block.py", line 134, in to_native
(_map_block_nosplit pid=9040, ip=192.168.35.202)     native = self.to_pandas()
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray/data/_internal/arrow_block.py", line 189, in to_pandas
(_map_block_nosplit pid=9040, ip=192.168.35.202)     return self._table.to_pandas()
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "pyarrow/array.pxi", line 766, in pyarrow.lib._PandasConvertible.to_pandas
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "pyarrow/table.pxi", line 1815, in pyarrow.lib.Table._to_pandas
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "/home/ray/anaconda3/lib/python3.8/site-packages/pyarrow/pandas_compat.py", line 781, in table_to_blockmanager
(_map_block_nosplit pid=9040, ip=192.168.35.202)     ext_columns_dtypes = _get_extension_dtypes(
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "/home/ray/anaconda3/lib/python3.8/site-packages/pyarrow/pandas_compat.py", line 830, in _get_extension_dtypes
(_map_block_nosplit pid=9040, ip=192.168.35.202)     pandas_dtype = _pandas_api.pandas_dtype(dtype)
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "pyarrow/pandas-shim.pxi", line 146, in pyarrow.lib._PandasAPIShim.pandas_dtype
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "pyarrow/pandas-shim.pxi", line 149, in pyarrow.lib._PandasAPIShim.pandas_dtype
(_map_block_nosplit pid=9040, ip=192.168.35.202)   File "/home/ray/anaconda3/lib/python3.8/site-packages/pandas/core/dtypes/common.py", line 1777, in pandas_dtype
(_map_block_nosplit pid=9040, ip=192.168.35.202)     npdtype = np.dtype(dtype)
(_map_block_nosplit pid=9040, ip=192.168.35.202) TypeError: data type 'TensorDtype' not understood

If I instead use the following:

def mini_func(df):
    # cast data back to np array
    df['data'] = TensorArray(df['data'].to_numpy())
    df['data'] = df['data'].astype(TensorDtype())

    return df

# merge datasets
train_full_dataset = train_fst_dataset.union(train_snd_dataset)
train_full_dataset_mapped = train_full_dataset.map_batches(lambda x: mini_func(x), scheduling_strategy="SPREAD")
print(train_full_dataset_mapped.stats())

Then the map_batches succeeds; however, it seems to still perform all of the reading on a single node:

(base) ray@test-python:~$ python test_ray_train_air.py 
Passing the following kwargs to ray.init() on the server: logging_format
2022-07-24 17:29:53,179	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 3.8x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
2022-07-24 17:29:53,718	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 4.0x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
Read: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [00:07<00:00,  2.63it/s]
Map_Batches: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [00:04<00:00,  4.18it/s]
Stage 1 read: 20/20 blocks executed in 7.61s
* Remote wall time: 2.76s min, 5.6s max, 3.5s mean, 69.96s total
* Remote cpu time: 2.0s min, 3.64s max, 2.31s mean, 46.12s total
* Peak heap memory usage (MiB): 2962196000.0 min, 2962204000.0 max, 2962202400 mean
* Output num rows: 533 min, 880 max, 670 mean, 13406 total
* Output size bytes: 350877707 min, 579310278 max, 441263275 mean, 8825265507 total
* Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used

Stage 2 map_batches: 20/20 blocks executed in 4.79s
* Remote wall time: 738.28ms min, 1.55s max, 998.65ms mean, 19.97s total
* Remote cpu time: 793.59ms min, 1.63s max, 1.07s mean, 21.49s total
* Peak heap memory usage (MiB): 1769176000.0 min, 3712928000.0 max, 2369014600 mean
* Output num rows: 533 min, 880 max, 670 mean, 13406 total
* Output size bytes: 345506057 min, 570441520 max, 434507898 mean, 8690157978 total
* Tasks per node: 5 min, 5 max, 5 mean; 4 nodes used

(As a side note, I have to perform the same type-casting trick in my dataset_preprocessing function:)

def dataset_preprocessing(df):
    # do other things
    # ...

    # cast data to numpy array
    arr = df['data'].to_numpy()

    # normalize data
    normalized_arr = (arr - means_arr)/stds_arr

    # put data back into column
    df['data'] = TensorArray(normalized_arr)
    df['data'] = df['data'].astype(TensorDtype())

    return df

I’m not sure if this is doubling my memory requirements by creating a copy of the data in arr, but I’m also not sure that there’s a better way to do this :thinking:

In this latest run, how long does this take? Is it still taking over 100 seconds? (Trying to understand where the bottleneck is.) Anyway, it does seem unexpected that the reads are scheduled on one node, and I can look further into this.

Regarding TensorArray/TensorDtype, we’re actively working on some fixes around this, which should reduce the amount of manual casting that needs to be done. cc @clarkzinzow, who can add more insights about the memory usage.


Hi @mrusso, thanks for the detailed info! A few observations:

  • Given you’re only using 40 files in the experiment, the logs show that our default parallelism spawned 20 tasks, each responsible for 2 files, in your setup.
    • They might ALL run on your head node if they’re not scheduled with the SPREAD policy.
    • Can you give us a few screenshots of the Ray dashboard showing the sent/received throughput while you’re running your e2e experiments?
  • For the 1.0/192 CPUs: by default we only reserve 1 CPU per trainer worker, but this can be configured via resources_per_worker as in the following snippet. Ideally you want each trainer worker to fully occupy 1 GPU but also have enough CPU resources to fetch data and move it from DRAM to GPU memory.
        trainer = TorchTrainer(
            train_loop_per_worker=train_loop_per_worker,
            train_loop_config={"batch_size": 64, "num_epochs": num_epochs},
            datasets={"train": dataset},
            preprocessor=preprocessor,
            scaling_config=ScalingConfig(
                num_workers=num_workers,
                use_gpu=True,
                resources_per_worker={"CPU": 12, "GPU": 1},
            ),
        )

  • One possibility for an ideal and more affordable setup for your workload is to spawn X commodity CPU hosts feeding into Y GPU hosts, where X >> Y. Each CPU host gets its own share of data IO + preprocessing and provides NIC bandwidth in the cluster to send the processed tensors to the GPU hosts, where the CPUs on the GPU hosts are primarily busy with tensor IO and data movement.

One example is an image model preprocessing workload with

  • 4 of g3.16xlarge
  • 20 of m5.8xlarge

that achieved ~1500 MB/s total throughput to all of the GPU hosts. If you can provide more details regarding your current training and network throughput, we can give more concrete suggestions.


So the previous run I shared took ~10 seconds, which was much faster than before. I just retried the experiment with my actual preprocessing function to see if this would solve (most of) my bottleneck – or if there’s something wrong with the preprocessing function itself.

Here’s what I ran in a nutshell:

# place a dictionary mapping event timestamps --> column values in the object store
ids_to_my_column = pd.Series(df.my_column.values, index=df.event_ts).to_dict()
ids_to_my_column_ref = ray.put(ids_to_my_column)

# repeat ^^^this for a few more columns
# ...

# here's an abridged version of my preprocessing function
def dataset_preprocessing(df, ids_to_my_column_ref, ..., col_means, col_stds):
    # retrieve dict from object store and add column
    ids_to_my_column = ray.get(ids_to_my_column_ref)
    df['my_column'] = df.event_ts.apply(lambda event_ts: ids_to_my_column[event_ts])
    del ids_to_my_column

   # repeat this^ for a couple more ids_to_column dictionaries
   # ...

    # do some operations to filter unwanted rows
    # ...

    # cast data to numpy array
    arr = df['data'].to_numpy()

    # normalize data
    normalized_arr = (arr - col_means)/col_stds

    # put data back into column
    df['data'] = TensorArray(normalized_arr)
    df['data'] = df['data'].astype(TensorDtype())

    return df

# create preprocessing function
dataset_preprocessing = partial(
    full_dataset_preprocessing,
    ids_to_my_column_ref=ids_to_my_column_ref,
    ...
    col_means=COLS_TO_MEANS,
    col_stds=COLS_TO_STDS,
)

# read datasets
train_fst_dataset = ray.data.read_parquet(
    TRAIN_FST_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
)
train_snd_dataset = ray.data.read_parquet(
    TRAIN_SND_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
)
val_fst_dataset = ray.data.read_parquet(
    VAL_FST_CORPUS_S3_PREFIX,
    parallelism=VAL_READ_PARALLELISM,
)
val_snd_dataset = ray.data.read_parquet(
    VAL_SND_CORPUS_S3_PREFIX,
    parallelism=VAL_READ_PARALLELISM,
)

# merge datasets
train_full_dataset = train_fst_dataset.union(train_snd_dataset)
val_full_dataset = val_fst_dataset.union(val_snd_dataset)

# call map batches
train_full_dataset_mapped = train_full_dataset.map_batches(lambda x: dataset_preprocessing(x), scheduling_strategy="SPREAD")
val_full_dataset_mapped = val_full_dataset.map_batches(lambda x: dataset_preprocessing(x), scheduling_strategy="SPREAD")

print(train_full_dataset_mapped.stats())
print("------------------------------------")
print(val_full_dataset_mapped.stats())

The output that was ultimately printed was the following:

2022-07-25 16:44:04,238	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 3.8x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
2022-07-25 16:44:05,801	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 4.0x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
2022-07-25 16:44:07,283	WARNING read_api.py:291 -- ⚠️  The number of blocks in this dataset (10) limits its parallelism to 10 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
2022-07-25 16:44:08,489	WARNING read_api.py:291 -- ⚠️  The number of blocks in this dataset (10) limits its parallelism to 10 concurrent tasks. This is much less than the number of available CPU slots in the cluster. Use `.repartition(n)` to increase the number of dataset blocks.
Read: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [00:07<00:00,  2.71it/s]
Map_Batches: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [01:56<00:00,  5.80s/it]
Read: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [00:01<00:00, 11.09it/s]
Map_Batches: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20/20 [01:52<00:00,  5.64s/it]
Stage 1 read: 20/20 blocks executed in 7.42s
* Remote wall time: 3.05s min, 5.93s max, 4.29s mean, 85.87s total
* Remote cpu time: 1.72s min, 3.31s max, 2.25s mean, 45.06s total
* Peak heap memory usage (MiB): 1233064000.0 min, 1808196000.0 max, 1369677000 mean
* Output num rows: 533 min, 880 max, 670 mean, 13406 total
* Output size bytes: 350877707 min, 579310278 max, 441263275 mean, 8825265507 total
* Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used

Stage 2 map_batches: 20/20 blocks executed in 116.07s
* Remote wall time: 107.2s min, 114.01s max, 111.19s mean, 2223.79s total
* Remote cpu time: 103.02s min, 112.02s max, 105.18s mean, 2103.6s total
* Peak heap memory usage (MiB): 11199164000.0 min, 12091284000.0 max, 11600106200 mean
* Output num rows: 533 min, 880 max, 670 mean, 13406 total
* Output size bytes: 67407505 min, 111292096 max, 84771411 mean, 1695428238 total
* Tasks per node: 5 min, 5 max, 5 mean; 4 nodes used

------------------------------------
Stage 1 read: 20/20 blocks executed in 1.8s
* Remote wall time: 598.21ms min, 1.75s max, 1.06s mean, 21.2s total
* Remote cpu time: 232.9ms min, 863.11ms max, 458.18ms mean, 9.16s total
* Peak heap memory usage (MiB): 1360832000.0 min, 12091284000.0 max, 4135776200 mean
* Output num rows: 98 min, 249 max, 149 mean, 2986 total
* Output size bytes: 64514108 min, 163918484 max, 98285263 mean, 1965705263 total
* Tasks per node: 20 min, 20 max, 20 mean; 1 nodes used

Stage 2 map_batches: 20/20 blocks executed in 112.88s
* Remote wall time: 103.54s min, 112.28s max, 105.51s mean, 2110.19s total
* Remote cpu time: 102.77s min, 111.65s max, 104.54s mean, 2090.71s total
* Peak heap memory usage (MiB): 10952952000.0 min, 12074588000.0 max, 11460047200 mean
* Output num rows: 98 min, 239 max, 148 mean, 2975 total
* Output size bytes: 12393270 min, 30224069 max, 18811156 mean, 376223129 total
* Tasks per node: 5 min, 5 max, 5 mean; 4 nodes used

It looks like the read operations are taking place on a single node, and I think this is what’s causing the map_batches stage to take almost 2 minutes for each dataset: the workers have to fetch the blocks from the single node that performed the reads before applying the map_batches preprocessing function. Empirically, for each map_batches stage (which took 1:56 and 1:52 respectively), the progress bar sat at 0/20 for ~1:45 before finishing all 20/20 tasks in ~10 seconds. I believe that first ~1:45 was simply spent moving blocks in the object store from the head node to the workers.

Per @Jiao_Dong’s suggestion, I screen-recorded the dashboard during the experiment and captured some additional information. Right around the time the first read operation started, I captured the following screenshot, which appears to show the head node with the dictionary objects in its plasma store and possibly also the first blocks that it read:

After ~7 seconds the map_batches stage starts. The following two screenshots show the map_batches stage right near the start and at some point during the middle of execution. It appears that the GPU workers are largely receiving blocks from other machines (mainly the head node), although the first screenshot also seems to suggest that one of the GPU workers may have also been broadcasting some piece of data (not sure what that might be though):

near the beginning of map_batches:

screenshot in the middle of map_batches:

Based on all of this, it seems to me like spreading the read operation across the 4 GPU workers would likely solve my bottleneck problem.
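
If that’s the case, here’s a minimal sketch of what I’d try next – assuming read_parquet forwards ray_remote_args to its underlying read tasks, the same way the extra kwarg to map_batches was forwarded above:

# Hedged sketch: ask the scheduler to SPREAD the read tasks across nodes
# instead of packing them onto a single node.
train_fst_dataset = ray.data.read_parquet(
    TRAIN_FST_CORPUS_S3_PREFIX,
    parallelism=TRAIN_READ_PARALLELISM,
    ray_remote_args={"scheduling_strategy": "SPREAD"},
)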

Hi @Jiao_Dong,

Thanks so much for your response! I played around a bit with the resources_per_worker and can definitely try giving each worker more CPUs to help with feeding data into their respective GPUs.

I’m very interested in your suggestion about using commodity CPU hosts to handle the bulk of the preprocessing workload. I actually previously tried to do exactly this in Ray 1.13 with Ray Datasets + Ray Train but I ran into a number of challenges.

Now that I’m using Ray AIR I’m curious how I would specify that I want my preprocessor to operate on one set of resources while my trainers operate on another. To make this a bit more concrete, I previously had the following two worker types:

### 1 worker runs on 1 r5dn.12xlarge node w/24 physical CPUs and 384 GiB memory
# The key for each podType is a user-defined string.
rayTrainWorkerType:
    # minWorkers is the minimum number of Ray workers of this pod type to keep running.
    minWorkers: 0
    # maxWorkers is the maximum number of Ray workers of this pod type to which Ray will scale.
    maxWorkers: 25
    # memory is the memory used by this Pod type.
    # (Used for both requests and limits.)
    memory: 375Gi
    # CPU is the number of CPUs used by this pod type.
    # (Used for both requests and limits. Must be an integer, as Ray does not support fractional CPUs.)
    CPU: 24
    # GPU is the number of NVIDIA GPUs used by this pod type.
    # (Optional, requires GPU nodes with appropriate setup. See https://docs.ray.io/en/master/cluster/kubernetes-gpu.html)
    GPU: 0
    # rayResources is an optional string-int mapping signaling additional resources to Ray.
    # "CPU", "GPU", and "memory" are filled automatically based on the above settings, but can be overriden;
    # For example, rayResources: {"CPU": 0} can be used in the head podType to prevent Ray from scheduling tasks on the head.
    # See https://docs.ray.io/en/master/advanced.html#dynamic-remote-parameters for an example of usage of custom resources in a Ray task.
    rayResources: {"train-worker-node": 1}
    # Optionally, set a node selector for this Pod type. See https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#nodeselector
    nodeSelector: {"node-class": "compute-train"}

### 1 worker runs on 1 p3.16xlarge node w/32 physical cores and 488 GB of memory
# The key for each podType is a user-defined string.
rayGPUWorkerType:
    # minWorkers is the minimum number of Ray workers of this pod type to keep running.
    minWorkers: 4
    # maxWorkers is the maximum number of Ray workers of this pod type to which Ray will scale.
    maxWorkers: 4
    # memory is the memory used by this Pod type.
    # (Used for both requests and limits.)
    memory: 475Gi
    # CPU is the number of CPUs used by this pod type.
    # (Used for both requests and limits. Must be an integer, as Ray does not support fractional CPUs.)
    CPU: 32
    # GPU is the number of NVIDIA GPUs used by this pod type.
    # (Optional, requires GPU nodes with appropriate setup. See https://docs.ray.io/en/master/cluster/kubernetes-gpu.html)
    GPU: 8
    # rayResources is an optional string-int mapping signaling additional resources to Ray.
    # "CPU", "GPU", and "memory" are filled automatically based on the above settings, but can be overriden;
    # For example, rayResources: {"CPU": 0} can be used in the head podType to prevent Ray from scheduling tasks on the head.
    # See https://docs.ray.io/en/master/advanced.html#dynamic-remote-parameters for an example of usage of custom resources in a Ray task.
    rayResources: {"gpu-node": 1}
    # Optionally, set a node selector for this Pod type. See https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#nodeselector
    nodeSelector: {"node-class": "compute-gpu"}

Previously in Ray 1.13 my CPU hosts / preprocessor workers could be requested using resources={"train-worker-node": 1} and my GPU workers could be requested using resources={"gpu-node": 1}, for example.
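
(Roughly along these lines – a hedged sketch of the generic custom-resource pattern, not my exact 1.13 code:)

import ray

# Pin work onto the CPU nodegroup by requesting the custom resource declared
# via rayResources above; GPU work would request {"gpu-node": ...} instead.
@ray.remote(resources={"train-worker-node": 1})
def preprocess_shard(shard):
    ...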

In AIR I can specify that I want my trainers to run on my GPU workers by setting use_gpu=True in the ScalingConfig, but I don’t currently see how I can tell my preprocessor that I want it to execute on my CPU workers. E.g. it would be really nice if I could do something like:

preprocessor = BatchMapper(lambda df: dataset_preprocessing(df), resources={"train-worker-node": 25})

Or maybe:

trainer = TorchTrainer(
    ...
    dataset_config={
        "train": DatasetConfig(fit=True, split=True, resources={"train-worker-node": 20}),
        "val": DatasetConfig(fit=False, split=False, resources={"train-worker-node": 5}),
    },
    ...
)

To suggest that I want to have my preprocessing done on that nodegroup.
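
In the meantime, my best guess at an approximation with plain Ray Datasets would be to pass the custom resource straight through map_batches, since extra kwargs appear to be forwarded as Ray remote args (the same mechanism as the scheduling_strategy="SPREAD" experiment above). A hedged sketch:

# Request a sliver of the custom "train-worker-node" resource per preprocessing
# task so those tasks land on the CPU nodegroup; the fractional amount is
# purely illustrative.
train_full_dataset_mapped = train_full_dataset.map_batches(
    lambda df: dataset_preprocessing(df),
    resources={"train-worker-node": 0.01},
)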

Let me double-check on this, but I think it’s more likely that dataset_preprocessing itself is taking that long. The progress bar increments whenever a block is created – if each of the 20 map tasks is running concurrently and takes ~1:45, that would explain why there is an initial pause and then they all complete around the same time.

One way you could sanity check this is to run a smaller job on a single node, which would ensure that there is no object transfer occurring.

Regarding only seeing 1 CPU utilized and setting CPU in resources_per_worker, keep in mind that these are only logical resources used for bookkeeping, and they won’t directly impact the parallelism/throughput of your job (i.e. setting "CPU": 12 won’t reserve 12 cores). However, this can be used to reduce the number of preprocessing tasks (each of which requires 1 CPU) that can be launched on the node.
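
To make that concrete with the numbers from your cluster (a rough back-of-the-envelope; the per-worker CPU value here is purely hypothetical):

# 8 train workers per GPU node (32 workers / 4 nodes), each reserving a
# hypothetical logical "CPU": 3 via resources_per_worker.
cpus_per_node = 32
train_workers_per_node = 32 // 4
logical_cpus_per_train_worker = 3

# Logical CPU slots left for 1-CPU preprocessing tasks on that node. This caps
# preprocessing concurrency there; it does not pin physical cores.
preprocessing_slots = cpus_per_node - train_workers_per_node * logical_cpus_per_train_worker
print(preprocessing_slots)  # -> 8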

So I just dropped my cluster down to 1 GPU worker and re-ran my dataset_preprocessing function with some log lines to help profile where the time is being spent. It turns out almost all of the time is eaten up by the ray.get() calls that fetch my dictionaries, plus the time spent deleting them:

Function w/time.time() calls for reference:

def dataset_preprocessing(df, ids_to_split_ref, ..., col_means, col_stds):
   # add event_id column
    t0 = time.time()
    df['event_id'] = df['uuid'] + df['event_ts'].astype(str)

    # add split column
    t1 = time.time()
    ids_to_split = ray.get(ids_to_split_ref)
    df['split'] = df.event_id.apply(lambda event_id: ids_to_split[event_id])
    del ids_to_split

    # add automatic label column
    ids_to_auto_label = ray.get(ids_to_auto_label_ref)
    df['auto_label'] = df.event_id.apply(lambda event_id: ids_to_auto_label[event_id])
    del ids_to_auto_label

    # add manual label column
    ids_to_label = ray.get(ids_to_label_ref)
    df['label'] = df.event_id.apply(lambda event_id: ids_to_label[event_id])
    del ids_to_label

    # filter for negatives and manually labeled positives
    t2 = time.time()
    df = df[~df.auto_label.astype(bool) | (df.uuid.isin(pos_uuids) & df.label)]

    # filter out rows w/bad data
    df = df[~df.uuid.isin(bad_uuids)]

    # add source column
    t3 = time.time()
    ids_to_source = ray.get(ids_to_source_ref)
    t4 = time.time()
    df['source'] = df.event_id.apply(lambda event_id: ids_to_source[event_id])
    t5 = time.time()
    del ids_to_source

    # get raw tensor data as array
    t6 = time.time()
    arr = df['data'].to_numpy()
    t7 = time.time()

    ### map data to range of interest
    # get index of event timestamp
    detection_idx = int(arr.shape[1]//2)

    # create indices for beginning and end of range
    before_idx = detection_idx - secs_before * frequency
    after_idx = detection_idx + secs_after * frequency

    # slice array
    slice_arr = arr[:, before_idx:after_idx + 1, :14]
    t8 = time.time()

    ### normalize column values
    # construct means and stds arrays
    means, stds = [], []
    for feat_col in ordered_feat_cols:
        means.append(col_means[feat_col])
        stds.append(col_stds[feat_col])
    
    means_arr = np.array(means)
    stds_arr = np.array(stds)

    # normalize data
    normalized_arr = (slice_arr - means_arr)/stds_arr

    # put data back into column
    t9 = time.time()
    df['data'] = TensorArray(normalized_arr)
    df['data'] = df['data'].astype(TensorDtype())
    t10 = time.time()

    logging.info("######################")
    logging.info(f"add event_id     : {t1-t0:.3f}")
    logging.info(f"add 3 cols       : {t2-t1:.3f}")
    logging.info(f"filter rows      : {t3-t2:.3f}")
    logging.info(f"get 1 col        : {t4-t3:.3f}")
    logging.info(f"apply 1 col      : {t5-t4:.3f}")
    logging.info(f"del 1 col        : {t6-t5:.3f}")
    logging.info(f"data to numpy    : {t7-t6:.3f}")
    logging.info(f"data slice       : {t8-t7:.3f}")
    logging.info(f"normalize        : {t9-t8:.3f}")
    logging.info(f"data from numpy  : {t10-t9:.3f}")
    logging.info("######################")

    return df

I downsized the corpus so that there were only 4 batches; here’s the log output for the time spent on each batch:

(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:######################
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:add event_id     : 0.001
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:add 3 cols       : 63.545
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:filter rows      : 0.288
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:get 1 col        : 17.461
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:apply 1 col      : 0.001
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:del 1 col        : 4.345
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:data to numpy    : 0.000
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:data slice       : 0.000
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:normalize        : 0.069
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:data from numpy  : 0.096
(_map_block_nosplit pid=1373, ip=192.168.31.60) INFO:root:######################
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:######################
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:add event_id     : 0.001
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:add 3 cols       : 63.044
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:filter rows      : 0.344
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:get 1 col        : 17.797
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:apply 1 col      : 0.002
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:del 1 col        : 4.420
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:data to numpy    : 0.000
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:data slice       : 0.000
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:normalize        : 0.082
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:data from numpy  : 0.118
(_map_block_nosplit pid=1284, ip=192.168.31.60) INFO:root:######################
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:######################
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:add event_id     : 0.001
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:add 3 cols       : 63.139
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:filter rows      : 0.289
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:get 1 col        : 18.260
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:apply 1 col      : 0.001
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:del 1 col        : 4.336
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:data to numpy    : 0.000
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:data slice       : 0.000
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:normalize        : 0.069
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:data from numpy  : 0.098
(_map_block_nosplit pid=1371, ip=192.168.31.60) INFO:root:######################
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:######################
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:add event_id     : 0.001
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:add 3 cols       : 62.491
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:filter rows      : 0.320
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:get 1 col        : 18.674
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:apply 1 col      : 0.002
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:del 1 col        : 4.352
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:data to numpy    : 0.000
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:data slice       : 0.000
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:normalize        : 0.083
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:data from numpy  : 0.113
(_map_block_nosplit pid=1372, ip=192.168.31.60) INFO:root:######################

Essentially the logs above are saying that it takes ~17-18 seconds to ray.get() a single one of these dictionaries from the object store, and another ~4.4 seconds to delete it from memory.

I created one of these objects in Python memory and it was ~670 MB:

>>> ids_to_source = pd.Series(full_df.source.values, index=full_df.event_id).to_dict()
>>> sys.getsizeof(ids_to_source)
671088736
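
(Worth noting: sys.getsizeof on a dict only counts the hash table itself, not the keys and values, so the real footprint is probably even larger.) To sanity-check the per-fetch cost in isolation, here's a rough micro-benchmark I'd run locally; the dict below is just a stand-in for ids_to_source:

import time
import ray

ray.init(ignore_reinit_error=True)

# Stand-in for the real ids_to_source mapping: a large dict of Python objects,
# which has to be fully deserialized on every ray.get() (no zero-copy read,
# unlike numpy-backed data).
big_dict = {i: f"source_{i % 7}" for i in range(10_000_000)}
ref = ray.put(big_dict)

start = time.time()
_ = ray.get(ref)
print(f"ray.get() took {time.time() - start:.1f}s")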

Calling ray memory after placing the object in the object store also showed the following:

$ kubectl exec -it my-release-ray-head-type-c4km6 -- ray memory
======== Object references status: 2022-07-26 16:06:14.539189 ========
Grouping by node address...        Sorting by object size...        Display all entries per group...


--- Summary for node address: xxx.xxx.xx.xxx ---
Mem Used by Objects  Local References  Pinned        Used by task   Captured in Objects  Actor Handles
95.0 B               1, (95.0 B)       0, (0.0 B)    0, (0.0 B)     0, (0.0 B)           0, (0.0 B)   

--- Object references for node address: 192.168.21.207 ---
IP Address       PID    Type    Call Site               Status          Size    Reference Type      Object Ref                                              
xxx.xxx.xx.xxx   5938   Driver  disabled                FINISHED        95.0 B  LOCAL_REFERENCE     00ffffffffffffffffffffffffffffffffffffff0400000001000000

To record callsite information for each ObjectRef created, set env variable RAY_record_ref_creation_sites=1

--- Aggregate object store stats across all nodes ---
Plasma memory usage 0 MiB, 1 objects, 0.0% full, 0.0% needed
Objects consumed by Ray tasks: 116979 MiB.  # <--- from earlier work, can ignore

$ kubectl exec -it my-release-ray-head-type-c4km6 -- ray memory --stats-only
======== Object references status: 2022-07-26 16:10:32.835621 ========
--- Aggregate object store stats across all nodes ---
Plasma memory usage 1104 MiB, 2 objects, 0.25% full, 0.25% needed
Objects consumed by Ray tasks: 116979 MiB. # <--- from earlier work, can ignore

The good news is that I can easily get around this problem by adding my columns to my dataset in preprocessing (before the last-mile preprocessing done in Ray Datasets). I'm not sure if it makes sense that it would take ~17-18s to read an object of this size from the object store, but assuming that's expected, I think I've found my bottleneck :+1:.
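
Concretely, what I mean by pushing this upstream is something like the following at corpus-generation time (names here are hypothetical; metadata_df stands in for whatever table the ids_to_* dicts were built from):

import pandas as pd

# Hypothetical sketch: bake the event-level lookup columns into the parquet
# files up front, so the BatchMapper no longer has to ray.get() large dicts
# for every block.
metadata_df = pd.DataFrame({
    "event_id": [1, 2],
    "auto_label": [True, False],
    "label": [False, True],
    "source": ["a", "b"],
})

def enrich_and_write(raw_df: pd.DataFrame, path: str) -> None:
    # one vectorized left-merge replaces three per-row dict lookups per batch
    enriched = raw_df.merge(metadata_df, on="event_id", how="left")
    enriched.to_parquet(path)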

Some good news and bad news: I removed my bottleneck by adding my columns in preprocessing, and I also eliminated the need to call .union() by combining the files for the fst and snd datasets under the same prefix in S3.
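
For reference, the union change amounts to something like this (bucket and prefix names are made up):

import ray

# Before (hypothetical paths): two separate reads stitched together
fst = ray.data.read_parquet("s3://my-bucket/corpus/fst/")
snd = ray.data.read_parquet("s3://my-bucket/corpus/snd/")
train_dataset = fst.union(snd)

# After: both file sets live under one prefix, so a single read suffices
train_dataset = ray.data.read_parquet("s3://my-bucket/corpus/")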

After making these changes I ran my workload on 1 node to check that it fixed the issue and found that it did. I then scaled up to 4 nodes to see if the reads would be spread across all 4 workers, and that was also successful. I finally ran my entire dataset through the pipeline and got the following performance:

Metadata Fetch Progress:   0%|          | 0/100 [00:00<?, ?it/s]
Metadata Fetch Progress:   1%|          | 1/100 [00:04<06:41,  4.05s/it]
Metadata Fetch Progress:   6%|▌         | 6/100 [00:04<00:49,  1.91it/s]
Metadata Fetch Progress:   9%|▉         | 9/100 [00:04<00:31,  2.92it/s]
Metadata Fetch Progress:  11%|█         | 11/100 [00:04<00:23,  3.82it/s]
Metadata Fetch Progress:  13%|█▎        | 13/100 [00:04<00:17,  4.85it/s]
Metadata Fetch Progress:  16%|█▌        | 16/100 [00:04<00:11,  7.12it/s]
Metadata Fetch Progress:  23%|██▎       | 23/100 [00:05<00:05, 13.74it/s]
Metadata Fetch Progress:  27%|██▋       | 27/100 [00:05<00:04, 16.84it/s]
Metadata Fetch Progress:  31%|███       | 31/100 [00:05<00:03, 17.79it/s]
Metadata Fetch Progress:  34%|███▍      | 34/100 [00:05<00:03, 19.02it/s]
Metadata Fetch Progress:  37%|███▋      | 37/100 [00:05<00:03, 16.38it/s]
Metadata Fetch Progress:  40%|████      | 40/100 [00:06<00:04, 13.56it/s]
Metadata Fetch Progress:  42%|████▏     | 42/100 [00:06<00:04, 13.76it/s]
Metadata Fetch Progress:  46%|████▌     | 46/100 [00:06<00:03, 17.42it/s]
Metadata Fetch Progress:  51%|█████     | 51/100 [00:06<00:02, 22.51it/s]
Metadata Fetch Progress:  54%|█████▍    | 54/100 [00:06<00:01, 23.87it/s]
Metadata Fetch Progress:  57%|█████▋    | 57/100 [00:06<00:01, 22.96it/s]
Metadata Fetch Progress:  61%|██████    | 61/100 [00:06<00:01, 25.29it/s]
Metadata Fetch Progress:  65%|██████▌   | 65/100 [00:06<00:01, 25.43it/s]
Metadata Fetch Progress:  68%|██████▊   | 68/100 [00:07<00:01, 24.74it/s]
Metadata Fetch Progress:  71%|███████   | 71/100 [00:07<00:01, 17.41it/s]
Metadata Fetch Progress:  75%|███████▌  | 75/100 [00:07<00:01, 20.31it/s]
Metadata Fetch Progress:  80%|████████  | 80/100 [00:07<00:00, 24.88it/s]
Metadata Fetch Progress:  83%|████████▎ | 83/100 [00:07<00:00, 24.96it/s]
Metadata Fetch Progress:  87%|████████▋ | 87/100 [00:07<00:00, 26.55it/s]
Metadata Fetch Progress:  91%|█████████ | 91/100 [00:08<00:00, 26.05it/s]
Metadata Fetch Progress:  94%|█████████▍| 94/100 [00:08<00:00, 22.36it/s]
Metadata Fetch Progress:  97%|█████████▋| 97/100 [00:08<00:00, 21.28it/s]
Metadata Fetch Progress: 100%|██████████| 100/100 [00:11<00:00,  8.83it/s]
2022-07-27 10:14:00,408	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 3.9x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
Metadata Fetch Progress:   0%|          | 0/100 [00:00<?, ?it/s]
Metadata Fetch Progress:   1%|          | 1/100 [00:02<04:21,  2.64s/it]
Metadata Fetch Progress:   5%|▌         | 5/100 [00:02<00:39,  2.39it/s]
Metadata Fetch Progress:  17%|█▋        | 17/100 [00:02<00:08, 10.34it/s]
Metadata Fetch Progress:  35%|███▌      | 35/100 [00:02<00:02, 25.00it/s]
Metadata Fetch Progress:  52%|█████▏    | 52/100 [00:03<00:01, 40.81it/s]
Metadata Fetch Progress:  72%|███████▏  | 72/100 [00:03<00:00, 61.17it/s]
Metadata Fetch Progress:  87%|████████▋ | 87/100 [00:03<00:00, 69.49it/s]
Metadata Fetch Progress: 100%|██████████| 100/100 [00:03<00:00, 27.21it/s]
2022-07-27 10:14:04,837	WARNING read_api.py:280 -- ⚠️  The blocks of this dataset are estimated to be 1.8x larger than the target block size of 512 MiB. This may lead to out-of-memory errors during processing. Consider reducing the size of input files or using `.repartition(n)` to increase the number of dataset blocks.
Read->Map_Batches:   0%|                                                                                                                                                           | 0/2000 [00:00<?, ?it/s]/home/ray/anaconda3/lib/python3.8/site-packages/ray/util/client/worker.py:597: UserWarning: More than 10MB of messages have been created to schedule tasks on the server. This can be slow on Ray Client due to communication overhead over the network. If you're running many fine-grained tasks, consider running them inside a single remote function. See the section on "Too fine-grained tasks" in the Ray Design Patterns document for more details: https://docs.google.com/document/d/167rnnDFIVRhHhK4mznEIemOtj63IOhtIPvSYaPgI4Fg/edit#heading=h.f7ins22n6nyl. If your functions frequently use large objects, consider storing the objects remotely with ray.put. An example of this is shown in the "Closure capture of large / unserializable object" section of the Ray Design Patterns document, available here: https://docs.google.com/document/d/167rnnDFIVRhHhK4mznEIemOtj63IOhtIPvSYaPgI4Fg/edit#heading=h.1afmymq455wu
  warnings.warn(
Read->Map_Batches: 100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 2000/2000 [01:39<00:00, 20.06it/s]
Read->Map_Batches: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 800/800 [00:25<00:00, 30.95it/s]
Stage 1 read->map_batches: 2000/2000 blocks executed in 99.74s
* Remote wall time: 862.48ms min, 13.41s max, 5.9s mean, 11792.84s total
* Remote cpu time: 360.13ms min, 8.86s max, 2.73s mean, 5464.03s total
* Peak heap memory usage (MiB): 640260000.0 min, 7955336000.0 max, 2846639156 mean
* Output num rows: 104 min, 2481 max, 664 mean, 1329072 total
* Output size bytes: 13140928 min, 313454142 max, 83972004 mean, 167944009460 total
* Tasks per node: 498 min, 502 max, 500 mean; 4 nodes used

------------------------------------
Stage 1 read->map_batches: 800/800 blocks executed in 25.86s
* Remote wall time: 1.45s min, 6.82s max, 3.24s mean, 2594.93s total
* Remote cpu time: 644.01ms min, 2.77s max, 1.41s mean, 1131.15s total
* Peak heap memory usage (MiB): 1145380000.0 min, 7955336000.0 max, 3456867490 mean
* Output num rows: 172 min, 743 max, 347 mean, 278200 total
* Output size bytes: 21733082 min, 93884023 max, 43940275 mean, 35152220120 total
* Tasks per node: 195 min, 208 max, 200 mean; 4 nodes used

Overall it took ~100 seconds to preprocess my train dataset and ~26 seconds to preprocess my validation dataset (which is great!). I then decided to switch back to using the DummyTrainer with the following configuration:

# create preprocessor
preprocessor = BatchMapper(lambda df: dataset_preprocessing(df))

# create and run trainer
trainer = DummyTrainer(
    train_loop_config={"epochs": 1},
    scaling_config=ScalingConfig(
        num_workers=NUM_GPU_WORKERS,  # 32
        use_gpu=True,
        _max_cpu_fraction_per_node=0.8,
    ),
    datasets={
        "train": train_dataset,
        "val": val_dataset,
    },
    dataset_config={
        "train": DatasetConfig(fit=True, split=True, global_shuffle=False),
        "val": DatasetConfig(fit=False, split=False, global_shuffle=False),
    },
    run_config=RunConfig(
        callbacks=[TBXLoggerCallback()]
    ),
    preprocessor=preprocessor,
)
result = trainer.fit()

Unfortunately, the train dataset preprocessing took ~2.5x longer than before:

(BaseWorkerMixin pid=18343, ip=192.168.41.5) Stage 1 read->map_batches: 2000/2000 blocks executed in 254.63s
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Remote wall time: 863.69ms min, 101.21s max, 11.49s mean, 22977.92s total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Remote cpu time: 375.03ms min, 8.43s max, 2.98s mean, 5962.46s total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Peak heap memory usage (MiB): 634988000.0 min, 6799772000.0 max, 2942425746 mean
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Output num rows: 104 min, 2481 max, 664 mean, 1329072 total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Output size bytes: 13140928 min, 313454142 max, 83972004 mean, 167944009460 total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Tasks per node: 245 min, 843 max, 500 mean; 4 nodes used
(BaseWorkerMixin pid=18343, ip=192.168.41.5) 
(BaseWorkerMixin pid=18343, ip=192.168.41.5) Stage 2 randomize_block_order: 2000/2000 blocks executed in 0s
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Remote wall time: 863.69ms min, 101.21s max, 11.49s mean, 22977.92s total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Remote cpu time: 375.03ms min, 8.43s max, 2.98s mean, 5962.46s total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Peak heap memory usage (MiB): 634988000.0 min, 6799772000.0 max, 2942425746 mean
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Output num rows: 104 min, 2481 max, 664 mean, 1329072 total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Output size bytes: 13140928 min, 313454142 max, 83972004 mean, 167944009460 total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Tasks per node: 245 min, 843 max, 500 mean; 4 nodes used
(BaseWorkerMixin pid=18343, ip=192.168.41.5) 
(BaseWorkerMixin pid=18343, ip=192.168.41.5) Stage 3 split: 63/63 blocks executed in 1.63s
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Remote wall time: 37.17ms min, 97.95s max, 12.78s mean, 805.4s total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Remote cpu time: 37.28ms min, 4.05s max, 2.88s mean, 181.54s total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Peak heap memory usage (MiB): 1869336000.0 min, 4986224000.0 max, 3108774666 mean
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Output num rows: 455 min, 1106 max, 659 mean, 41533 total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Output size bytes: 57494653 min, 139756406 max, 83304615 mean, 5248190791 total
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Tasks per node: 6 min, 30 max, 15 mean; 4 nodes used

However, the validation dataset was preprocessed in nearly the same amount of time as before:

(DummyTrainer pid=14581, ip=192.168.41.5) Stage 1 read->map_batches: 800/800 blocks executed in 30.69s
(DummyTrainer pid=14581, ip=192.168.41.5) * Remote wall time: 1.38s min, 6.93s max, 3.14s mean, 2512.81s total
(DummyTrainer pid=14581, ip=192.168.41.5) * Remote cpu time: 691.65ms min, 2.67s max, 1.45s mean, 1161.07s total
(DummyTrainer pid=14581, ip=192.168.41.5) * Peak heap memory usage (MiB): 1299140000.0 min, 6799772000.0 max, 3692958840 mean
(DummyTrainer pid=14581, ip=192.168.41.5) * Output num rows: 172 min, 743 max, 347 mean, 278200 total
(DummyTrainer pid=14581, ip=192.168.41.5) * Output size bytes: 21733082 min, 93884023 max, 43940275 mean, 35152220120 total
(DummyTrainer pid=14581, ip=192.168.41.5) * Tasks per node: 31 min, 288 max, 200 mean; 4 nodes used
(DummyTrainer pid=14581, ip=192.168.41.5) 
(DummyTrainer pid=14581, ip=192.168.41.5) Stage 2 randomize_block_order: 800/800 blocks executed in 0s
(DummyTrainer pid=14581, ip=192.168.41.5) * Remote wall time: 1.38s min, 6.93s max, 3.14s mean, 2512.81s total
(DummyTrainer pid=14581, ip=192.168.41.5) * Remote cpu time: 691.65ms min, 2.67s max, 1.45s mean, 1161.07s total
(DummyTrainer pid=14581, ip=192.168.41.5) * Peak heap memory usage (MiB): 1299140000.0 min, 6799772000.0 max, 3692958840 mean
(DummyTrainer pid=14581, ip=192.168.41.5) * Output num rows: 172 min, 743 max, 347 mean, 278200 total
(DummyTrainer pid=14581, ip=192.168.41.5) * Output size bytes: 21733082 min, 93884023 max, 43940275 mean, 35152220120 total
(DummyTrainer pid=14581, ip=192.168.41.5) * Tasks per node: 31 min, 288 max, 200 mean; 4 nodes used

The tasks per node stats seem to suggest that, unlike before, the scheduling might not be happening according to the SPREAD strategy, as some nodes are processing roughly 3.5x as many tasks as others. I'm wondering if this might be the root cause of the slowdown, and if so, is there a way to specify that I want the BatchMapper to spread the workload?
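
For what it's worth, if I were calling map_batches() on the Dataset directly (outside of BatchMapper), my understanding is that extra keyword arguments get forwarded to the underlying Ray tasks, so something like this might force spreading; I don't know whether the same option can be threaded through the preprocessor:

# Rough sketch, assuming map_batches forwards **ray_remote_args to ray.remote
preprocessed = train_dataset.map_batches(
    dataset_preprocessing,
    batch_format="pandas",
    scheduling_strategy="SPREAD",
)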

Some other things of interest:

(BaseWorkerMixin pid=18343, ip=192.168.41.5) Dataset iterator time breakdown:
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * In ray.wait(): 14.32ms
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * In ray.get(): 20.78s
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * In next_batch(): 2.12s
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * In format_batch(): 582.12us
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * In user code: 6.12s
(BaseWorkerMixin pid=18343, ip=192.168.41.5) * Total time: 29.06s
...
(BaseWorkerMixin pid=21572, ip=192.168.2.159) P50/P95/Max batch delay (s) 2.4949573230005626 3.7621722525000223 4.194920468999953
(BaseWorkerMixin pid=21572, ip=192.168.2.159) Num epochs read 1
(BaseWorkerMixin pid=21572, ip=192.168.2.159) Num batches read 11
(BaseWorkerMixin pid=21572, ip=192.168.2.159) Num bytes read 5004.74 MiB
(BaseWorkerMixin pid=21572, ip=192.168.2.159) Mean throughput 172.23 MiB/s
(BaseWorkerMixin pid=26312, ip=192.168.42.109) Time to read all data 29.060114934000012 seconds
(BaseWorkerMixin pid=26312, ip=192.168.42.109) P50/P95/Max batch delay (s) 2.406928956000229 3.1735702439996203 3.415531849999752
(BaseWorkerMixin pid=26312, ip=192.168.42.109) Num epochs read 1
(BaseWorkerMixin pid=26312, ip=192.168.42.109) Num batches read 11
(BaseWorkerMixin pid=26312, ip=192.168.42.109) Num bytes read 5004.75 MiB
(BaseWorkerMixin pid=26312, ip=192.168.42.109) Mean throughput 172.22 MiB/s

My batch delay seems a little high, so maybe I'll benefit from setting prefetch_blocks=10 or something like that.
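
Something like this inside the train loop is what I have in mind (I'm not 100% sure of the exact shard accessor in the current nightly, so treat this as a sketch):

from ray import train

def train_loop_per_worker(config):
    # Sketch: prefetch a few blocks ahead while iterating the shard, to try to
    # hide the ~2.5s P50 batch delay reported above.
    shard = train.get_dataset_shard("train")
    for batch in shard.iter_batches(prefetch_blocks=10):
        pass  # forward/backward pass would go here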

Also, I'm currently setting the parallelism for my train dataset equal to the number of blocks in my train dataset (i.e. 2000). Would decreasing this to, say, 500 potentially help reduce the number of tasks and speed up performance?
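
In other words, one of these two tweaks (500 is just an illustrative number, and TRAIN_CORPUS_S3_PREFIX is the same constant as in my read code earlier):

import ray

# Option 1: ask for fewer read tasks up front
train_dataset = ray.data.read_parquet(
    TRAIN_CORPUS_S3_PREFIX,
    parallelism=500,
)

# Option 2: keep the read as-is and coalesce blocks afterwards
train_dataset = train_dataset.repartition(500)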