SIGABRT error comes out

How severe does this issue affect your experience of using Ray?

  • High: It blocks me to complete my task.
    I run the script below in 17 nodes and each node has 32 cores. My command is python reduce_b roadcast.py 512 1.
import re
import ray
import numpy as np
import time
import sys

@ray.remote
class s:
    def __init__(self,rank):
        self.rank = rank
    
    def set(self,data):
        start=time.perf_counter()
        self.data=data
        return time.perf_counter()-start
    
    def get(self):
        return self.data

    def ip(self):
        return ray.services.get_node_ip_address()
    
    def reduce_sum(self,rank,obj_handler):
        if rank==self.rank:
            r=0
            g=0
            a=0
            start=time.perf_counter()
            # l=[]
            # remote=time.perf_counter()
            for o in range(1,len(obj_handler)):
                # l.append(obj_handler[o].get.remote())
                remote=time.perf_counter()
                f=obj_handler[o].get.remote()
                get=time.perf_counter()
                ff=ray.get(f)
                add=time.perf_counter()
                self.data=np.copy(self.data)+ff
                endx=time.perf_counter()
                r+=get-remote
                g+=add-get
                a+=endx-add
            # get=time.perf_counter()
            # ll=ray.get(l)
            # add=time.perf_counter()
            # self.re=np.array(ll).sum(axis=0)
            end=time.perf_counter()
            return [end-start,r,g,a]
        else:
            pass

    def broad(self,rank,obj_handler):
        if self.rank == rank:
            start=time.perf_counter()
            for o in range(1,len(obj_handler)):
                obj_handler[o].set.remote(self.data)
            return time.perf_counter()-start
        else:
            pass

if __name__ == '__main__':
    rank_num=int(sys.argv[1])
    size=int(sys.argv[2])
    ray.init(address='auto')
    print(ray.cluster_resources())
    print(f'{size}',flush=True)
    data=np.ones(size)
    allstart=time.perf_counter()
    rank_list=[s.remote(i) for i in range(rank_num)]
    init=time.perf_counter()
    set_list=[ra.set.remote(data) for ra in rank_list]
    set_remote=time.perf_counter()
    unset_list=set_list
    while len(unset_list) !=0:
        _,unset_list=ray.wait(unset_list,fetch_local=False)
    set_wait=time.perf_counter()
    st=ray.get(set_list)
    set_get=time.perf_counter()
    init1=time.perf_counter()
    re_list=[ra.reduce_sum.remote(0,rank_list) for ra in rank_list]
    set_remote1=time.perf_counter()
    unset_list=re_list
    while len(unset_list) !=0:
        _,unset_list=ray.wait(unset_list,fetch_local=False)
    set_wait1=time.perf_counter()
    ret=ray.get(re_list)
    set_get1=time.perf_counter()
    init2=time.perf_counter()
    bo_list=[ra.broad.remote(0,rank_list) for ra in rank_list]
    set_remote2=time.perf_counter()
    unset_list=bo_list
    while len(unset_list) !=0:
        _,unset_list=ray.wait(unset_list,fetch_local=False)
    set_wait2=time.perf_counter()
    bot=ray.get(bo_list)
    set_get2=time.perf_counter()
    init3=time.perf_counter()
    show_list=[ra.get.remote() for ra in rank_list]
    set_remote3=time.perf_counter()
    unset_list=show_list
    while len(unset_list) !=0:
        _,unset_list=ray.wait(unset_list,fetch_local=False)
    set_wait3=time.perf_counter()
    show=ray.get(show_list)
    set_get3=time.perf_counter()
    set_average_time=0
    set_remote_all=0
    set_wait_time=0
    set_get_time=0
    reduce_all_time=0
    reduce_remote_time=0
    reduce_get_time=0
    reduce_add_time=0
    reduce_sum_remote=0
    reduce_sum_wait_time=0
    reduce_sum_get_time=0
    broadcast_time=0
    broad_remote=0
    broad_wait_time=0
    broad_get_time=0
    get_remote=0
    get_wait_time=0
    get_get_time=0
    all_time=0
    for _ in range(5):
        allstart=time.perf_counter()
        rank_list=[s.remote(i) for i in range(rank_num)]
        init=time.perf_counter()
        set_list=[ra.set.remote(data) for ra in rank_list]
        set_remote=time.perf_counter()
        unset_list=set_list
        while len(unset_list) !=0:
            _,unset_list=ray.wait(unset_list,fetch_local=False)
        set_wait=time.perf_counter()
        st=ray.get(set_list)
        set_get=time.perf_counter()
        init1=time.perf_counter()
        re_list=[ra.reduce_sum.remote(0,rank_list) for ra in rank_list]
        set_remote1=time.perf_counter()
        unset_list=re_list
        while len(unset_list) !=0:
            _,unset_list=ray.wait(unset_list,fetch_local=False)
        set_wait1=time.perf_counter()
        ret=ray.get(re_list)
        set_get1=time.perf_counter()
        init2=time.perf_counter()
        bo_list=[ra.broad.remote(0,rank_list) for ra in rank_list]
        set_remote2=time.perf_counter()
        unset_list=bo_list
        while len(unset_list) !=0:
            _,unset_list=ray.wait(unset_list,fetch_local=False)
        set_wait2=time.perf_counter()
        bot=ray.get(bo_list)
        set_get2=time.perf_counter()
        init3=time.perf_counter()
        show_list=[ra.get.remote() for ra in rank_list]
        set_remote3=time.perf_counter()
        unset_list=show_list
        while len(unset_list) !=0:
            _,unset_list=ray.wait(unset_list,fetch_local=False)
        set_wait3=time.perf_counter()
        show=ray.get(show_list)
        set_get3=time.perf_counter()
        set_average_time+=np.array(st).mean()
        set_remote_all+=set_remote-init
        set_wait_time+=set_wait-set_remote
        set_get_time+=set_get-set_wait
        reduce_all_time+=ret[0][0]
        reduce_remote_time+=ret[0][1]
        reduce_get_time+=ret[0][2]
        reduce_add_time+=ret[0][3]
        reduce_sum_remote+=set_remote1-init1
        reduce_sum_wait_time+=set_wait1-set_remote1
        reduce_sum_get_time+=set_get1-set_wait1
        broadcast_time+=bot[0]
        broad_remote+=set_remote2-init2
        broad_wait_time+=set_wait2-set_remote2
        broad_get_time+=set_get2-set_wait2
        get_remote+=set_remote3-init3
        get_wait_time+=set_wait3-set_remote3
        get_get_time+=set_get3-set_wait3
        all_time+=set_get3-allstart
    print(f'set_average_time {set_average_time/5}',flush=True)
    print(f'set_remote {set_remote_all/5} set_wait_time {set_wait_time/5} set_get_time {set_get_time/5}',flush=True)
    print(f'reduce_all_time {reduce_all_time/5} reduce_remote_time {reduce_remote_time/5} reduce_get_time {reduce_get_time/5} reduce_add_time {reduce_add_time/5}',flush=True)
    print(f'reduce_sum_remote {reduce_sum_remote/5} reduce_sum_wait_time {reduce_sum_wait_time/5} reduce_sum_get_time {reduce_sum_get_time/5}',flush=True)
    print(f'broadcast_time {broadcast_time/5}',flush=True)
    print(f'broad_remote {broad_remote/5} broad_wait_time {broad_wait_time/5} broad_get_time {broad_get_time/5}',flush=True)
    print(f'get_remote {get_remote/5} get_wait_time {get_wait_time/5} get_get_time {get_get_time/5}',flush=True)
    print(f'all_time {all_time/5}',flush=True)
    ray.shutdown()

there is no time log outputting. And error comes out. The log is here

After I change for _ in range(5) to for _ in range(1), it works fine and no error comes out.
How does it happen?

Could you try

@ray.remote(num_cpus=1)
class s:
   ...