How severe does this issue affect your experience of using Ray?
- High: It blocks me to complete my task.
I run the script below in 17 nodes and each node has 32 cores. My command ispython reduce_b roadcast.py 512 1
.
import re
import ray
import numpy as np
import time
import sys
@ray.remote
class s:
def __init__(self,rank):
self.rank = rank
def set(self,data):
start=time.perf_counter()
self.data=data
return time.perf_counter()-start
def get(self):
return self.data
def ip(self):
return ray.services.get_node_ip_address()
def reduce_sum(self,rank,obj_handler):
if rank==self.rank:
r=0
g=0
a=0
start=time.perf_counter()
# l=[]
# remote=time.perf_counter()
for o in range(1,len(obj_handler)):
# l.append(obj_handler[o].get.remote())
remote=time.perf_counter()
f=obj_handler[o].get.remote()
get=time.perf_counter()
ff=ray.get(f)
add=time.perf_counter()
self.data=np.copy(self.data)+ff
endx=time.perf_counter()
r+=get-remote
g+=add-get
a+=endx-add
# get=time.perf_counter()
# ll=ray.get(l)
# add=time.perf_counter()
# self.re=np.array(ll).sum(axis=0)
end=time.perf_counter()
return [end-start,r,g,a]
else:
pass
def broad(self,rank,obj_handler):
if self.rank == rank:
start=time.perf_counter()
for o in range(1,len(obj_handler)):
obj_handler[o].set.remote(self.data)
return time.perf_counter()-start
else:
pass
if __name__ == '__main__':
rank_num=int(sys.argv[1])
size=int(sys.argv[2])
ray.init(address='auto')
print(ray.cluster_resources())
print(f'{size}',flush=True)
data=np.ones(size)
allstart=time.perf_counter()
rank_list=[s.remote(i) for i in range(rank_num)]
init=time.perf_counter()
set_list=[ra.set.remote(data) for ra in rank_list]
set_remote=time.perf_counter()
unset_list=set_list
while len(unset_list) !=0:
_,unset_list=ray.wait(unset_list,fetch_local=False)
set_wait=time.perf_counter()
st=ray.get(set_list)
set_get=time.perf_counter()
init1=time.perf_counter()
re_list=[ra.reduce_sum.remote(0,rank_list) for ra in rank_list]
set_remote1=time.perf_counter()
unset_list=re_list
while len(unset_list) !=0:
_,unset_list=ray.wait(unset_list,fetch_local=False)
set_wait1=time.perf_counter()
ret=ray.get(re_list)
set_get1=time.perf_counter()
init2=time.perf_counter()
bo_list=[ra.broad.remote(0,rank_list) for ra in rank_list]
set_remote2=time.perf_counter()
unset_list=bo_list
while len(unset_list) !=0:
_,unset_list=ray.wait(unset_list,fetch_local=False)
set_wait2=time.perf_counter()
bot=ray.get(bo_list)
set_get2=time.perf_counter()
init3=time.perf_counter()
show_list=[ra.get.remote() for ra in rank_list]
set_remote3=time.perf_counter()
unset_list=show_list
while len(unset_list) !=0:
_,unset_list=ray.wait(unset_list,fetch_local=False)
set_wait3=time.perf_counter()
show=ray.get(show_list)
set_get3=time.perf_counter()
set_average_time=0
set_remote_all=0
set_wait_time=0
set_get_time=0
reduce_all_time=0
reduce_remote_time=0
reduce_get_time=0
reduce_add_time=0
reduce_sum_remote=0
reduce_sum_wait_time=0
reduce_sum_get_time=0
broadcast_time=0
broad_remote=0
broad_wait_time=0
broad_get_time=0
get_remote=0
get_wait_time=0
get_get_time=0
all_time=0
for _ in range(5):
allstart=time.perf_counter()
rank_list=[s.remote(i) for i in range(rank_num)]
init=time.perf_counter()
set_list=[ra.set.remote(data) for ra in rank_list]
set_remote=time.perf_counter()
unset_list=set_list
while len(unset_list) !=0:
_,unset_list=ray.wait(unset_list,fetch_local=False)
set_wait=time.perf_counter()
st=ray.get(set_list)
set_get=time.perf_counter()
init1=time.perf_counter()
re_list=[ra.reduce_sum.remote(0,rank_list) for ra in rank_list]
set_remote1=time.perf_counter()
unset_list=re_list
while len(unset_list) !=0:
_,unset_list=ray.wait(unset_list,fetch_local=False)
set_wait1=time.perf_counter()
ret=ray.get(re_list)
set_get1=time.perf_counter()
init2=time.perf_counter()
bo_list=[ra.broad.remote(0,rank_list) for ra in rank_list]
set_remote2=time.perf_counter()
unset_list=bo_list
while len(unset_list) !=0:
_,unset_list=ray.wait(unset_list,fetch_local=False)
set_wait2=time.perf_counter()
bot=ray.get(bo_list)
set_get2=time.perf_counter()
init3=time.perf_counter()
show_list=[ra.get.remote() for ra in rank_list]
set_remote3=time.perf_counter()
unset_list=show_list
while len(unset_list) !=0:
_,unset_list=ray.wait(unset_list,fetch_local=False)
set_wait3=time.perf_counter()
show=ray.get(show_list)
set_get3=time.perf_counter()
set_average_time+=np.array(st).mean()
set_remote_all+=set_remote-init
set_wait_time+=set_wait-set_remote
set_get_time+=set_get-set_wait
reduce_all_time+=ret[0][0]
reduce_remote_time+=ret[0][1]
reduce_get_time+=ret[0][2]
reduce_add_time+=ret[0][3]
reduce_sum_remote+=set_remote1-init1
reduce_sum_wait_time+=set_wait1-set_remote1
reduce_sum_get_time+=set_get1-set_wait1
broadcast_time+=bot[0]
broad_remote+=set_remote2-init2
broad_wait_time+=set_wait2-set_remote2
broad_get_time+=set_get2-set_wait2
get_remote+=set_remote3-init3
get_wait_time+=set_wait3-set_remote3
get_get_time+=set_get3-set_wait3
all_time+=set_get3-allstart
print(f'set_average_time {set_average_time/5}',flush=True)
print(f'set_remote {set_remote_all/5} set_wait_time {set_wait_time/5} set_get_time {set_get_time/5}',flush=True)
print(f'reduce_all_time {reduce_all_time/5} reduce_remote_time {reduce_remote_time/5} reduce_get_time {reduce_get_time/5} reduce_add_time {reduce_add_time/5}',flush=True)
print(f'reduce_sum_remote {reduce_sum_remote/5} reduce_sum_wait_time {reduce_sum_wait_time/5} reduce_sum_get_time {reduce_sum_get_time/5}',flush=True)
print(f'broadcast_time {broadcast_time/5}',flush=True)
print(f'broad_remote {broad_remote/5} broad_wait_time {broad_wait_time/5} broad_get_time {broad_get_time/5}',flush=True)
print(f'get_remote {get_remote/5} get_wait_time {get_wait_time/5} get_get_time {get_get_time/5}',flush=True)
print(f'all_time {all_time/5}',flush=True)
ray.shutdown()
there is no time log outputting. And error comes out. The log is here
After I change for _ in range(5)
to for _ in range(1)
, it works fine and no error comes out.
How does it happen?