Handling exceptions from a list of tasks using ray.get

What is the correct way to handle exceptions from a list of tasks executed with ray.get?

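import ray

ray.init()


@ray.remote(max_retries=5)
def f(i):
    try:
        save(i)  # save() writes dataset i to disk (defined elsewhere)
    except Exception:
        raise  # re-raise so the failure propagates to ray.get


x = ray.get([f.remote(i) for i in range(20)])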

Here, any one of the f tasks could raise an exception, but I would like the other tasks to complete.

A use case for this would be saving data to disk. If the save fails for one dataset in one of the task workers, a solution would be to retry the function for that dataset.

What would be the best way to handle this problem? Note that I am not referring to a WorkerCrashedError, although I guess a task that raises an Exception can be thought of as inevitably crashing.

I have tried:

from ray.exceptions import RayError

try:
    x = ray.get([f.remote(i) for i in range(20)])
except RayError as e:
    print(e.pid)

However, I only ever get back the pid of one of the failing tasks, not multiple pids, even if more than one task raises an Exception.

Have you tried using ray.wait? E.g.,

unready = [f.remote(i) for i in range(20)]
while unready:
    # Each call returns at most `num_returns` refs in `ready`
    ready, unready = ray.wait(unready, num_returns=1)
    try:
        ray.get(ready)
    except Exception as e:
        # do whatever you want, e.g. log the error or resubmit the task
        print(e)

I am not sure this would solve the problem.

If an exception occurs, would the task be assigned to unready? If so, this would trigger an infinite loop.

Otherwise, I am unsure how to gracefully catch errors from one or more tasks in a call to ray.get(list_of_tasks).

If an exception occurs, would the task be assigned to unready? If so, this would trigger an infinite loop.

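No. If the object ref contains an exception, it is still considered ready: the task has finished, and the error is stored as the object's content (an implementation detail). The exception is only raised when you call ray.get on that ref.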

Otherwise, I am unsure how to gracefully catch errors from one or more tasks in a call to ray.get(list_of_tasks).

As I mentioned in the code block, setting num_returns=1 ensures that ready never contains more than one object ref, so you can handle each object ref, and any exception it carries, individually.

Wouldn’t this approach defeat the benefits of parallelism?

ray.wait returns the first object ref that becomes ready, and the subsequent ray.get returns right away because ray.wait has already fetched the object locally. This is a standard pattern in Ray. Also note that num_returns is 1 by default, so this is no different from a plain ray.wait call. Which part do you think defeats the benefit of parallelism?

I was thinking num_returns=1

num_returns is 1 by default, so it works the same way as a plain ray.wait call. Note that all tasks are already running in parallel in the background; ray.wait just brings one “ready” object to the local process at a time.