Horovod on Ray -- Not sure if Bug or Not

I have posted the following issue on the Horovod GitHub page, but I also wanted to share it here to see if I can get any useful information. I’m also not sure whether what I’m encountering is a bug, so any information that helps clear that up is appreciated.

Here’s the issue:

Environment:

  1. Framework: PyTorch
  2. Framework version: 1.5.0
  3. Horovod version: 0.23.0
  4. MPI version: MVAPICH2 2.3.7/mpi4py 3.1.4
  5. CUDA version: 11.2
  6. NCCL version: N/A
  7. Python version: 3.7.15
  8. Spark / PySpark version: N/A
  9. Ray version: 2.2.0
  10. OS and version: CentOS 7
  11. GCC version: 9.4.0
  12. CMake version: 3.22.2

Checklist:

  1. Did you search issues to find if somebody asked this question before?
    Issue #3055 is sort of related

Bug report:
Please describe erroneous behavior you’re observing and steps to reproduce it.

I’m trying to run Horovod on Ray so that I can leverage MPI for training-related communication. Below I go through the build command I used, the error I got, and the program I’m running (which is essentially pytorch_synthetic_benchmark.py adapted to use Ray). I’m running the application on a Ray cluster of three nodes: a head node and two worker nodes.

– The build command I used,
HOROVOD_WITH_PYTORCH=1 HOROVOD_GPU_OPERATIONS=MPI HOROVOD_WITH_GLOO=1 HOROVOD_WITHOUT_TENSORFLOW=1 HOROVOD_WITH_MPI=1 CC=$(which mpicc) CXX=$(which mpicxx) pip install -e .

– The command I’m running,
mpirun_rsh --np 2 --hostfile hostfile python pytorch_synthetic_benchmark.py

cat hostfile
gpu01
gpu02
gpu03

– The output of the command,

2023-02-07 21:53:23,535 INFO worker.py:1352 -- Connecting to existing Ray cluster at address: 10.1.1.1:6379...
2023-02-07 21:53:23,543 INFO worker.py:1538 -- Connected to Ray cluster.
2023-02-07 21:53:24,353 INFO worker.py:1352 -- Connecting to existing Ray cluster at address: 10.1.1.1:6379...
2023-02-07 21:53:24,365 INFO worker.py:1538 -- Connected to Ray cluster.

Traceback (most recent call last):
  File "pytorch_synthetic_benchmark.py", line 139, in <module>
    executor.start()
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/runner.py", line 320, in start
    return self._maybe_call_ray(self.adapter.start, **kwargs_)
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/runner.py", line 419, in _maybe_call_ray
    return driver_func(**kwargs)
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/runner.py", line 563, in start
    node_workers=node_workers)
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/utils.py", line 72, in detect_nics
    settings)
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/driver_service.py", line 59, in _driver_fn
    return _run_probe(driver, settings, num_hosts)
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/driver/driver_service.py", line 126, in _run_probe
    driver.wait_for_initial_registration(settings.start_timeout)
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/common/service/driver_service.py", line 166, in wait_for_initial_registration
    timeout.check_time_out_for('tasks to start')
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/common/util/timeout.py", line 37, in check_time_out_for
    self._timeout
Exception: Timed out waiting for tasks to start. Please check connectivity between servers. You may need to increase the --start-timeout parameter if you have too many servers. Timeout after 30 seconds.
2023-02-07 21:53:54,964 ERROR worker.py:401 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::BaseHorovodWorker.execute() (pid=24246, ip=10.1.1.2, repr=<horovod.ray.worker.BaseHorovodWorker object at 0x2b9471081ad0>)
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/driver_service.py", line 11, in execute_task_fn
    _task_fn(index, num_hosts, driver_addresses, settings)
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/task_fn.py", line 31, in _task_fn
    task.wait_for_initial_registration(settings.start_timeout)
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/common/service/task_service.py", line 253, in wait_for_initial_registration
    timeout.check_time_out_for('tasks to start')
  File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/common/util/timeout.py", line 37, in check_time_out_for
    self._timeout
Exception: Timed out waiting for tasks to start. Please check connectivity between servers. You may need to increase the --start-timeout parameter if you have too many servers. Timeout after 30 seconds.
[gpu01.cluster:mpispawn_0][child_handler] MPI process (rank: 0, pid: 25620) exited with status 1
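
The exception above asks to check connectivity between servers. A plain TCP probe is one quick way to do that. The following is a minimal sketch using only the Python standard library; the helper names can_connect and check_hosts are hypothetical and not part of Horovod or Ray. It only shows whether a given host and port accept a connection, so a successful probe of the Ray head port does not rule out other ports being blocked between the nodes.

```python
import socket


def can_connect(host, port, timeout_s=3.0):
    """Return True if a TCP connection to (host, port) succeeds within timeout_s."""
    try:
        # create_connection resolves the host name and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


def check_hosts(hosts, port, timeout_s=3.0):
    """Map each host to whether it accepted a TCP connection on the given port."""
    return {host: can_connect(host, port, timeout_s) for host in hosts}
```

For example, calling check_hosts(["10.1.1.1"], 6379) on each worker node would show whether that node can reach the Ray head address that appears in the log above.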

The file I’m running,

import argparse
#import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from torchvision import models
import horovod.torch as hvd
import timeit
import numpy as np

# Benchmark settings
parser = argparse.ArgumentParser(description='PyTorch Synthetic Benchmark',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')

parser.add_argument('--model', type=str, default='resnet50',
                    help='model to benchmark')
parser.add_argument('--batch-size', type=int, default=1,
                    help='input batch size')

parser.add_argument('--num-warmup-batches', type=int, default=1,
                    help='number of warm-up batches that don\'t count towards benchmark')
parser.add_argument('--num-batches-per-iter', type=int, default=1,
                    help='number of batches per benchmark iteration')
parser.add_argument('--num-iters', type=int, default=1,
                    help='number of benchmark iterations')

parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')

parser.add_argument('--use-adasum', action='store_true', default=False,
                    help='use adasum algorithm to do reduction')

args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()


def benchmark_step(optimizer, model, data, target):
    optimizer.zero_grad()
    output = model(data)
    loss = F.cross_entropy(output, target)
    loss.backward()
    optimizer.step()


def log(s, nl=True):
    if hvd.rank() != 0:
        return
    print(s, end='\n' if nl else '')


def start_bench():
    hvd.init()

    if args.cuda:
        # Horovod: pin GPU to local rank.
        torch.cuda.set_device(hvd.local_rank())

    #cudnn.benchmark = False

    # Set up standard model.
    model = getattr(models, args.model)()

    # By default, Adasum doesn't need scaling up learning rate.
    lr_scaler = hvd.size() if not args.use_adasum else 1

    if args.cuda:
        # Move model to GPU.
        model.cuda()
        # If using GPU Adasum allreduce, scale learning rate by local_size.
        if args.use_adasum and hvd.nccl_built():
            lr_scaler = hvd.local_size()

    # Create the optimizer only after lr_scaler has its final value.
    optimizer = optim.SGD(model.parameters(), lr=0.01 * lr_scaler)

    # Horovod: (optional) compression algorithm.
    compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(optimizer,
                                         named_parameters=model.named_parameters(),
                                         compression=compression,
                                         op=hvd.Adasum if args.use_adasum else hvd.Average)

    # Horovod: broadcast parameters & optimizer state.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    # Set up fixed fake data
    data = torch.randn(args.batch_size, 3, 224, 224)
    target = torch.LongTensor(args.batch_size).random_() % 1000
    if args.cuda:
        data, target = data.cuda(), target.cuda()

    log('Model: %s' % args.model)
    log('Batch size: %d' % args.batch_size)
    device = 'GPU' if args.cuda else 'CPU'
    log('Number of %ss: %d' % (device, hvd.size()))

    # Warm-up
    log('Running warmup...')
    timeit.timeit(lambda: benchmark_step(optimizer, model, data, target), number=args.num_warmup_batches)

    # Benchmark
    log('Running benchmark...')
    img_secs = []
    for x in range(args.num_iters):
        time = timeit.timeit(lambda: benchmark_step(optimizer, model, data, target), number=args.num_batches_per_iter)
        img_sec = args.batch_size * args.num_batches_per_iter / time
        log('Iter #%d: %.1f img/sec per %s' % (x, img_sec, device))
        img_secs.append(img_sec)

    # Results
    img_sec_mean = np.mean(img_secs)
    img_sec_conf = 1.96 * np.std(img_secs)
    log('Img/sec per %s: %.1f +-%.1f' % (device, img_sec_mean, img_sec_conf))
    log('Total img/sec on %d %s(s): %.1f +-%.1f' %
        (hvd.size(), device, hvd.size() * img_sec_mean, hvd.size() * img_sec_conf))

if __name__ == '__main__':
    from horovod.ray import RayExecutor
    import ray

    ray.init()

    num_hosts=3
    num_workers_per_host=1

    settings=RayExecutor.create_settings(ssh_identity_file="./hostfile")
    executor = RayExecutor(
        settings,
        #num_hosts=num_hosts,
        #num_workers_per_host=num_workers_per_host,
        gpus_per_worker=1,
        num_workers=2,
        use_gpu=True)

    executor.start()
    executor.run(start_bench)
    executor.shutdown()

I’m not sure where I’m going wrong here. I’d really appreciate any help identifying anything obvious that I may be missing.