Ray worker nodes won't scale down

How severely does this issue affect your experience of using Ray?
- Medium: AWS is expensive, and idle nodes that aren't killed promptly get pricey.

I am using AWS instances and scaling them for some long-running jobs.
I am running Python 3.10.12 with the latest ray==2.7.

I am able to set up the cluster with ~20 worker nodes and submit 30 tasks. I submit tasks by opening a Jupyter notebook on a GCP instance (an external server, for clarity) and connecting to the Ray cluster via ray.init() with ray://<aws_cluster>:10001.
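
For clarity, the connection from the notebook is basically just this (the head address below is a placeholder):

import ray

# Connect from the external (GCP) notebook to the AWS head node via Ray Client;
# 10001 is the default Ray Client server port.
ray.init("ray://<aws_cluster_head_ip>:10001")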

I submit 30 remote tasks to the Ray cluster.
It takes a few minutes to spawn the 20 workers, but once they are up they pick up 20 tasks and start calculating them (maybe an hour-ish each).

In my Jupyter notebook, I check every minute for finished tasks and download the results.

As the 20 workers finish their first 20 jobs (10 remaining to be calculated), those 10 are picked up and 10 workers keep crunching numbers.

However, the remaining 10 nodes, which now have nothing to do (out of tasks), do not get removed.
idle_timeout_minutes is set to 0.1, so removal should be almost immediate (no value I try makes a difference).
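
For reference, this is roughly where that setting lives in my autoscaler config (values here are illustrative, not my full cluster.yaml):

# cluster.yaml (autoscaler config, abbreviated)
cluster_name: my-cluster
max_workers: 20
idle_timeout_minutes: 0.1  # how long a worker may sit idle before the autoscaler tears it down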

Checking the memory, it seems that there is an ObjectReference holding the results on the node that calculated them, and therefore the node doesn't think it is idle??

Whatever the problem, having 10 nodes idle until ALL tasks are completed and I shut down Ray on the GCP server via ray.shutdown() is 10 nodes x 1 hour == (very pricey).

I have tried setting a TTL for results (doesn't seem to exist?). Googling suggests some ObjectReference counter deletion, but my understanding is that deleting an object reference on the GCP side won't help (wrong server).

The following also doesn't work (since I know which tasks are completed, I can try to cancel them):
ray.cancel(task_ref, force=True)

This issue is driving me nuts. I collect my results immediately; I need the idle nodes shut down immediately.

If we rescale the problem to 5 tasks with 4 workers, I create the following:

Tasks submitted from my GCP server (external, connecting to the AWS cluster via ray.init()):

import time
import ray

## Task submissions
all_tasks = []
for i in range(0, 5):
    task = myfunction.remote(i)
    all_tasks.append(task)

## Wait for tasks and save results
processed_and_saved_tasks = []
while len(processed_and_saved_tasks) < len(all_tasks):
    # timeout=0 makes ray.wait return immediately with whatever is already done
    completed_tasks, still_running_tasks = ray.wait(
        all_tasks, num_returns=len(all_tasks), timeout=0
    )
    for task in completed_tasks:
        if task in processed_and_saved_tasks:
            continue
        try:
            data = ray.get(task)  # loading a job's result is a bit slow... :(
        except Exception as e:
            # if the task raised an exception, it surfaces here
            print(e)
            processed_and_saved_tasks.append(task)  # don't retry a failed task forever
            continue
        save_my_data(data)
        processed_and_saved_tasks.append(task)
    time.sleep(10)
print('Saved and completed!')

After saving, the node is idle, but the object memory on that node is still in use…

Here are the resources/memory stats of the Ray cluster on AWS for a 1-head, 3-worker system after all tasks have completed (ray status output, followed by the object references report from ray memory).

Note that "ray status" puts the usage at 0/4 CPU.

Resources

Usage:
0.0/4.0 CPU
0B/21.10GiB memory
1.34MiB/8.76GiB object_store_memory

Demands:
(no resource demands)

======== Object references status: 2023-11-06 15:29:54.109080 ========
Grouping by node address... Sorting by object size... Display all entries per group...

--- Summary for node address: XXX.XXX.XXX ---
Mem Used by Objects   Local References   Pinned       Used by task   Captured in Objects   Actor Handles
1405714.0 B           6, (1405714.0 B)   0, (0.0 B)   0, (0.0 B)     0, (0.0 B)            0, (0.0 B)

--- Object references for node address: XXX.31.25.63 ---
IP Address     PID    Type    Call Site  Status    Size         Reference Type    Object Ref
XXX.31.25.63   97624  Driver  disabled   FINISHED  1304.0 B     LOCAL_REFERENCE   00ffffffffffffffffffffffffffffffffffffff0500000001e1f505
XXX.31.25.63   97624  Driver  disabled   FINISHED  280882.0 B   LOCAL_REFERENCE   3ca0590f168eaec5ffffffffffffffffffffffff0500000001000000
XXX.31.25.63   97624  Driver  disabled   FINISHED  280882.0 B   LOCAL_REFERENCE   68d7b3a94be6e983ffffffffffffffffffffffff0500000001000000
XXX.31.25.63   97624  Driver  disabled   FINISHED  280882.0 B   LOCAL_REFERENCE   983a6ae24ac09041ffffffffffffffffffffffff0500000001000000
XXX.31.25.63   97624  Driver  disabled   FINISHED  280882.0 B   LOCAL_REFERENCE   4482c0d3e15a41a8ffffffffffffffffffffffff0500000001000000
XXX.31.25.63   97624  Driver  disabled   FINISHED  280882.0 B   LOCAL_REFERENCE   9f79440f8f098da0ffffffffffffffffffffffff0500000001000000

To record callsite information for each ObjectRef created, set env variable RAY_record_ref_creation_sites=1

--- Aggregate object store stats across all nodes ---
Plasma memory usage 2 MiB, 9 objects, 0.02% full, 0.01% needed
Objects consumed by Ray tasks: 6 MiB.

For future peeps:

  1. It is doing object reference counting, even from the GCP-side driver. Deleting ALL references to the submitted tasks does start clearing nodes (see the sketch below).
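
A minimal sketch of the pattern that fixed it for me, reusing the myfunction / save_my_data from the snippet above: as soon as a result is saved, drop every driver-side reference to its ObjectRef (the loop variable and the list entry), so Ray's distributed reference count reaches zero and the worker holding the object can actually go idle.

## Release each reference as soon as its result is saved
pending = [myfunction.remote(i) for i in range(0, 5)]
while pending:
    # Block until at least one task finishes; ray.wait also returns the
    # still-pending refs, so the finished ref drops out of our list.
    done, pending = ray.wait(pending, num_returns=1)
    ref = done[0]
    try:
        save_my_data(ray.get(ref))
    except Exception as e:
        print(e)
    # Drop the last driver-side references to the finished task. The result
    # pinned on the worker can now be freed, and idle_timeout_minutes applies.
    del ref, done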

However, I am curious whether you can specify a setting in the cluster.yaml that puts all returned values onto the HEAD node, allowing the worker node to die while the results are preserved.
Understandably this can cause spillage, but it should still be an allowable setting, to reduce the chance of long-standing idle worker nodes.
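
In the meantime, one workaround that avoids the problem entirely (just an idea, not a cluster.yaml setting): persist the result from inside the task and return only a tiny status object, so no large ObjectRef ever pins a worker's object store. Everything below is illustrative; compute_and_save, do_heavy_work, and upload_result are made-up names, and the /tmp path stands in for real shared storage:

import pickle
import ray

def do_heavy_work(i):                 # stand-in for the real hour-long computation
    return {"input": i, "value": i * i}

def upload_result(result, path):      # stand-in for a real upload (e.g. to S3)
    with open(path, "wb") as f:
        pickle.dump(result, f)

@ray.remote
def compute_and_save(i):
    result = do_heavy_work(i)
    # Persist from inside the task, so the big result never has to live in
    # the worker's object store after the task returns.
    upload_result(result, f"/tmp/result_{i}.pkl")  # use shared/remote storage in practice
    return {"task": i, "status": "ok"}             # tiny return value

ray.init()  # or ray.init("ray://<head>:10001") against the cluster
print(ray.get([compute_and_save.remote(i) for i in range(5)]))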

Hi @bopbopbopbopbop, objects have metadata and the physical data; either one might prevent the node from being removed.

One way to handle this might be to retrieve the object and then remove the object ref in the place where it's used.

More effort is being put into making nodes able to scale down even when there are still tasks on them.