Out of disk space with fallocate

How severely does this issue affect your experience of using Ray?

  • High: It blocks me from completing my task.

I use Ray 1.11.0. I start a head node with `ray start --node-ip-address=$ip_prefix --block --head --port=6379 --num-cpus=0` and two worker nodes with `ray start --block --address=$ip_head --num-cpus=32`. Each node has 100 GB of memory.

I am trying to run a script that uses Ray to implement a reduce operation. `size` is 128*1024*1024 and `rank_num` is 64.
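For reference, a quick size check with these arguments (assuming the default float64 dtype of np.ones, which the script relies on):

import numpy as np

size = 128 * 1024 * 1024
rank_num = 64
data = np.ones(size)
print(data.nbytes)                     # 1073741824 bytes = 1 GiB per array
print(data.nbytes * rank_num / 2**30)  # roughly 64 GiB if each of the 64 actors ends up with its own copy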

import ray
import numpy as np
import time
import sys


@ray.remote(num_cpus=1)
class s:
    def __init__(self, rank):
        self.rank = rank

    def set(self, data):
        # Store the data on this actor and return how long the assignment took.
        start = time.perf_counter()
        self.data = data
        return time.perf_counter() - start

    def get(self):
        return self.data

    def reduce_sum(self, rank, obj_handler):
        # Only the actor whose rank matches performs the reduce; the others return None.
        if rank == self.rank:
            r = 0  # time spent submitting get.remote()
            g = 0  # time spent in ray.get()
            a = 0  # time spent adding (the add itself is commented out below)
            start = time.perf_counter()
            for o in range(1, len(obj_handler)):
                remote = time.perf_counter()
                f = obj_handler[o].get.remote()
                get = time.perf_counter()
                ff = ray.get(f)
                add = time.perf_counter()
                # self.data = np.copy(self.data) + ff
                endx = time.perf_counter()
                r += get - remote
                g += add - get
                a += endx - add
            end = time.perf_counter()
            return [end - start, r, g, a]
        else:
            pass

    def broad(self, rank, obj_handler):
        # The actor with the given rank broadcasts its data to all other actors.
        if self.rank == rank:
            start = time.perf_counter()
            for o in range(1, len(obj_handler)):
                obj_handler[o].set.remote(self.data)
            return time.perf_counter() - start
        else:
            pass
if __name__ == '__main__':
    rank_num = int(sys.argv[1])
    size = int(sys.argv[2])
    ray.init(address='auto')
    print(ray.cluster_resources())
    print(f'{size}', flush=True)
    data = np.ones(size)
    print(data.nbytes)

    # Warm-up pass: create the actors, set the data, run the reduce, then fetch
    # every actor's data back to the driver.
    allstart = time.perf_counter()
    rank_list = [s.remote(i) for i in range(rank_num)]
    init = time.perf_counter()
    set_list = [ra.set.remote(data) for ra in rank_list]
    set_remote = time.perf_counter()
    unset_list = set_list
    while len(unset_list) != 0:
        _, unset_list = ray.wait(unset_list, fetch_local=False)
    set_wait = time.perf_counter()
    st = ray.get(set_list)
    set_get = time.perf_counter()
    init1 = time.perf_counter()
    re_list = [ra.reduce_sum.remote(0, rank_list) for ra in rank_list]
    set_remote1 = time.perf_counter()
    unset_list = re_list
    while len(unset_list) != 0:
        _, unset_list = ray.wait(unset_list, fetch_local=False)
    set_wait1 = time.perf_counter()
    ret = ray.get(re_list)
    set_get1 = time.perf_counter()
    init3 = time.perf_counter()
    show_list = [ra.get.remote() for ra in rank_list]
    set_remote3 = time.perf_counter()
    unset_list = show_list
    while len(unset_list) != 0:
        _, unset_list = ray.wait(unset_list, fetch_local=False)
    set_wait3 = time.perf_counter()
    show = ray.get(show_list)
    set_get3 = time.perf_counter()

    # Accumulators for the timed loop below.
    set_average_time = 0
    set_remote_all = 0
    set_wait_time = 0
    set_get_time = 0
    reduce_all_time = 0
    reduce_remote_time = 0
    reduce_get_time = 0
    reduce_add_time = 0
    reduce_sum_remote = 0
    reduce_sum_wait_time = 0
    reduce_sum_get_time = 0
    broadcast_time = 0
    broad_remote = 0
    broad_wait_time = 0
    broad_get_time = 0
    get_remote = 0
    get_wait_time = 0
    get_get_time = 0
    all_time = 0
    loop = 2
    print(set_get3 - allstart)

    # Timed passes: same sequence of set / reduce_sum / get, reusing the actors.
    for _ in range(loop):
        allstart = time.perf_counter()
        # rank_list = [s.remote(i) for i in range(rank_num)]
        init = time.perf_counter()
        set_list = [ra.set.remote(data) for ra in rank_list]
        set_remote = time.perf_counter()
        set_wait = time.perf_counter()
        st = ray.get(set_list)
        set_get = time.perf_counter()
        init1 = time.perf_counter()
        re_list = [ra.reduce_sum.remote(0, rank_list) for ra in rank_list]
        set_remote1 = time.perf_counter()
        set_wait1 = time.perf_counter()
        ret = ray.get(re_list)
        set_get1 = time.perf_counter()
        init3 = time.perf_counter()
        show_list = [ra.get.remote() for ra in rank_list]
        set_remote3 = time.perf_counter()
        set_wait3 = time.perf_counter()
        show = ray.get(show_list)
        set_get3 = time.perf_counter()
        set_average_time += np.array(st).mean()
        set_remote_all += set_remote - init
        set_wait_time += set_wait - set_remote
        set_get_time += set_get - set_wait
        reduce_all_time += ret[0][0]
        reduce_remote_time += ret[0][1]
        reduce_get_time += ret[0][2]
        reduce_add_time += ret[0][3]
        reduce_sum_remote += set_remote1 - init1
        reduce_sum_wait_time += set_wait1 - set_remote1
        reduce_sum_get_time += set_get1 - set_wait1
        get_remote += set_remote3 - init3
        get_wait_time += set_wait3 - set_remote3
        get_get_time += set_get3 - set_wait3
        all_time += set_get3 - allstart
        print(set_get3 - allstart)

    print(f'set_average_time {set_average_time/loop}', flush=True)
    print(f'set_remote {set_remote_all/loop} set_wait_time {set_wait_time/loop} set_get_time {set_get_time/loop}', flush=True)
    print(f'reduce_all_time {reduce_all_time/loop} reduce_remote_time {reduce_remote_time/loop} reduce_get_time {reduce_get_time/loop} reduce_add_time {reduce_add_time/loop}', flush=True)
    print(f'reduce_sum_remote {reduce_sum_remote/loop} reduce_sum_wait_time {reduce_sum_wait_time/loop} reduce_sum_get_time {reduce_sum_get_time/loop}', flush=True)
    print(f'get_remote {get_remote/loop} get_wait_time {get_wait_time/loop} get_get_time {get_get_time/loop}', flush=True)
    print(f'all_time {all_time/loop}', flush=True)

The script failed to run. The output is shown below:

(raylet) [2022-09-07 16:21:20,227 E 78271 78304] (raylet) dlmalloc.cc:198: Out of disk space with fallocate error: No space left on device
(raylet) [2022-09-07 16:21:20,227 E 78271 78304] (raylet) object_lifecycle_manager.cc:212: Plasma fallback allocator failed, likely out of disk space.
(raylet) [2022-09-07 16:21:20,416 E 78271 78304] (raylet) dlmalloc.cc:198: Out of disk space with fallocate error: No space left on device
(raylet) [2022-09-07 16:21:20,416 E 78271 78304] (raylet) object_lifecycle_manager.cc:212: Plasma fallback allocator failed, likely out of disk space.
(raylet) [2022-09-07 16:21:20,626 E 78271 78304] (raylet) dlmalloc.cc:198: Out of disk space with fallocate error: No space left on device
(raylet) [2022-09-07 16:21:20,626 E 78271 78304] (raylet) object_lifecycle_manager.cc:212: Plasma fallback allocator failed, likely out of disk space.
(raylet) [2022-09-07 16:21:20,861 E 78271 78304] (raylet) dlmalloc.cc:198: Out of disk space with fallocate error: No space left on device
(raylet) [2022-09-07 16:21:20,861 E 78271 78304] (raylet) object_lifecycle_manager.cc:212: Plasma fallback allocator failed, likely out of disk space.
(raylet) [2022-09-07 16:21:21,118 E 78271 78304] (raylet) dlmalloc.cc:198: Out of disk space with fallocate error: No space left on device
(raylet) [2022-09-07 16:21:21,118 E 78271 78304] (raylet) object_lifecycle_manager.cc:212: Plasma fallback allocator failed, likely out of disk space.
(raylet) [2022-09-07 16:21:21,393 E 78271 78304] (raylet) dlmalloc.cc:198: Out of disk space with fallocate error: No space left on dev

This confuses me. I don't think my script should consume that much memory. Am I using Ray the wrong way? How can I reduce the memory consumption?

Hmm, it's a bit hard to tell what's going on without more information.

I tried running the script on my laptop, and I believe you should expect roughly 2x the memory usage of the total object size. Before the script exits, can you run `ray memory` and provide the output? Also, what is the disk size on each node, including the head node?

I'd guess this is happening because of the `ray.get(show_list)` call in the driver that fetches the data back from every actor. The `show` list from the previous loop iteration is still in scope, so the next time that call runs, the driver node needs at least 64 GB * 2 = 128 GB of memory to hold both sets of results at once.
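If that's what's happening, one way to relieve the pressure is to stop holding every result on the driver at once. Here is a rough sketch against the script above (untested on your cluster): it fetches each actor's array one at a time and folds it into a running sum, dropping each reference as soon as it has been used so Ray can evict the corresponding object.

import numpy as np
import ray

# Rough sketch, untested: fold the actors' arrays into a running sum one at a
# time instead of calling ray.get on the whole show_list. rank_list is the
# actor handle list from the script above.
total = None
for ra in rank_list:
    chunk = ray.get(ra.get.remote())
    if total is None:
        total = np.copy(chunk)   # writable copy owned by the driver
    else:
        total += chunk           # add in place, no extra 1 GiB temporaries
    del chunk                    # drop the reference so the object can be evicted

Alternatively, if you want to keep the `ray.get(show_list)` pattern, adding `del show, show_list` (and similarly for the other result lists) at the end of each loop iteration should let Ray release the previous iteration's objects before the next batch is fetched.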