How to identify cause of "Exit signal" killing active worker

I have a Ray task that performs a minute-long computation and then returns a small (~8 MB) result. In my program I split my long list of inputs into chunks and submit them to this Ray task chunk by chunk:

for input_chunk in chunker(all_inputs, chunk_size=100):
    tasks = [fancy_ray_function.remote(item) for item in input_chunk]
    results = ray.get(tasks)

A few chunks in, my script hangs waiting for ray.get() to return a value. Looking at the logs, I’ve found that the task completes successfully, but then an Exit signal removes the worker, which I assume is the cause of the hang:

[2021-09-24 13:52:16,354 I 460 460] core_worker.cc:2332: Finished executing task 25de4c7fa0336f7effffffffffffffffffffffff13000000, status=OK
[2021-09-24 13:52:31,084 I 460 476] core_worker.cc:769: Exit signal received, this process will exit after all outstanding tasks have finished, exit_type=IDLE_EXIT
[2021-09-24 13:52:31,085 I 460 460] core_worker.cc:325: Removed worker 8d36544720137d36b4c848a4ef61ec06e4b155f538bf468019523488
[2021-09-24 13:52:31,088 I 460 460] core_worker.cc:197: Destructing CoreWorkerProcess. pid: 460
[2021-09-24 13:52:31,088 I 460 460] io_service_pool.cc:47: IOServicePool is stopped.

This feels like a race condition, but I can’t tell if it’s an actual bug or something wrong with my use of Ray. How can I determine where this Exit signal is coming from?

Workaround

To work around this intermittent failure, I could set a timeout in ray.get() and resubmit the failing input chunks. However, I’d still like to get to the root of the problem.
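
A minimal sketch of that workaround (the 300-second timeout is an arbitrary choice, and chunker / fancy_ray_function are the helpers from my script above):

from ray.exceptions import GetTimeoutError

results = []
for input_chunk in chunker(all_inputs, chunk_size=100):
    tasks = [fancy_ray_function.remote(item) for item in input_chunk]
    try:
        # Give up on the chunk if the results don't arrive in time.
        results.extend(ray.get(tasks, timeout=300))
    except GetTimeoutError:
        # Resubmit the whole chunk once and wait again.
        tasks = [fancy_ray_function.remote(item) for item in input_chunk]
        results.extend(ray.get(tasks))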

Looking at the Raylet logs:

[2021-09-24 13:52:31,085 I 62 62] node_manager.cc:1194: NodeManager::DisconnectClient, disconnect_type=0, has creation task exception = 0

I suspect the problem is in how I’m submitting and collecting results: perhaps I should submit several chunks’ worth of work together instead of submitting and waiting on one chunk at a time.
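
A rough sketch of that idea (again using the chunker / fancy_ray_function helpers from above): submit every chunk up front and only collect once at the end, so workers are less likely to sit idle between chunks.

all_tasks = []
for input_chunk in chunker(all_inputs, chunk_size=100):
    # Queue up the whole workload before fetching anything.
    all_tasks.extend(fancy_ray_function.remote(item) for item in input_chunk)

results = ray.get(all_tasks)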

[2021-09-24 13:52:31,084 I 460 476] core_worker.cc:769: Exit signal received, this process will exit after all outstanding tasks have finished, exit_type=IDLE_EXIT

This means your worker exited because it was idle for some time (this happens when there are more worker processes than num_cpus). I think your result should still be available even after the worker exits.
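
If you want to convince yourself of that, here is a minimal standalone sketch (the function name and sleep lengths are arbitrary; the second sleep just needs to be longer than the idle-worker timeout on your cluster):

import time
import ray

ray.init()

@ray.remote
def slow_identity(x):
    time.sleep(60)   # stand-in for your minute-long computation
    return x

ref = slow_identity.remote("payload")
time.sleep(120)      # long enough that the worker may be reaped with IDLE_EXIT
print(ray.get(ref))  # the value comes from the object store, not the exited worker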

If you collect the results with ray.wait instead of ray.get, would that solve the problem? For example:

results = []
for input_chunk in chunker(all_inputs, chunk_size=100):
    tasks = [fancy_ray_function.remote(item) for item in input_chunk]
    # Instead of this
    # results = ray.get(tasks)
    unready = tasks
    while unready:
        ready, unready = ray.wait(tasks)
        results.extend(ray.get(ready))

This will help you fetch the results one by one instead of all together, which can stabilize performance (e.g., by avoiding blocking on a single straggling task or fetching all of the large results at once).

A small fix to your code:

results = []
for input_chunk in chunker(all_inputs, chunk_size=100):
    tasks = [fancy_ray_function.remote(item) for item in input_chunk]
    # Instead of this
    # results = ray.get(tasks)
    unready = tasks
    while unready:
        # Wait on the remaining refs, not the original task list,
        # otherwise the loop never makes progress.
        ready, unready = ray.wait(unready)
        results.extend(ray.get(ready))

Solved the problem for me.

Another approach is to use concurrent.futures:

import concurrent.futures
import random
import time

import ray

@ray.remote
def append_a(in_str):
    print(in_str)
    time.sleep(random.uniform(0, 1.5))
    return f"{in_str}a"


ray.init()

with concurrent.futures.ThreadPoolExecutor(max_workers=80) as threader:
    inputs = ["a", "b", "c", "d", "e", "f", "g"]
    outputs = []

    # .remote() is already non-blocking, so the thread pool mostly just maps
    # the inputs; map() returns the ObjectRefs in input order.
    submitted = threader.map(append_a.remote, inputs)

    # ObjectRef.future() converts a Ray ObjectRef into a concurrent.futures.Future,
    # so the standard as_completed() machinery can collect results as they finish.
    futures = {task.future(): inp for task, inp in zip(submitted, inputs)}

    for future in concurrent.futures.as_completed(futures):
        outputs.append((futures[future], future.result()))

print(outputs)