Hello there Ray community,
I am using Ray to parallelize some computations on numpy arrays. The portion of the code related to Ray is the following:
```python
import numpy as np
import ray

# Slice the input arrays into one block per worker along the last axis.
blocks_p = [p_points[:, :, start_index:end_index] for (start_index, end_index) in self.index_pairs]
blocks_N = [N_array[:, :, start_index:end_index] for (start_index, end_index) in self.index_pairs]
blocks_i_dir = [self.i_dir[:, :, start_index:end_index] for (start_index, end_index) in self.index_pairs]
blocks_x_tar = [self.x_tar[:, start_index:end_index] for (start_index, end_index) in self.index_pairs]
blocks_y_tar = [self.y_tar[:, start_index:end_index] for (start_index, end_index) in self.index_pairs]

# Launch one remote task per block and wait until all of them finish.
results = ray.get([
    surf_seed2_par.remote(block_p, block_N, self.Nx_g, self.Ny_g,
                          block_i_dir, block_x_tar, block_y_tar,
                          self.n1_g, self.n2_g, self.z_out_g)
    for block_p, block_N, block_i_dir, block_x_tar, block_y_tar
    in zip(blocks_p, blocks_N, blocks_i_dir, blocks_x_tar, blocks_y_tar)
])

# Collect the two outputs of each task.
partial_p = []
partial_N = []
for res in results:
    partial_p.append(res[0])
    partial_N.append(res[1])

# Split out the x/y/z components of every block.
partial_Nx, partial_Ny, partial_Nz = [], [], []
partial_px, partial_py, partial_pz = [], [], []
for p_blk, N_blk in zip(partial_p, partial_N):
    partial_Nx.append(N_blk[0])
    partial_Ny.append(N_blk[1])
    partial_Nz.append(N_blk[2])
    partial_px.append(p_blk[0])
    partial_py.append(p_blk[1])
    partial_pz.append(p_blk[2])

# Reassemble the full (3, Nx, Ny) arrays along the block axis.
result_partial_xi = np.concatenate(partial_p, axis=2)
result_partial_eta = np.concatenate(partial_N, axis=2)
```
In this case, `p_points`, `N_array`, and `i_dir` are `(3, Nx, Ny)` numpy arrays, while `x_tar` and `y_tar` are `(Nx, Ny)` numpy arrays. My remote function `surf_seed2_par` returns two arrays, each of shape `(3, Nx, block_size)`, so the remote calls give me back a nested list of overall shape `(ncpus, 2, 3, Nx, block_size)`.
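Schematically, the remote function looks like this (the parameter names are placeholders of mine and the body is elided; only the signature and the return shapes matter):

```python
@ray.remote
def surf_seed2_par(p_block, N_block, Nx, Ny, i_dir_block,
                   x_tar_block, y_tar_block, n1, n2, z_out):
    # ... actual computation elided ...
    # both outputs have shape (3, Nx, block_size), like the input blocks
    p_out = np.empty_like(p_block)
    N_out = np.empty_like(N_block)
    return p_out, N_out
```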
However, since the rest of my algorithm runs on a single CPU again, I need to stack these lists back into two `(3, Nx, Ny)` arrays. That is the reason for the two `for` loops in between.
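For reference, I believe the final stacking could also be written more compactly (leaving aside the per-component lists), since each element of `results` is a `(p_block, N_block)` pair:

```python
# Should be equivalent to the two loops plus the final concatenation above.
result_partial_xi = np.concatenate([p_blk for p_blk, _ in results], axis=2)
result_partial_eta = np.concatenate([N_blk for _, N_blk in results], axis=2)
```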
Once I run this code and compare it to the non-parallelized implementation, the parallelized version takes significantly longer. When I try to estimate where the time is spent, a significant share goes to the `auto_init_wrapper` function from `auto_init_hook.py`, and, as expected, another significant share goes to the list comprehensions used to extract the data back into numpy arrays.
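Given the time spent in `auto_init_wrapper`, one thing I was wondering about is whether initializing Ray explicitly and putting the big arrays into the object store once would help. A minimal self-contained sketch of that pattern (with a toy `process_block` computation standing in for `surf_seed2_par`):

```python
import numpy as np
import ray

ray.init()  # explicit init up front, instead of relying on the auto-init hook

@ray.remote
def process_block(p_full, start, end):
    # p_full arrives as a zero-copy, read-only view of the object-store copy
    return 2.0 * p_full[:, :, start:end]  # toy computation

p_points = np.random.rand(3, 100, 1000)
p_ref = ray.put(p_points)  # serialize the big array once, not once per task

index_pairs = [(i, i + 250) for i in range(0, 1000, 250)]
blocks = ray.get([process_block.remote(p_ref, s, e) for s, e in index_pairs])
result = np.concatenate(blocks, axis=2)  # recover the full (3, 100, 1000) array
```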
Any suggestions on what I could do to improve the performance in this case?