Increasing the number of workers doesn't decrease training time

I have trained a dataset on MobileNetV2 with 2 workers (1 CPU, 3 GB RAM each). It took 3325 seconds to run 2 epochs. I then increased to 4 workers, and it still took 3326 seconds.

Increasing the number of workers has no effect. Is there anything I need to check?

Can you share what your script looks like?

Hey @matthewdeng, sharing the file I'm currently working on.

The cluster is set up with a worker size of 2 cores and 4 GB RAM, with the env variable OMP_NUM_THREADS=2.
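For reference, a minimal sketch of how that worker sizing and env variable could be expressed with the Ray 1.x Train API used in the repro script below; propagating OMP_NUM_THREADS through runtime_env is an assumption on my side, not taken from the actual setup:

import ray
from ray.train.trainer import Trainer

# Assumed way of propagating OMP_NUM_THREADS to the training workers.
ray.init(
    address="auto",
    runtime_env={"env_vars": {"OMP_NUM_THREADS": "2"}},
)

# 2 CPU cores per worker, matching the cluster description above.
trainer = Trainer(
    backend="torch",
    num_workers=2,  # vary this to compare timings
    use_gpu=False,
    resources_per_worker={"CPU": 2},
)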

And could you show the logs?

Hmm, it might depend on where your bottleneck is. One thing I noticed is that your global batch size is constant, so if the training time for a single batch is already small, there won't be a noticeable benefit from distributed training.

One naive way to decrease training time is to keep the per-worker batch size constant instead; you should then approach close to linear speedup, but it may affect the learning curve of your model.
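A quick sketch contrasting the two strategies (the constants and function names here are illustrative, not from the script):

import ray.train as train

GLOBAL_BATCH_SIZE = 2000  # value used in the repro script below
WORKER_BATCH_SIZE = 1000  # hypothetical fixed per-worker size

def batch_size_fixed_global():
    # Keep the global batch size constant: each worker's batch shrinks as
    # workers are added, so every step does less compute per worker while the
    # number of steps per epoch stays the same -- fixed per-step overhead
    # (data loading, allreduce) can then eat the speedup.
    return GLOBAL_BATCH_SIZE // train.world_size()

def batch_size_fixed_per_worker():
    # Keep the per-worker batch size constant: the effective global batch
    # grows with the number of workers and each worker runs fewer steps per
    # epoch -- closer to linear speedup, but the larger effective batch can
    # change the learning curve.
    return WORKER_BATCH_SIZE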

You can also use the Ray dashboard for monitoring. Did you see two workers running?

The following is with two workers.

/home/ray/anaconda3/lib/python3.8/site-packages/tqdm/auto.py:22: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm
file-----------
main--------------
2022-06-04 13:44:57,034	INFO packaging.py:323 -- Pushing file package 'gcs://_ray_pkg_e1ea93f0c19f60918b7912b43b338386.zip' (0.09MiB) to Ray cluster...
2022-06-04 13:44:57,036	INFO packaging.py:332 -- Successfully pushed file package 'gcs://_ray_pkg_e1ea93f0c19f60918b7912b43b338386.zip'.
2022-06-04 13:44:57,051	INFO trainer.py:243 -- Trainer logs will be logged in: /home/ray/ray_results/train_2022-06-04_13-44-57
(BaseWorkerMixin pid=4875) 2022-06-04 13:45:00,748	INFO torch.py:349 -- Setting up process group for: env:// [rank=1, world_size=2]
(BaseWorkerMixin pid=4874) 2022-06-04 13:45:00,749	INFO torch.py:349 -- Setting up process group for: env:// [rank=0, world_size=2]
2022-06-04 13:45:01,817	INFO trainer.py:249 -- Run results will be logged in: /home/ray/ray_results/train_2022-06-04_13-44-57/run_001
(BaseWorkerMixin pid=4875) 1000
(BaseWorkerMixin pid=4874) 1000
(BaseWorkerMixin pid=4875) 2022-06-04 13:45:02,086	INFO torch.py:97 -- Moving model to device: cpu
(BaseWorkerMixin pid=4875) 2022-06-04 13:45:02,087	INFO torch.py:135 -- Wrapping provided model in DDP.
(BaseWorkerMixin pid=4874) 2022-06-04 13:45:02,087	INFO torch.py:97 -- Moving model to device: cpu
(BaseWorkerMixin pid=4874) 2022-06-04 13:45:02,087	INFO torch.py:135 -- Wrapping provided model in DDP.
(BaseWorkerMixin pid=4875) loss: 2.304449  [    0/30000]
(BaseWorkerMixin pid=4874) loss: 2.304529  [    0/30000]
(BaseWorkerMixin pid=4875) Test Error: 
(BaseWorkerMixin pid=4875)  Accuracy: 14.8%, Avg loss: 2.301992 
(BaseWorkerMixin pid=4875) 
(BaseWorkerMixin pid=4874) Test Error: 
(BaseWorkerMixin pid=4874)  Accuracy: 16.1%, Avg loss: 2.301196 
(BaseWorkerMixin pid=4874) 
(BaseWorkerMixin pid=4875) loss: 2.302403  [    0/30000]
(BaseWorkerMixin pid=4874) loss: 2.303127  [    0/30000]
(BaseWorkerMixin pid=4875) Test Error: 
(BaseWorkerMixin pid=4875)  Accuracy: 16.5%, Avg loss: 2.300077 
(BaseWorkerMixin pid=4875) 
(BaseWorkerMixin pid=4874) Test Error: 
(BaseWorkerMixin pid=4874)  Accuracy: 17.9%, Avg loss: 2.299322 
(BaseWorkerMixin pid=4874) 
(BaseWorkerMixin pid=4875) loss: 2.300217  [    0/30000]
(BaseWorkerMixin pid=4874) loss: 2.301674  [    0/30000]
(BaseWorkerMixin pid=4875) Test Error: 
(BaseWorkerMixin pid=4875)  Accuracy: 18.2%, Avg loss: 2.297997 
(BaseWorkerMixin pid=4875) 
(BaseWorkerMixin pid=4874) Test Error: 
(BaseWorkerMixin pid=4874)  Accuracy: 19.6%, Avg loss: 2.297340 
(BaseWorkerMixin pid=4874) 
(BaseWorkerMixin pid=4875) loss: 2.297815  [    0/30000]
(BaseWorkerMixin pid=4874) loss: 2.300117  [    0/30000]
(BaseWorkerMixin pid=4875) Test Error: 
(BaseWorkerMixin pid=4875)  Accuracy: 20.1%, Avg loss: 2.295787 
(BaseWorkerMixin pid=4875) 
(BaseWorkerMixin pid=4874) Test Error: 
(BaseWorkerMixin pid=4874)  Accuracy: 21.5%, Avg loss: 2.295250 
(BaseWorkerMixin pid=4874) 
(BaseWorkerMixin pid=4875) loss: 2.295287  [    0/30000]
(BaseWorkerMixin pid=4874) loss: 2.298449  [    0/30000]
(BaseWorkerMixin pid=4875) Test Error: 
(BaseWorkerMixin pid=4875)  Accuracy: 22.6%, Avg loss: 2.293510 
(BaseWorkerMixin pid=4875) 
(BaseWorkerMixin pid=4874) Test Error: 
(BaseWorkerMixin pid=4874)  Accuracy: 23.6%, Avg loss: 2.293094 
(BaseWorkerMixin pid=4874) 
(BaseWorkerMixin pid=4875) loss: 2.292696  [    0/30000]
(BaseWorkerMixin pid=4874) loss: 2.296699  [    0/30000]
(BaseWorkerMixin pid=4875) Test Error: 
(BaseWorkerMixin pid=4875)  Accuracy: 25.1%, Avg loss: 2.291205 
(BaseWorkerMixin pid=4875) 
(BaseWorkerMixin pid=4874) Test Error: 
(BaseWorkerMixin pid=4874)  Accuracy: 25.7%, Avg loss: 2.290898 
(BaseWorkerMixin pid=4874) 
(BaseWorkerMixin pid=4875) loss: 2.290082  [    0/30000]
(BaseWorkerMixin pid=4874) loss: 2.294896  [    0/30000]
(BaseWorkerMixin pid=4875) Test Error: 
(BaseWorkerMixin pid=4875)  Accuracy: 27.9%, Avg loss: 2.288895 
(BaseWorkerMixin pid=4875) 
(BaseWorkerMixin pid=4874) Test Error: 
(BaseWorkerMixin pid=4874)  Accuracy: 28.1%, Avg loss: 2.288690 
(BaseWorkerMixin pid=4874) 
(BaseWorkerMixin pid=4875) loss: 2.287466  [    0/30000]
(BaseWorkerMixin pid=4874) loss: 2.293067  [    0/30000]
(BaseWorkerMixin pid=4875) Test Error: 
(BaseWorkerMixin pid=4875)  Accuracy: 30.0%, Avg loss: 2.286576 
(BaseWorkerMixin pid=4875) 
(BaseWorkerMixin pid=4874) Test Error: 
(BaseWorkerMixin pid=4874)  Accuracy: 30.0%, Avg loss: 2.286469 
(BaseWorkerMixin pid=4874) 
(BaseWorkerMixin pid=4875) loss: 2.284843  [    0/30000]
(BaseWorkerMixin pid=4874) loss: 2.291225  [    0/30000]
(BaseWorkerMixin pid=4875) Test Error: 
(BaseWorkerMixin pid=4875)  Accuracy: 32.2%, Avg loss: 2.284245 
(BaseWorkerMixin pid=4875) 
(BaseWorkerMixin pid=4874) Test Error: 
(BaseWorkerMixin pid=4874)  Accuracy: 32.5%, Avg loss: 2.284235 
(BaseWorkerMixin pid=4874) 
(BaseWorkerMixin pid=4875) loss: 2.282205  [    0/30000]
(BaseWorkerMixin pid=4874) loss: 2.289369  [    0/30000]
train time ---------------- 96.40072703361511
(BaseWorkerMixin pid=4875) Test Error: 
(BaseWorkerMixin pid=4875)  Accuracy: 33.9%, Avg loss: 2.281895 
(BaseWorkerMixin pid=4875) 
(BaseWorkerMixin pid=4874) Test Error: 
(BaseWorkerMixin pid=4874)  Accuracy: 34.2%, Avg loss: 2.281981 
(BaseWorkerMixin pid=4874) 
Loss results: [[2.3011959075927733, 2.299322414398193, 2.297340440750122, 2.2952504634857176, 2.2930936336517336, 2.290898323059082, 2.288690376281738, 2.2864691734313967, 2.2842350959777833, 2.281980800628662], [2.301992082595825, 2.300076627731323, 2.29799747467041, 2.2957868576049805, 2.293509531021118, 2.291204643249512, 2.288894844055176, 2.2865764141082763, 2.284245252609253, 2.281894826889038]]

The following is with one worker.

file-----------
main--------------
2022-06-04 13:48:27,765	INFO worker.py:862 -- Using address localhost:9031 set in the environment variable RAY_ADDRESS
2022-06-04 13:48:27,806	INFO worker.py:964 -- Connecting to existing Ray cluster at address: 172.31.75.49:9031
2022-06-04 13:48:27,808	INFO worker.py:981 -- Calling ray.init() again after it has already been called.
2022-06-04 13:48:27,811	INFO trainer.py:243 -- Trainer logs will be logged in: /home/ray/ray_results/train_2022-06-04_13-48-27
(bundle_reservation_check_func pid=4826) E0604 13:48:29.277654244    4873 chttp2_transport.cc:1103]   Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings"
2022-06-04 13:48:31,765	INFO trainer.py:249 -- Run results will be logged in: /home/ray/ray_results/train_2022-06-04_13-48-27/run_001
(BaseWorkerMixin pid=5654) 2022-06-04 13:48:31,728	INFO torch.py:349 -- Setting up process group for: env:// [rank=0, world_size=1]
(BaseWorkerMixin pid=5654) 2000
(BaseWorkerMixin pid=5654) 2022-06-04 13:48:32,029	INFO torch.py:97 -- Moving model to device: cpu
(BaseWorkerMixin pid=5654) loss: 2.305619  [    0/60000]
(BaseWorkerMixin pid=5654) Test Error: 
(BaseWorkerMixin pid=5654)  Accuracy: 10.8%, Avg loss: 2.304033 
(BaseWorkerMixin pid=5654) 
(BaseWorkerMixin pid=5654) loss: 2.303440  [    0/60000]
(BaseWorkerMixin pid=5654) Test Error: 
(BaseWorkerMixin pid=5654)  Accuracy: 12.2%, Avg loss: 2.301719 
(BaseWorkerMixin pid=5654) 
(BaseWorkerMixin pid=5654) loss: 2.301149  [    0/60000]
(BaseWorkerMixin pid=5654) Test Error: 
(BaseWorkerMixin pid=5654)  Accuracy: 13.6%, Avg loss: 2.299365 
(BaseWorkerMixin pid=5654) 
(BaseWorkerMixin pid=5654) loss: 2.298802  [    0/60000]
(BaseWorkerMixin pid=5654) Test Error: 
(BaseWorkerMixin pid=5654)  Accuracy: 14.9%, Avg loss: 2.297001 
(BaseWorkerMixin pid=5654) 
(BaseWorkerMixin pid=5654) loss: 2.296452  [    0/60000]
(BaseWorkerMixin pid=5654) Test Error: 
(BaseWorkerMixin pid=5654)  Accuracy: 16.1%, Avg loss: 2.294642 
(BaseWorkerMixin pid=5654) 
(BaseWorkerMixin pid=5654) loss: 2.294113  [    0/60000]
(BaseWorkerMixin pid=5654) Test Error: 
(BaseWorkerMixin pid=5654)  Accuracy: 17.1%, Avg loss: 2.292277 
(BaseWorkerMixin pid=5654) 
(BaseWorkerMixin pid=5654) loss: 2.291781  [    0/60000]
(BaseWorkerMixin pid=5654) Test Error: 
(BaseWorkerMixin pid=5654)  Accuracy: 18.6%, Avg loss: 2.289894 
(BaseWorkerMixin pid=5654) 
(BaseWorkerMixin pid=5654) loss: 2.289440  [    0/60000]
(BaseWorkerMixin pid=5654) Test Error: 
(BaseWorkerMixin pid=5654)  Accuracy: 20.3%, Avg loss: 2.287486 
(BaseWorkerMixin pid=5654) 
(BaseWorkerMixin pid=5654) loss: 2.287072  [    0/60000]
(BaseWorkerMixin pid=5654) Test Error: 
(BaseWorkerMixin pid=5654)  Accuracy: 22.5%, Avg loss: 2.285050 
(BaseWorkerMixin pid=5654) 
(BaseWorkerMixin pid=5654) loss: 2.284687  [    0/60000]
train time ---------------- 178.2441258430481
(BaseWorkerMixin pid=5654) Test Error: 
(BaseWorkerMixin pid=5654)  Accuracy: 24.9%, Avg loss: 2.282580 
(BaseWorkerMixin pid=5654) 
Loss results: [[2.304033136367798, 2.301719379425049, 2.299365425109863, 2.2970014095306395, 2.2946418285369874, 2.292277193069458, 2.289893627166748, 2.2874859809875487, 2.2850500106811524, 2.282579851150513]]

The training time is 96.4 seconds with two workers vs. 178.2 seconds with one, roughly a 1.85x speedup.

Feel free to use this to reproduce:

from typing import Dict

import torch
import ray.train as train
from ray.train.trainer import Trainer
from ray.train.callbacks import JsonLoggerCallback
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import time
print("file-----------")

# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 1024),
            nn.ReLU(),
            nn.Linear(1024, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def train_epoch(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset) // train.world_size()
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def validate_epoch(dataloader, model, loss_fn):
    size = len(dataloader.dataset) // train.world_size()
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return test_loss


def train_func(config: Dict):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]

    worker_batch_size = batch_size // train.world_size()
    
    print(worker_batch_size)
    # Create data loaders.
#     train_dataloader = DataLoader(training_data, batch_size=worker_batch_size)
#     test_dataloader = DataLoader(test_data, batch_size=worker_batch_size)

    train_kwargs = {'batch_size': worker_batch_size}
    test_kwargs = {'batch_size': worker_batch_size}

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,)),
    ])

    dataset1 = datasets.MNIST("../data", train=True, download=True, transform=transform)
    dataset2 = datasets.MNIST("../data", train=False, transform=transform)
    train_dataloader = DataLoader(dataset1, **train_kwargs)
    test_dataloader = DataLoader(dataset2, **test_kwargs)
    
    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = train.torch.prepare_data_loader(test_dataloader)

    # Create model.
    model = NeuralNetwork()
    model = train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    loss_results = []

    for _ in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer)
        loss = validate_epoch(test_dataloader, model, loss_fn)
        train.report(loss=loss)
        loss_results.append(loss)

    return loss_results


def train_fashion_mnist(num_workers=2, use_gpu=False, resources=None):
    trainer = Trainer(
        backend="torch",
        num_workers=num_workers,
        use_gpu=use_gpu,
        resources_per_worker=resources,
    )
    trainer.start()
    start_time = time.time()
    result = trainer.run(
        train_func=train_func,
        config={"lr": 1e-3, "batch_size": 2000, "epochs": 10},
        callbacks=[JsonLoggerCallback()],
    )
    end_time = time.time()
    print("train time ----------------", end_time - start_time)
    trainer.shutdown()
    print(f"Loss results: {result}")


if __name__ == "__main__":
    print("main--------------")

    import ray

    # address = "ray://192.168.29.7:10001"

    # vary from 1 to 2
    workers = 1
    # workers = 2

    use_gpu = False
    resources = {"CPU": 2}
    ray.init("auto", ignore_reinit_error=True)

    train_fashion_mnist(num_workers=workers, use_gpu=use_gpu, resources=resources)

My hardware config (lscpu output):

Architecture:                    x86_64
CPU op-mode(s):                  32-bit, 64-bit
Byte Order:                      Little Endian
Address sizes:                   46 bits physical, 48 bits virtual
CPU(s):                          8
On-line CPU(s) list:             0-7
Thread(s) per core:              2
Core(s) per socket:              4
Socket(s):                       1
NUMA node(s):                    1
Vendor ID:                       GenuineIntel
CPU family:                      6
Model:                           85
Model name:                      Intel(R) Xeon(R) Platinum 8175M CPU @ 2.50GHz
Stepping:                        4
CPU MHz:                         2499.998
BogoMIPS:                        4999.99
Hypervisor vendor:               KVM
Virtualization type:             full
L1d cache:                       128 KiB
L1i cache:                       128 KiB
L2 cache:                        4 MiB
L3 cache:                        33 MiB
NUMA node0 CPU(s):               0-7
Vulnerability Itlb multihit:     KVM: Mitigation: VMX unsupported
Vulnerability L1tf:              Mitigation; PTE Inversion
Vulnerability Mds:               Vulnerable: Clear CPU buffers attempted, no microcode; SMT Host state unknown
Vulnerability Meltdown:          Mitigation; PTI
Vulnerability Spec store bypass: Vulnerable
Vulnerability Spectre v1:        Mitigation; usercopy/swapgs barriers and __user pointer sanitization
Vulnerability Spectre v2:        Mitigation; Retpolines, STIBP disabled, RSB filling
Vulnerability Srbds:             Not affected
Vulnerability Tsx async abort:   Vulnerable: Clear CPU buffers attempted, no microcode; SMT Host state unknown
Flags:                           fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_good nopl xtopology nonstop_tsc cpuid aperfmperf tsc_known_freq pni pclmulqdq ssse3 fma cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand hypervisor lahf_lm abm 3dnowprefetch invpcid_single pti fsgsbase tsc_adjust bmi1 hle avx2 smep bmi2 erms invpcid rtm mpx avx512f avx512dq rdseed adx smap clflushopt clwb avx512cd avx512bw avx512vl xsaveopt xsavec xgetbv1 xsaves ida arat pku ospke

Hey @Jimmy and @matthewdeng,
thank you so much for the support.

I have run the code you shared, and the training time now decreases as the number of workers increases.

I also noted that it works when the DataLoader object is instantiated inside the training function. Before, it was not working because I instantiated the DataLoader object outside the training function.
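A minimal sketch of that difference (the stand-in dataset is illustrative; the actual script uses MNIST). The likely explanation is that a DataLoader built outside train_func lives on the driver and is never passed through train.torch.prepare_data_loader, so it gets no DistributedSampler and every worker iterates over the full dataset:

import torch
import ray.train as train
from torch.utils.data import DataLoader, TensorDataset

# Stand-in dataset; the repro script uses MNIST instead.
dataset = TensorDataset(torch.randn(60000, 28 * 28), torch.randint(0, 10, (60000,)))

# Anti-pattern: a DataLoader built at module scope is created once on the
# driver, captured by train_func, and never wrapped with a DistributedSampler,
# so each worker ends up processing the whole dataset.
# driver_side_loader = DataLoader(dataset, batch_size=1000)

def train_func(config):
    # Working pattern: build the DataLoader on each worker, inside train_func,
    # then let Ray Train shard it across workers.
    loader = DataLoader(dataset, batch_size=config["batch_size"] // train.world_size())
    loader = train.torch.prepare_data_loader(loader)
    for X, y in loader:
        ...  # training step, as in the repro script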

Awesome, glad that helped! Thanks for the positive feedback!