Nested list output in parallelized execution

Hello there, Ray community,

I am using Ray to parallelize some computations on NumPy arrays.

The part of the code related to Ray is the following:

```python
# Split each input array into column blocks along the last axis (Ny)
blocks_p = [p_points[:, :, start_index:end_index] for start_index, end_index in self.index_pairs]
blocks_N = [N_array[:, :, start_index:end_index] for start_index, end_index in self.index_pairs]
blocks_i_dir = [self.i_dir[:, :, start_index:end_index] for start_index, end_index in self.index_pairs]
blocks_x_tar = [self.x_tar[:, start_index:end_index] for start_index, end_index in self.index_pairs]
blocks_y_tar = [self.y_tar[:, start_index:end_index] for start_index, end_index in self.index_pairs]

# Launch one remote task per block and block until all of them have finished
results = ray.get([
    surf_seed2_par.remote(block_p, block_N, self.Nx_g, self.Ny_g,
                          block_i_dir, block_x_tar, block_y_tar,
                          self.n1_g, self.n2_g, self.z_out_g)
    for block_p, block_N, block_i_dir, block_x_tar, block_y_tar
    in zip(blocks_p, blocks_N, blocks_i_dir, blocks_x_tar, blocks_y_tar)
])

# Each result is a pair (p_block, N_block) of (3, Nx, block_size) arrays
partial_p = []
partial_N = []
partial_Nx = []
partial_Ny = []
partial_Nz = []
partial_px = []
partial_py = []
partial_pz = []
for i in range(len(results)):
    partial_p.append(results[i][0][:])
    partial_N.append(results[i][1][:])
for j in range(len(partial_p)):
    partial_Nx.append(partial_N[j][0])
    partial_Ny.append(partial_N[j][1])
    partial_Nz.append(partial_N[j][2])
    partial_px.append(partial_p[j][0])
    partial_py.append(partial_p[j][1])
    partial_pz.append(partial_p[j][2])

# Join the blocks back along the last axis -> two (3, Nx, Ny) arrays
result_partial_xi = np.concatenate(partial_p, axis=2)
result_partial_eta = np.concatenate(partial_N, axis=2)
```

Here, p_points, N_array and i_dir are (3, Nx, Ny) NumPy arrays, and x_tar and y_tar are (Nx, Ny) NumPy arrays.

My remote function surf_seed2_par returns two arrays, each of shape (3, Nx, block_size). As a result, ray.get gives me a nested list with shape (ncpus, 2, 3, Nx, block_size).

However, since the rest of my algorithm runs on a single CPU again, I need to stack these lists so that I recover two (3, Nx, Ny) arrays at the end. This is why I use those two for loops in between.
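For reference, the stacking described above can be done without the explicit loops. This is a minimal sketch, assuming each element of `results` is a `(p_block, N_block)` pair of NumPy arrays of shape `(3, Nx, block_size)` as described; `reassemble`, the synthetic `results` list and the sizes are hypothetical stand-ins for the real `ray.get` output:

```python
import numpy as np

def reassemble(results):
    """Join a list of (p_block, N_block) pairs along the last axis.

    Each block is assumed to have shape (3, Nx, block_size); block sizes
    may differ, as long as they add up to Ny.
    """
    p_blocks, N_blocks = zip(*results)           # split the pairs into two tuples
    result_p = np.concatenate(p_blocks, axis=2)  # (3, Nx, Ny)
    result_N = np.concatenate(N_blocks, axis=2)  # (3, Nx, Ny)
    return result_p, result_N

# Synthetic stand-in for ray.get(...): 4 column blocks of (3, 5, 10) arrays,
# with the last block narrower than the others.
Nx, Ny = 5, 10
p_full = np.arange(3 * Nx * Ny, dtype=float).reshape(3, Nx, Ny)
N_full = -p_full
index_pairs = [(0, 3), (3, 6), (6, 9), (9, 10)]
results = [(p_full[:, :, s:e], N_full[:, :, s:e]) for s, e in index_pairs]

result_p, result_N = reassemble(results)
print(result_p.shape, result_N.shape)  # (3, 5, 10) (3, 5, 10)
```

Under that assumption, this yields the same two arrays as `result_partial_xi` and `result_partial_eta` in the loop version; the `partial_Nx` ... `partial_pz` lists are not used to build those two arrays in the code shown.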

When I run this code and compare it with the non-parallelized implementation, the parallelized version takes significantly longer. When I try to estimate where the extra time goes, I see a large increase attributed to the auto_init_wrapper function in auto_init_hook.py and, unsurprisingly, a significant increase due to the list comprehension used to convert the data back into a NumPy array.

Any suggestions on what I could do to improve performance in this case?

Hi again, Ray community,

I replaced the for loops with some NumPy operations. I first split the result list into two homogeneous parts that can be converted directly into NumPy arrays, and then manipulate those arrays to obtain the shape I need:

```python
blocks_p = [p_points[:, :, start_index:end_index] for start_index, end_index in self.index_pairs]
blocks_N = [N_array[:, :, start_index:end_index] for start_index, end_index in self.index_pairs]
blocks_i_dir = [self.i_dir[:, :, start_index:end_index] for start_index, end_index in self.index_pairs]
blocks_x_tar = [self.x_tar[:, start_index:end_index] for start_index, end_index in self.index_pairs]
blocks_y_tar = [self.y_tar[:, start_index:end_index] for start_index, end_index in self.index_pairs]
results = ray.get([
    surf_seed2_par.remote(block_p, block_N, self.Nx_g, self.Ny_g,
                          block_i_dir, block_x_tar, block_y_tar,
                          self.n1_g, self.n2_g, self.z_out_g)
    for block_p, block_N, block_i_dir, block_x_tar, block_y_tar
    in zip(blocks_p, blocks_N, blocks_i_dir, blocks_x_tar, blocks_y_tar)
])
# All blocks except the last have the same width, so they convert to one array;
# the last (possibly narrower) block is handled separately.
# The transpose/reshape below only works if each result converts to a
# (6, Nx, block_size) array (rows 0-2 = p, rows 3-5 = N).
rp1 = np.asarray(results[0:-1])
rp2 = np.asarray(results[-1])
rp1_con = rp1.transpose(1, 2, 0, 3).reshape(6, self.Nx, -1)
r_joint = np.concatenate((rp1_con, rp2), axis=2)
results_p = np.stack((r_joint[0, :], r_joint[1, :], r_joint[2, :]), axis=0)
results_N = np.stack((r_joint[3, :], r_joint[4, :], r_joint[5, :]), axis=0)
```
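If each result really is a single `(6, Nx, block_size)` array, which the `reshape(6, ...)` in the code above implies, then one `np.concatenate` over the whole list already handles the narrower last block, so there may be no need to split off `results[-1]` or to transpose. A minimal sketch with synthetic data; `reassemble_stacked`, the sizes and the row layout (p in rows 0-2, N in rows 3-5) are assumptions for illustration:

```python
import numpy as np

def reassemble_stacked(results):
    """Concatenate (6, Nx, block) arrays along the last axis.

    Assumes rows 0-2 hold p and rows 3-5 hold N in every block.
    """
    r_joint = np.concatenate(results, axis=2)  # (6, Nx, Ny), copied once
    results_p = r_joint[:3]  # view of rows 0-2, shape (3, Nx, Ny)
    results_N = r_joint[3:]  # view of rows 3-5, shape (3, Nx, Ny)
    return results_p, results_N

Nx, Ny = 4, 11
full = np.random.default_rng(0).standard_normal((6, Nx, Ny))
index_pairs = [(0, 3), (3, 6), (6, 9), (9, 11)]  # last block is narrower
results = [full[:, :, s:e] for s, e in index_pairs]

results_p, results_N = reassemble_stacked(results)

# Cross-check against the transpose/reshape approach from the post.
rp1 = np.asarray(results[0:-1])                          # (3, 6, Nx, 3)
rp1_con = rp1.transpose(1, 2, 0, 3).reshape(6, Nx, -1)  # (6, Nx, 9)
r_joint = np.concatenate((rp1_con, np.asarray(results[-1])), axis=2)
assert np.array_equal(results_p, r_joint[:3])
assert np.array_equal(results_N, r_joint[3:])
```

Slicing `r_joint[:3]` returns a view instead of building a new array the way `np.stack` does; if an independent array is needed later, `.copy()` can be called on it.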

I have to say that this approach is still slow. When profiling this function, I see that a lot of time is spent in the “list comprehension”. Any ideas on how to reduce the time needed to put the results back into the shape I need?
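One way to narrow down which comprehension the profiler is blaming: the block-slicing comprehensions use NumPy basic slicing, which returns views rather than copies, so building those lists should be cheap regardless of array size. The `ray.get([...])` comprehension, by contrast, blocks until every remote task has finished, so its time includes the remote computation itself and the transfer of the arguments. A quick check of the slicing part, with made-up sizes:

```python
import time
import numpy as np

Nx, Ny = 200, 400
p_points = np.zeros((3, Nx, Ny))
index_pairs = [(s, min(s + 100, Ny)) for s in range(0, Ny, 100)]

start = time.perf_counter()
blocks_p = [p_points[:, :, s:e] for s, e in index_pairs]
elapsed = time.perf_counter() - start

# Basic slicing returns views: no data is copied when the list is built.
print(all(np.shares_memory(b, p_points) for b in blocks_p))  # True
# Slicing the last axis gives non-contiguous views, so the actual copy
# happens later, when each block is serialized for the remote call.
print(blocks_p[0].flags['C_CONTIGUOUS'])  # False
print(f"slicing took {elapsed * 1e6:.1f} microseconds")
```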