TorchTrainer: Collective operation timeout: WorkNCCL

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

Hi Team,

The Ray Train job fails after all epochs have completed (Hugging Face trainable).
If load_best_model_at_end: true is passed in the training arguments, the error below occurs; with load_best_model_at_end: false the job succeeds.
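
For context, the relevant part of the training arguments looks roughly like this (a simplified sketch; everything except load_best_model_at_end is illustrative, the real values come from our job config):

from transformers import TrainingArguments

# Illustrative values only -- the actual job builds these from its own config.
training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    evaluation_strategy="epoch",       # a "best" checkpoint requires evaluation
    save_strategy="epoch",             # must match evaluation_strategy for load_best_model_at_end
    load_best_model_at_end=True,       # job fails with True, succeeds with False
    metric_for_best_model="eval_loss",
)
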
Error log:

2023-07-12 19:04:22,276	ERROR trial_runner.py:1450 -- Trial TorchTrainer_ea2da_00000: Error happened when processing _ExecutorEventType.TRAINING_RESULT.
ray.exceptions.RayTaskError(RuntimeError): ray::_Inner.train() (pid=442, ip=100.96.180.94, repr=TorchTrainer)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/ray/tune/trainable/trainable.py", line 384, in train
    raise skipped from exception_cause(skipped)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 54, in check_for_failure
    ray.get(object_ref)
ray.exceptions.RayTaskError(RuntimeError): ray::_RayTrainWorker__execute.get_next() (pid=555, ip=100.96.180.94, repr=<ray.train._internal.worker_group.RayTrainWorker object at 0x7efcf541aa10>)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/ray/train/_internal/worker_group.py", line 31, in __execute
    raise skipped from exception_cause(skipped)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/ray/train/_internal/utils.py", line 129, in discard_return_wrapper
    train_func(*args, **kwargs)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/ray_wrappers/utils/convert_to_ray_trainable.py", line 95, in trainable_fn
    entrypoint_module.main(cli_args)
  File "/tmp/ray/session_2023-07-12_18-13-29_686159_25/runtime_resources/working_dir_files/_ray_pkg_c58f328d772e73e3/train/deberta/deberta_irc_comments.py", line 304, in main
    train_result = trainer.train(resume_from_checkpoint=checkpoint)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/transformers/trainer.py", line 1662, in train
    return inner_training_loop(
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/transformers/trainer.py", line 2049, in _inner_training_loop
    self._load_best_model()
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/transformers/trainer.py", line 2184, in _load_best_model
    deepspeed_engine, optimizer, lr_scheduler = deepspeed_init(
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/transformers/deepspeed.py", line 378, in deepspeed_init
    deepspeed_engine, optimizer, _, lr_scheduler = deepspeed.initialize(**kwargs)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/deepspeed/__init__.py", line 165, in initialize
    engine = DeepSpeedEngine(args=args,
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/deepspeed/runtime/engine.py", line 266, in __init__
    self._configure_distributed_model(model)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/deepspeed/runtime/engine.py", line 1067, in _configure_distributed_model
    self._broadcast_model()
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/deepspeed/runtime/engine.py", line 997, in _broadcast_model
    dist.broadcast(p, groups._get_broadcast_src_rank(), group=self.data_parallel_group)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/deepspeed/comm/comm.py", line 120, in log_wrapper
    return func(*args, **kwargs)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/deepspeed/comm/comm.py", line 217, in broadcast
    return cdb.broadcast(tensor=tensor, src=src, group=group, async_op=async_op)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/deepspeed/comm/torch.py", line 80, in broadcast
    return torch.distributed.broadcast(tensor=tensor, src=src, group=group, async_op=async_op)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 1436, in wrapper
    return func(*args, **kwargs)
  File "/home/jobuser/build/environments/satellites/python/lib/python3.10/site-packages/torch/distributed/distributed_c10d.py", line 1559, in broadcast
    work.wait()
RuntimeError: [Rank 0] Caught collective operation timeout: WorkNCCL(SeqNum=424071, OpType=BROADCAST, TensorShape=[128100, 768], Timeout(ms)=1800000) ran for 1800681 milliseconds before timing out.
(TorchTrainer pid=442, ip=100.96.180.94) 23/07/12 19:04:26 WARN avro17.Avro17Adapter: you are using an older version of avro 1.7. please consider upgrading to latest 1.7.*
(RayTrainWorker pid=560, ip=100.96.180.94) [WARNING|trainer.py:2235] 2023-07-12 18:34:21,459 >> Could not locate the best model at ./checkpoint-2500/pytorch_model.bin, if you are running a distributed training on multiple nodes, you should activate `--save_on_each_node`. [repeated 4x across cluster]
2023-07-12 19:04:38,435	ERROR tune.py:941 -- Trials did not complete: [TorchTrainer_ea2da_00000]
12-Jul-23 19:04:38 [Worker ID: DRIVER] [INFO] Syncing logs from /tmp/ray/session_latest/logs/ for detailed logging at the ray head and worker level
(TorchTrainer pid=442, ip=100.96.180.94) 2023-07-12 19:04:38,431	WARNING util.py:244 -- Uploading trial artifacts took 15.405 s, which may be a performance bottleneck. Consider saving fewer/smaller artifacts to the trial log directory, or disable artifact syncing with `SyncConfig(sync_artifacts=False)`.
23/07/12 19:04:39 WARN avro17.Avro17Adapter: you are using an older version of avro 1.7. please consider upgrading to latest 1.7.*
12-Jul-23 19:04:58 [Worker ID: DRIVER] [INFO] Final Sync with HDFS completed!
(The driver then re-raises the same RayTaskError traceback, ending in the identical NCCL collective operation timeout.)

I have configured TorchConfig as shown below to increase the timeout, but according to the logs the timeout value does not change (the error still reports Timeout(ms)=1800000).

from ray.train.torch import TorchTrainer, TorchConfig

trainer = TorchTrainer(
    train_loop_per_worker=ray_job_args["train_loop_per_worker"],
    train_loop_config=ray_job_args["train_loop_config"],
    scaling_config=ray_job_args["scaling_config"],
    run_config=ray_job_args["run_config"],
    # Intended to raise the process group timeout from the default 30 minutes
    # to 24000 s, but the NCCL error still reports Timeout(ms)=1800000.
    torch_config=TorchConfig(timeout_s=24000),
)

Can you suggest whether there is a global environment variable I can set for the NCCL collective timeout, or some other place to configure it? This is the error in question:

RuntimeError: [Rank 0] Caught collective operation timeout: WorkNCCL(SeqNum=424071, OpType=BROADCAST, TensorShape=[128100, 768], Timeout(ms)=1800000) ran for 1800681 milliseconds before timing out.
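
To make the question concrete, here is the kind of thing I was hoping for (both options below are guesses on my side; I have not verified either of them with Ray Train + DeepSpeed, and the output_dir value is just a placeholder):

from datetime import timedelta
import deepspeed
from transformers import TrainingArguments

# Guess 1: transformers exposes ddp_timeout (in seconds) for the torch.distributed
# process group timeout; I am not sure it reaches the DeepSpeed re-initialization
# triggered by load_best_model_at_end in the version we are on.
training_args = TrainingArguments(output_dir="./results", ddp_timeout=24000)

# Guess 2: initialize DeepSpeed's communication backend with a larger timeout
# early in train_loop_per_worker, before the Trainer initializes it with the default.
deepspeed.init_distributed(dist_backend="nccl", timeout=timedelta(seconds=24000))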

Your input would be highly appreciated, thank you!

Regards,
Vivek

@kai @xwjiang2010, could you please suggest any workaround for this issue?

Hi @saivivek15,

It looks like this issue has been discussed before, e.g. here.

Can you try two things (ideally separately)? First, can you set:

export NCCL_P2P_LEVEL=NVL
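
In a Ray Train job you will want that variable to reach every training worker, not just the driver. One way (a sketch, assuming you call ray.init yourself in the driver script) is to pass it through the runtime environment:

import ray

# Propagate the NCCL setting to all worker processes via the runtime environment.
ray.init(runtime_env={"env_vars": {"NCCL_P2P_LEVEL": "NVL"}})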

Second, it looks like you're using HDFS. One user in that thread reported issues with the HDFS sync on rank 0 taking too long. Can you try saving the checkpoints to a local directory instead? A rough sketch is below.
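
A minimal sketch of what I mean (the paths and run name are placeholders, and depending on your Ray version the argument may be local_dir on ray.air.RunConfig rather than storage_path):

from ray.train import RunConfig  # in older Ray versions: from ray.air import RunConfig

# Write trial outputs and checkpoints to a fast local (or shared) filesystem
# instead of the HDFS URI, so rank 0 is not blocked on the upload.
run_config = RunConfig(
    name="deberta_irc_comments",      # placeholder name
    storage_path="/tmp/ray_results",  # placeholder local path
)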
