I have posted the following issue on the Horovod GitHub page, but I also wanted to share it here to see whether I can get any useful information. I’m also not sure whether what I’m encountering is actually a bug, so any information that helps clear that up is appreciated.
Here’s the issue:
Environment:
- Framework: PyTorch
- Framework version: 1.5.0
- Horovod version: 0.23.0
- MPI version: MVAPICH2 2.3.7/mpi4py 3.1.4
- CUDA version: 11.2
- NCCL version: N/A
- Python version: 3.7.15
- Spark / PySpark version: N/A
- Ray version: 2.2.0
- OS and version: Centos 7
- GCC version: 9.4.0
- CMake version: 3.22.2
Checklist:
- Did you search issues to find if somebody asked this question before?
Issue #3055 is sort of related
Bug report:
Please describe erroneous behavior you’re observing and steps to reproduce it.
I’m trying to run Horovod + Ray in such a way that I can leverage MPI for training-related communication. Below, I go through the build command I used, the error I got, and finally the program I’m running (which is essentially pytorch_synthetic_benchmark.py adapted to Ray). I’m running the application on a Ray cluster of three nodes: a head node and two worker nodes.
– The build command I used,
HOROVOD_WITH_PYTORCH=1 HOROVOD_GPU_OPERATIONS=MPI HOROVOD_WITH_GLOO=1 HOROVOD_WITHOUT_TENSORFLOW=1 HOROVOD_WITH_MPI=1 CC=$(which mpicc) CXX=$(which mpicxx) pip install -e .
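(Added here for context, not part of the GitHub issue.) After building, I sanity-check which collective backends actually got compiled in, using Horovod’s build-introspection helpers; a minimal check, assuming those helpers behave as documented, looks something like this:
# Sanity check (my addition): confirm which collective backends this
# Horovod build actually includes before launching anything distributed.
import horovod.torch as hvd

print('MPI built: ', hvd.mpi_built())    # expect True, since HOROVOD_WITH_MPI=1
print('Gloo built:', hvd.gloo_built())   # expect True, since HOROVOD_WITH_GLOO=1
print('NCCL built:', hvd.nccl_built())   # expect False, NCCL was not requested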
– The command I’m running,
mpirun_rsh --np 2 --hostfile hostfile python pytorch_synthetic_benchmark.py
cat hostfile
gpu01
gpu02
gpu03
– The output of the command,
2023-02-07 21:53:23,535 INFO worker.py:1352 -- Connecting to existing Ray cluster at address: 10.1.1.1:6379...
2023-02-07 21:53:23,543 INFO worker.py:1538 -- Connected to Ray cluster.
2023-02-07 21:53:24,353 INFO worker.py:1352 -- Connecting to existing Ray cluster at address: 10.1.1.1:6379...
2023-02-07 21:53:24,365 INFO worker.py:1538 -- Connected to Ray cluster.
Traceback (most recent call last):
File "pytorch_synthetic_benchmark.py", line 139, in <module>
executor.start()
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/runner.py", line 320, in start
return self._maybe_call_ray(self.adapter.start, **kwargs_)
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/runner.py", line 419, in _maybe_call_ray
return driver_func(**kwargs)
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/runner.py", line 563, in start
node_workers=node_workers)
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/utils.py", line 72, in detect_nics
settings)
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/driver_service.py", line 59, in _driver_fn
return _run_probe(driver, settings, num_hosts)
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/driver/driver_service.py", line 126, in _run_probe
driver.wait_for_initial_registration(settings.start_timeout)
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/common/service/driver_service.py", line 166, in wait_for_initial_registration
timeout.check_time_out_for('tasks to start')
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/common/util/timeout.py", line 37, in check_time_out_for
self._timeout
Exception: Timed out waiting for tasks to start. Please check connectivity between servers. You may need to increase the --start-timeout parameter if you have too many servers. Timeout after 30 seconds.
2023-02-07 21:53:54,964 ERROR worker.py:401 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::BaseHorovodWorker.execute() (pid=24246, ip=10.1.1.2, repr=<horovod.ray.worker.BaseHorovodWorker object at 0x2b9471081ad0>)
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/ray/driver_service.py", line 11, in execute_task_fn
_task_fn(index, num_hosts, driver_addresses, settings)
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/task_fn.py", line 31, in _task_fn
task.wait_for_initial_registration(settings.start_timeout)
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/common/service/task_service.py", line 253, in wait_for_initial_registration
timeout.check_time_out_for('tasks to start')
File "/home/alattar.2/quentin-horvod-tar/deepinstrospect/horovod/runner/common/util/timeout.py", line 37, in check_time_out_for
self._timeout
Exception: Timed out waiting for tasks to start. Please check connectivity between servers. You may need to increase the --start-timeout parameter if you have too many servers. Timeout after 30 seconds.
[gpu01.cluster:mpispawn_0][child_handler] MPI process (rank: 0, pid: 25620) exited with status 1
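One note on the timeout itself (my reading of the API, not something from the issue): the --start-timeout flag mentioned in the exception belongs to horovodrun, and with the Ray API the corresponding knob appears to be the timeout_s argument of RayExecutor.create_settings. A sketch of raising it, under that assumption:
from horovod.ray import RayExecutor

# Sketch only (assumes timeout_s maps to the start_timeout used in
# wait_for_initial_registration): give workers longer than the default
# 30 seconds reported in the exception to register with the driver.
settings = RayExecutor.create_settings(timeout_s=300,
                                       ssh_identity_file="./hostfile")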
– The file I’m running,
import argparse
#import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from torchvision import models
import horovod.torch as hvd
import timeit
import numpy as np
# Benchmark settings
parser = argparse.ArgumentParser(description='PyTorch Synthetic Benchmark',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')
parser.add_argument('--model', type=str, default='resnet50',
                    help='model to benchmark')
parser.add_argument('--batch-size', type=int, default=1,
                    help='input batch size')
parser.add_argument('--num-warmup-batches', type=int, default=1,
                    help='number of warm-up batches that don\'t count towards benchmark')
parser.add_argument('--num-batches-per-iter', type=int, default=1,
                    help='number of batches per benchmark iteration')
parser.add_argument('--num-iters', type=int, default=1,
                    help='number of benchmark iterations')
parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--use-adasum', action='store_true', default=False,
                    help='use adasum algorithm to do reduction')
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
def benchmark_step(optimizer, model, data, target):
    optimizer.zero_grad()
    output = model(data)
    loss = F.cross_entropy(output, target)
    loss.backward()
    optimizer.step()


def log(s, nl=True):
    if hvd.rank() != 0:
        return
    print(s, end='\n' if nl else '')


def start_bench():
    hvd.init()

    if args.cuda:
        # Horovod: pin GPU to local rank.
        torch.cuda.set_device(hvd.local_rank())
        #cudnn.benchmark = False

    # Set up standard model.
    model = getattr(models, args.model)()

    # By default, Adasum doesn't need scaling up learning rate.
    lr_scaler = hvd.size() if not args.use_adasum else 1

    optimizer = optim.SGD(model.parameters(), lr=0.01 * lr_scaler)

    if args.cuda:
        # Move model to GPU.
        model.cuda()
        # If using GPU Adasum allreduce, scale learning rate by local_size.
        if args.use_adasum and hvd.nccl_built():
            lr_scaler = hvd.local_size()

    # Horovod: (optional) compression algorithm.
    compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

    # Horovod: wrap optimizer with DistributedOptimizer.
    optimizer = hvd.DistributedOptimizer(optimizer,
                                         named_parameters=model.named_parameters(),
                                         compression=compression,
                                         op=hvd.Adasum if args.use_adasum else hvd.Average)

    # Horovod: broadcast parameters & optimizer state.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)

    # Set up fixed fake data
    data = torch.randn(args.batch_size, 3, 224, 224)
    target = torch.LongTensor(args.batch_size).random_() % 1000
    if args.cuda:
        data, target = data.cuda(), target.cuda()

    log('Model: %s' % args.model)
    log('Batch size: %d' % args.batch_size)
    device = 'GPU' if args.cuda else 'CPU'
    log('Number of %ss: %d' % (device, hvd.size()))

    # Warm-up
    log('Running warmup...')
    timeit.timeit(lambda: benchmark_step(optimizer, model, data, target), number=args.num_warmup_batches)

    # Benchmark
    log('Running benchmark...')
    img_secs = []
    for x in range(args.num_iters):
        time = timeit.timeit(lambda: benchmark_step(optimizer, model, data, target), number=args.num_batches_per_iter)
        img_sec = args.batch_size * args.num_batches_per_iter / time
        log('Iter #%d: %.1f img/sec per %s' % (x, img_sec, device))
        img_secs.append(img_sec)

    # Results
    img_sec_mean = np.mean(img_secs)
    img_sec_conf = 1.96 * np.std(img_secs)
    log('Img/sec per %s: %.1f +-%.1f' % (device, img_sec_mean, img_sec_conf))
    log('Total img/sec on %d %s(s): %.1f +-%.1f' %
        (hvd.size(), device, hvd.size() * img_sec_mean, hvd.size() * img_sec_conf))
if __name__ == '__main__':
    from horovod.ray import RayExecutor
    import ray

    ray.init()
    num_hosts = 3
    num_workers_per_host = 1
    settings = RayExecutor.create_settings(ssh_identity_file="./hostfile")
    executor = RayExecutor(
        settings,
        #num_hosts=num_hosts,
        #num_workers_per_host=num_workers_per_host,
        gpus_per_worker=1,
        num_workers=2,
        use_gpu=True)
    executor.start()
    executor.run(start_bench)
    executor.shutdown()
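For completeness, the commented-out num_hosts / num_workers_per_host arguments above are the host-based way of sizing the executor; if I understand the RayExecutor API correctly, that variant would look roughly like this (untested sketch):
    # Untested sketch of the host-based sizing hinted at by the commented-out
    # arguments above: one GPU worker on each of the three Ray nodes, instead
    # of asking for a flat num_workers=2.
    executor = RayExecutor(
        settings,
        num_hosts=3,              # head node + two worker nodes in the Ray cluster
        num_workers_per_host=1,   # one GPU worker per host
        gpus_per_worker=1,
        use_gpu=True)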
I’m not sure where I’m going wrong here. I would highly appreciate any help identifying anything obvious that I may be missing.