The pending tasks/actors remain on Ray Cluster when the driver die unexpected

Hi there,

I summited a ray job via a python script on the terminal. I know if I press ctrl+c to kill the process then the pending tasks/actors will be removed on the Ray cluster(Killing driver does not kill tasks in Ray on minikube - #8 by Alex).

However, if the python program exit unexpectedly then the remaining pending tasks/actors will live on the Ray cluster forever. This issue will also cause the Ray cluster never scale down.

Example code as shown

from time import sleep
import ray


@ray.remote
def go():
    sleep(10)
    1 / 0
    return "OK"


objs = [go.remote() for _ in range(50)]
print(ray.get(objs))

I also found some discussions on StackOverflow: python - How to kill ray tasks when the driver is dead - Stack Overflow

Hey @Andrew_Li , good question. I am afraid ray is not able to clean up this properly with a dead driver that exited in error.

You might have to reconnect to the cluster and manually clean up the actors/tasks there.

@sangcho Could you confirm this is the case, and do you know if there is anything in ray job tooling that we could deal with such unexpected failure in a cleaner way?

Thanks @rickyyx ,

I would lose every python ObjectRef since the drive exited in error. How can I clean up the pending actors/tasks with ray.cancel() or ray.kill() after reconnecting to the cluster? Do you have any tips?

One more question, is it possible to re-create the ObjectRef by the string? e.g.:

@ray.remote
def go():
    ...


obj = go.remote()

# obj ---> ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)

new_obj = ray.create_objectref_from_string("c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000")

No, you cannot re-create the ObjectRef from a string.

Actually when I look at your example code, why would driver exit unexpectedly? 1 / 0 happens inside a remote function so it will only crash the worker processes.

Do you have a complete repro that I can try out?

I’m a bit late to the party but I’m having a similar issue. How exactly do you clean up tasks when you’ve lost the object_ref? I can use the CLI to discover the the TASK_ID (it’s stuck in scheduled) but as far as I can tell there isn’t a way to kill the task. Am I supposed to manually destroy the cluster whenever I get an orphaned task?

In my case - the tasks failed were orphaned when the following happened:

  1. The autoscaler ran into an Azure resource limit and failed to scale from 0 → n workers
  2. Network connectivity from outside the cluster was interrupted so the external ray submit cluster.yaml test.py disconnected

Ran into the same issue; this is somewhat problematic from a sysadmin point of view. Are the users expected to destroy and bring up the cluster themselves…?

1 Like

They are expected to be cleaned up automatically. If not that’s a bug we should fix.

@idantene @joshv how did you guys verify there was a leak? Is it from the output of ray status?

Actually I could easily reproduce it by

import ray
ray.init()
@ray.remote
def f():
    import time
    time.sleep(300)

refs = [f.remote() for _ in range(30000)]
ray.get(refs)

And ctrl + C → call ray status. Interestingly, when I checked raylet, it doesn’t have any pending requests. So I think it is a reporting problem (but the tasks are actually not leaked).

Let me create an issue and we will prioritize fixing it.

Thanks @sangcho!

My current workaround is to disable any CTRL+C in the code and maintaining a list of tasks that potentially should be cancelled:

def cleanup_ray(task_list):
    """Shuts ray connection down and cancels any outstanding tasks"""
    import signal

    def _noop(signum, frame):
        return

    # Do not allow CTRL+C while cleaning-up
    signal.signal(signal.SIGINT, _noop)

    def _cleanup():
        logger.info("Cleaning up Ray runtime...")
        for task in task_list:
            try:
                ray.cancel(task, force=True)
            except TaskCancelledError:
                continue
        ray.shutdown()
    return _cleanup

# Elsewhere in the code
# Task references are pushed/popped to/from this list as
# they're created and completed
tasks_to_cancel = list()
ray.init(address=f"ray://{RAY_HEAD_NODE}")
atexit.register(cleanup_ray(tasks_to_cancel))

Hi @idantene, as @sangcho mentioned, it should not be necessary to handle ctrl+c or task cleanup yourself, as Ray should do this for you (and if not it’s a bug). It may happen asynchronously, but any tasks or objects should get cleaned up within seconds.

I tried this myself on the script that you provided by killing the driver and confirmed that all of the workers do get cleaned up. But if you have more info on how to reproduce what you’re seeing, that would be great. Thanks!

ps (base) swang@swang-X1-Carbon:~/ray$ ps -ef | grep test.py
swang    1421215 1394970 43 10:14 pts/4    00:00:01 python test.py
swang    1421293 1421224  0 10:14 pts/8    00:00:00 grep --color=auto test.py
(base) swang@swang-X1-Carbon:~/ray$ ps -ef | grep ray::  # These are the workers.
swang    1421363 1421295 33 10:14 pts/4    00:00:01 ray::go()
swang    1421364 1421295 34 10:14 pts/4    00:00:01 ray::go()
swang    1421365 1421295 32 10:14 pts/4    00:00:01 ray::go()
swang    1421366 1421295 33 10:14 pts/4    00:00:01 ray::go()
swang    1421367 1421295 32 10:14 pts/4    00:00:01 ray::go()
swang    1421368 1421295 33 10:14 pts/4    00:00:01 ray::go()
swang    1421369 1421295 35 10:14 pts/4    00:00:01 ray::go()
swang    1421370 1421295 33 10:14 pts/4    00:00:01 ray::go()
swang    1421631 1421224  0 10:14 pts/8    00:00:00 grep --color=auto ray::
(base) swang@swang-X1-Carbon:~/ray$ kill -9 1421215
(base) swang@swang-X1-Carbon:~/ray$ ps -ef | grep ray::
swang    1421635 1421224  0 10:14 pts/8    00:00:00 grep --color=auto ray::

Just re quoting how we discovered the issue during some early testing in case it got missed in the earlier discussion. To expand on the resource limit issue - the specific issue was that we had a hard limit on public IP’s and the autoscaler failed to scale from 0 → N workers.