Entire Ray cluster dying unexpectedly

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I’ve finally managed to get Ray running on our Slurm cluster, and started about 90 experiments using tune.run(). They ran fine for about 22 hours before the entire Ray cluster crashed. The only error message I can see is in the console output of the head node; there is nothing in the logs on either the head node or the worker that seems to have caused this. Any idea what’s going on?

The only thing I could find that might be related is this GitHub issue, but even that might not be the same error: [2.0rc1][nightly-test] long_running_distributed_pytorch_pbt_failure failed · Issue #27709 · ray-project/ray · GitHub

Traceback (most recent call last):
  File ".../main.py", line 499, in <module>
    main(args, args.num_cpus, group=args.experiment_group, name=args.experiment_name, ray_local_mode=args.ray_local_mode)
  File ".../main.py", line 475, in main
    tune.run(experiments, callbacks=callbacks, raise_on_failed_trial=False)
  File ".../lib/python3.9/site-packages/ray/tune/tune.py", line 427, in run
    return ray.get(remote_future)
  File ".../lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 104, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File ".../lib/python3.9/site-packages/ray/util/client/api.py", line 42, in get
    return self.worker.get(vals, timeout=timeout)
  File ".../lib/python3.9/site-packages/ray/util/client/worker.py", line 434, in get
    res = self._get(to_get, op_timeout)
  File ".../lib/python3.9/site-packages/ray/util/client/worker.py", line 462, in _get
    raise err
types.RayTaskError(TuneError): ray::run() (pid=42004, ip=10.31.143.135)
  File ".../lib/python3.9/site-packages/ray/tune/execution/trial_runner.py", line 964, in _on_training_result
    self._process_trial_results(trial, result)
  File ".../lib/python3.9/site-packages/ray/tune/execution/trial_runner.py", line 1048, in _process_trial_results
    decision = self._process_trial_result(trial, result)
  File ".../lib/python3.9/site-packages/ray/tune/execution/trial_runner.py", line 1103, in _process_trial_result
    self._callbacks.on_trial_result(
  File ".../lib/python3.9/site-packages/ray/tune/callback.py", line 329, in on_trial_result
    callback.on_trial_result(**info)
  File ".../lib/python3.9/site-packages/ray/tune/syncer.py", line 529, in on_trial_result
    self._sync_trial_dir(trial, force=False, wait=False)
  File ".../lib/python3.9/site-packages/ray/tune/syncer.py", line 494, in _sync_trial_dir
    sync_process.wait()
  File ".../lib/python3.9/site-packages/ray/tune/syncer.py", line 127, in wait
    raise exception
  File ".../lib/python3.9/site-packages/ray/tune/syncer.py", line 108, in entrypoint
    result = self._fn(*args, **kwargs)
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 64, in sync_dir_between_nodes
    return _sync_dir_between_different_nodes(
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 176, in _sync_dir_between_different_nodes
    return ray.get(unpack_future)
ray.exceptions.RayTaskError: ray::_unpack_from_actor() (pid=256724, ip=10.31.143.135)
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 393, in _unpack_from_actor
    for buffer in _iter_remote(pack_actor):
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 354, in _iter_remote
    buffer = ray.get(actor.next.remote())
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::_PackActor.__init__() (pid=243457, ip=10.31.141.53, repr=<ray.tune.utils.file_transfer._PackActor object at 0x2bb728e38c70>)
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 314, in __init__
    self.stream = _pack_dir(source_dir=source_dir, files_stats=files_stats)
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 278, in _pack_dir
    tar.add(os.path.join(source_dir, key), arcname=key)
  File ".../lib/python3.9/tarfile.py", line 1988, in add
    self.addfile(tarinfo, f)
  File ".../lib/python3.9/tarfile.py", line 2016, in addfile
    copyfileobj(fileobj, self.fileobj, tarinfo.size, bufsize=bufsize)
  File ".../lib/python3.9/tarfile.py", line 249, in copyfileobj
    raise exception("unexpected end of data")
OSError: unexpected end of data

During handling of the above exception, another exception occurred:

ray::run() (pid=42004, ip=10.31.143.135)
  File ".../lib/python3.9/site-packages/ray/tune/tune.py", line 722, in run
    runner.step()
  File ".../lib/python3.9/site-packages/ray/tune/execution/trial_runner.py", line 872, in step
    self._wait_and_handle_event(next_trial)
  File ".../lib/python3.9/site-packages/ray/tune/execution/trial_runner.py", line 851, in _wait_and_handle_event
    raise TuneError(traceback.format_exc())
ray.tune.error.TuneError: Traceback (most recent call last):
  File ".../lib/python3.9/site-packages/ray/tune/execution/trial_runner.py", line 839, in _wait_and_handle_event
    self._on_training_result(
  File ".../lib/python3.9/site-packages/ray/tune/execution/trial_runner.py", line 964, in _on_training_result
    self._process_trial_results(trial, result)
  File ".../lib/python3.9/site-packages/ray/tune/execution/trial_runner.py", line 1048, in _process_trial_results
    decision = self._process_trial_result(trial, result)
  File ".../lib/python3.9/site-packages/ray/tune/execution/trial_runner.py", line 1103, in _process_trial_result
    self._callbacks.on_trial_result(
  File ".../lib/python3.9/site-packages/ray/tune/callback.py", line 329, in on_trial_result
    callback.on_trial_result(**info)
  File ".../lib/python3.9/site-packages/ray/tune/syncer.py", line 529, in on_trial_result
    self._sync_trial_dir(trial, force=False, wait=False)
  File ".../lib/python3.9/site-packages/ray/tune/syncer.py", line 494, in _sync_trial_dir
    sync_process.wait()
  File ".../lib/python3.9/site-packages/ray/tune/syncer.py", line 127, in wait
    raise exception
  File ".../lib/python3.9/site-packages/ray/tune/syncer.py", line 108, in entrypoint
    result = self._fn(*args, **kwargs)
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 64, in sync_dir_between_nodes
    return _sync_dir_between_different_nodes(
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 176, in _sync_dir_between_different_nodes
    return ray.get(unpack_future)
ray.exceptions.RayTaskError: ray::_unpack_from_actor() (pid=256724, ip=10.31.143.135)
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 393, in _unpack_from_actor
    for buffer in _iter_remote(pack_actor):
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 354, in _iter_remote
    buffer = ray.get(actor.next.remote())
ray.exceptions.RayActorError: The actor died because of an error raised in its creation task, ray::_PackActor.__init__() (pid=243457, ip=10.31.141.53, repr=<ray.tune.utils.file_transfer._PackActor object at 0x2bb728e38c70>)
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 314, in __init__
    self.stream = _pack_dir(source_dir=source_dir, files_stats=files_stats)
  File ".../lib/python3.9/site-packages/ray/tune/utils/file_transfer.py", line 278, in _pack_dir
    tar.add(os.path.join(source_dir, key), arcname=key)
  File ".../lib/python3.9/tarfile.py", line 1988, in add
    self.addfile(tarinfo, f)
  File ".../lib/python3.9/tarfile.py", line 2016, in addfile
    copyfileobj(fileobj, self.fileobj, tarinfo.size, bufsize=bufsize)
  File ".../lib/python3.9/tarfile.py", line 249, in copyfileobj
    raise exception("unexpected end of data")
OSError: unexpected end of data

Hm, interesting. Is there anything that could cause the tarfile to be truncated, e.g. a full disk?

cc @bveeramani, have you seen this error before in tune?

I doubt the disk is full, though it could be that slurm is imposing some kind of limit somewhere. Where exactly would that tarfile be saved on disk? Or generally, where does Ray store temporary files? The tarfile is not one I’m intentionally creating, I’m just passing RLlib trainables to tune.run.

Also, I just had another run crash with the same error message, this time after just a couple of hours.

I’ve asked the Tune team to chime in, but judging by _sync_dir_between_different_nodes in the stack trace, I’m guessing it’s using tarfiles to sync a directory across nodes.

Re “I just had another run crash with the same error message, this time after just a couple of hours”:

That is at least good news: we may not have to wait a full day to get a chance to see this again :slight_smile:

From reading ray/file_transfer.py at b0edf21a0cf83b4cd6c1876ed69ead1b0787adc5 · ray-project/ray · GitHub, looks like Tune is creating an actor on the sending node and receiving node, then streaming bytes which are then unpacked via tar.

Notably, the bytes never touch disk on the destination node until after they’re unpacked by tar. So not likely a disk-space issue on the target node.
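To make the pack/stream/unpack idea above concrete, here is a minimal standard-library sketch. It is not Ray's file_transfer.py code: pack_dir, iter_chunks and unpack_chunks are hypothetical names, and there are no actors involved, only the same general pattern of building a tar archive in memory on the source side, passing it along in chunks, and extracting it on the destination side.

```python
import io
import os
import tarfile


def pack_dir(source_dir: str) -> bytes:
    """Pack every regular file under source_dir into an in-memory tar archive."""
    stream = io.BytesIO()
    with tarfile.open(fileobj=stream, mode="w") as tar:
        for root, _dirs, files in os.walk(source_dir):
            for name in files:
                full_path = os.path.join(root, name)
                # Store paths relative to source_dir so they unpack cleanly.
                tar.add(full_path, arcname=os.path.relpath(full_path, source_dir))
    return stream.getvalue()


def iter_chunks(data: bytes, chunk_size: int = 64 * 1024):
    """Yield the archive in fixed-size chunks, as a stand-in for network transfer."""
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]


def unpack_chunks(chunks, target_dir: str) -> None:
    """Reassemble the chunks in memory, then extract them into target_dir."""
    stream = io.BytesIO()
    for chunk in chunks:
        stream.write(chunk)
    stream.seek(0)
    os.makedirs(target_dir, exist_ok=True)
    with tarfile.open(fileobj=stream, mode="r") as tar:
        tar.extractall(target_dir)
```

In this sketch the only disk reads happen in pack_dir on the source side, which is where the traceback above fails (inside tar.add), and the only disk writes happen in extractall on the destination side.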

Can we narrow down exactly what files are being transferred? I’m afraid of the case where the directory is changed during the tar process on the source node.
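For what it's worth, the "directory changed during the tar process" case can be reproduced with the standard library alone. tarfile records a file's size in the header from a stat call and then reads exactly that many bytes; if the file shrinks in between, it raises the same OSError seen in the traceback. The sketch below splits tar.add into its two steps (gettarinfo, then addfile) so the file can be shrunk in the middle; the function and file names are made up for illustration, and it only shows that a shrinking file is sufficient to cause this error, not that it is what happened here.

```python
import io
import os
import tarfile
import tempfile


def tar_error_when_file_shrinks() -> str:
    """Return the message tarfile raises when a file shrinks after its header is built."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "result.json")
        with open(path, "wb") as f:
            f.write(b"x" * 1000)

        with tarfile.open(fileobj=io.BytesIO(), mode="w") as tar:
            # Step 1 of tar.add: build the header, which records size=1000.
            tarinfo = tar.gettarinfo(path, arcname="result.json")

            # Simulate another process rewriting the file with less data.
            with open(path, "wb") as f:
                f.write(b"x" * 10)

            try:
                # Step 2 of tar.add: copy tarinfo.size bytes from the file.
                with open(path, "rb") as f:
                    tar.addfile(tarinfo, f)
            except OSError as exc:
                return str(exc)
    return "no error"


print(tar_error_when_file_shrinks())
```

A file that only grows during archiving does not trigger this error, since tarfile stops after reading the recorded number of bytes; only truncation or replacement with a shorter file does.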

I’m re-running this now and logging df -h before and after I call the main script; if it is indeed a disk space issue, that might tell us where.
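If it is easier to log this from inside the Python script than from the Slurm job script, the same information as df -h is available through shutil.disk_usage. A small sketch follows; disk_usage_report is a hypothetical helper, and the paths in the usage line are assumptions (/tmp/ray is Ray's default temp directory unless --temp-dir was passed to ray start, and the results directory is whatever local_dir points to).

```python
import shutil


def disk_usage_report(paths):
    """Return total/used/free space in GiB for each path that exists."""
    gib = 1024 ** 3
    report = {}
    for path in paths:
        try:
            usage = shutil.disk_usage(path)
        except OSError:
            # Skip paths that do not exist on this node.
            continue
        report[path] = {
            "total_gib": round(usage.total / gib, 2),
            "used_gib": round(usage.used / gib, 2),
            "free_gib": round(usage.free / gib, 2),
        }
    return report


# Example call; both paths are assumptions, adjust to your setup.
print(disk_usage_report(["/tmp/ray", "/tmp"]))
```

Note that this only reports the node it runs on, so to cover the worker nodes it would have to be called from a task or trainable running there.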

Anything I can do to help figure that out?

Also, all the nodes have access to a shared filesystem. Is it worth a try passing that to ray start with --storage?

Hmm I don’t think the --storage flag will help, since the syncing code is checking by IP. Could be wrong, though.

Okay, looks like there’s a dedicated Tune callback for this behavior: SyncerCallback ray/syncer.py at b0edf21a0cf83b4cd6c1876ed69ead1b0787adc5 · ray-project/ray · GitHub

Can you share your code so we can see why this codepath is being triggered / if it’s expected? It looks like it’s syncing the trial results back from worker nodes to the head node. With the code we can confirm this and narrow down why it’s failing to sync (or you can turn it off if you don’t need it).

OK, I’ve tried to clean up our code to narrow it down to only the relevant (but hopefully all the relevant) parts. I’ve removed lots of CLI arg parsing code etc., but I can share that too if the below isn’t clearing things up.

Basically what we do is:

  1. We register a custom RL environment.
  2. We set up a WandbLoggerCallback for tune.
  3. We create a list of Experiment objects.
  4. We pass that list of experiments to tune.run().

One thing to note is that we enable checkpointing in the experiments. In this particular context I could disable that as a temporary workaround. But in general we do checkpoints for some experiments where we want to use trained weights for other things downstream. We don’t necessarily need the checkpoints synced to the head node though, it’s all a shared filesystem anyway.

I suspect it could be either the checkpointing or the wandb logger callback that might trigger the syncer callback?

import copy
import os
from uuid import uuid4

import numpy as np
import ray

from ray import tune
from ray.air.callbacks.wandb import WandbLoggerCallback

from my_project import env_creator, get_trainer_and_config, parse_cli_args


args = parse_cli_args()

# Init ray
ray.init(address=args.ray_address, include_dashboard=False)

# Register custom RLlib env
tune.register_env("my_env", lambda config: env_creator(config))

# Set local folder
curr_folder = os.path.dirname(os.path.realpath(__file__))
local_dir = curr_folder + "/ray_results/" + uuid4().hex + "/"

# Set up Weights And Biases logging if API key is set in environment variable.
if "WANDB_API_KEY" in os.environ:
    callbacks = [
        WandbLoggerCallback(
            project=args.project,
            api_key=os.environ["WANDB_API_KEY"],
            log_config=True,
            resume=False,
            group=args.group,
            entity="harvardparkesateams",
        )
    ]
else:
    callbacks = []
    print("No wandb API key specified, running without wandb logging.")


# Set up list of experiments
experiments = []
for config_str in args.config_list:
    trainer, config, stop = get_trainer_and_config(config_str)
    for i in range(args.num_seeds):
        this_config = copy.deepcopy(config)
        this_config["seed"] = i
        exp = tune.Experiment(
            f"exp_{config_str}_{i}",
            run=trainer,
            config=this_config,
            stop=stop,
            local_dir=local_dir,
            checkpoint_at_end=True,
            checkpoint_freq=25,
            max_failures=3,
        )
        experiments.append(exp)

# Run experiments
tune.run(experiments, callbacks=callbacks, raise_on_failed_trial=False)

# Shut down ray
ray.shutdown()

Thanks for sharing the code, this makes it easier to think about. I am not an expert on Tune but this usage looks totally fine to me.

I mean, I don’t think the long-term answer is to disable syncing here. You can do that as a workaround, as you say. I don’t have a lot of time today to work on this unfortunately, but I think the next step is to dump more information when this error happens. So, write an exception handler for this particular OSError around the syncer actor and, when it occurs, copy the directory-to-be-copied and other important details (like the list of files the actor is copying) to somewhere safe for manual inspection. Then we can run the syncer actor manually on that directory and discover whether there is a bug in the actor code due to the contents of the directory, or whether the directory changed during streaming.
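A rough sketch of what such a handler could look like, using only the standard library. This is not Ray code: pack_dir_with_snapshot and debug_root are hypothetical, and where exactly to hook this into Tune's packing step is left open. The idea is to record each file's size and mtime before packing and, if tarfile raises an OSError, to copy the directory and that manifest somewhere safe before re-raising.

```python
import io
import json
import os
import shutil
import tarfile
import time


def pack_dir_with_snapshot(source_dir: str, debug_root: str) -> bytes:
    """Pack source_dir into tar bytes; on OSError, save a snapshot and re-raise."""
    # Record what the directory looked like just before packing.
    manifest = {}
    for root, _dirs, files in os.walk(source_dir):
        for name in files:
            full_path = os.path.join(root, name)
            stat = os.stat(full_path)
            rel_path = os.path.relpath(full_path, source_dir)
            manifest[rel_path] = {"size": stat.st_size, "mtime": stat.st_mtime}

    stream = io.BytesIO()
    try:
        with tarfile.open(fileobj=stream, mode="w") as tar:
            for rel_path in manifest:
                tar.add(os.path.join(source_dir, rel_path), arcname=rel_path)
    except OSError as exc:
        # Keep the evidence: the directory as it is now, plus the earlier manifest.
        dump_dir = os.path.join(debug_root, f"sync_failure_{int(time.time())}")
        shutil.copytree(source_dir, os.path.join(dump_dir, "snapshot"))
        with open(os.path.join(dump_dir, "manifest.json"), "w") as f:
            json.dump({"error": repr(exc), "files_before": manifest}, f, indent=2)
        raise
    return stream.getvalue()
```

Comparing the sizes in manifest.json against the files in the snapshot would show directly whether a file shrank or disappeared between the listing and the read. The snapshot copy can itself fail if the directory is still changing, so a real handler may want to guard that step as well.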