Using Ray results in single-CPU execution

Hello there Ray community,

I just found this nice framework and I would like to use it to parallelize some computation that currently runs on a single CPU in my Python implementation.

I have followed a few examples available on the internet, and my code looks something like this:

@ray.remote
def newton_root(xi, eta, h, H):
    # scipy.optimize.newton iterates element-wise over the points in xi
    return newton(xinp1_eval, x0=xi, fprime=xinp1_grad_eval, args=(eta, xi, h, H))

This is the function that I am trying to parallelize. I am just trying to solve a root-finding problem for a set of points in a 2D NumPy array. In this case, xi and eta are NumPy arrays and H is a RectBivariateSpline object from SciPy.
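Since xinp1_eval and xinp1_grad_eval are not shown in the post, here is a minimal, self-contained sketch of the same calling pattern with a toy residual (solving x² − eta = 0 for an array of points). The residual and its gradient are stand-ins, not the original functions:

```python
import numpy as np
from scipy.optimize import newton

def residual(x, eta):
    # toy stand-in for xinp1_eval: f(x) = x**2 - eta
    return x**2 - eta

def residual_grad(x, eta):
    # toy stand-in for xinp1_grad_eval: f'(x) = 2*x
    return 2.0 * x

eta = np.linspace(1.0, 4.0, 5)   # one root problem per array entry
x0 = np.ones_like(eta)           # initial guesses

# scipy.optimize.newton accepts an array-valued x0 and solves element-wise
roots = newton(residual, x0=x0, fprime=residual_grad, args=(eta,))
print(roots)  # close to sqrt(eta)
```

Note that the vectorization here happens inside a single newton call: even when wrapped in a Ray task, this whole array is still one unit of work.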

I then try to use the remote function by calling it through another function:

def root_pq_par(self):
    xi_g = ray.put(self.xi.flatten())
    eta_g = ray.put(self.eta.flatten())
    h_g = ray.put(self.h * np.ones(self.xi.flatten().shape))
    # H_g = ray.put(repeat(self.H, self.xi.flatten().shape[0]))
    H_g = ray.put(self.H)
    result = ray.get([newton_root.remote(xi_g, eta_g, h_g, H_g)])
    self.xi_np1 = np.copy(result)

So in this case, I first put the NumPy arrays and the spline object into the shared object store.

I then try to obtain the results at the end with ray.get.

The problem is that when testing this code, I observe that only a single CPU performs the computations: in the Windows Task Manager I can see that many instances of the Python interpreter are running, but all of them sit at 0% CPU usage and only one shows a really low usage. In fact, if I compare the speed of this approach against a normal Python execution, I observe no difference at all.

What could I be missing here?

Thanks for the feedback!

Hi - welcome to the Ray community. Can you also show the ray.get code?

Hi there,

I was able to solve my problem.

The original ray.get code is shown in the excerpt that I included before:

def root_pq_par(self):
    xi_g = ray.put(self.xi.flatten())
    eta_g = ray.put(self.eta.flatten())
    h_g = ray.put(self.h * np.ones(self.xi.flatten().shape))
    # H_g = ray.put(repeat(self.H, self.xi.flatten().shape[0]))
    H_g = ray.put(self.H)
    result = ray.get([newton_root.remote(xi_g, eta_g, h_g, H_g)])
    self.xi_np1 = np.copy(result)

I realized that if the xi_g, eta_g arrays are not explicitly split into blocks, then Ray does not distribute the work among my CPUs: a single call to newton_root.remote creates a single task, and a single task runs on a single CPU.
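The same principle holds for any task-based pool: one submitted task cannot occupy more than one worker, no matter how large its input array is. A minimal stdlib analogy (using concurrent.futures rather than Ray itself, and a thread pool only to keep the sketch self-contained):

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def solve_block(block):
    # stand-in for the per-block root solve; just squares the entries here
    return block ** 2

data = np.arange(12)

with ThreadPoolExecutor(max_workers=4) as pool:
    # ONE submission -> one task -> one worker, however big `data` is
    single = pool.submit(solve_block, data).result()

    # Splitting the array first yields one task per block, so the pool
    # can actually run the blocks concurrently
    blocks = np.array_split(data, 4)
    futures = [pool.submit(solve_block, b) for b in blocks]
    chunked = np.concatenate([f.result() for f in futures])

# both routes compute the same values; only the second one parallelizes
assert np.array_equal(single, chunked)
```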

Instead, this is what I ended up doing:

H_g = ray.put(self.H)
C_g = ray.put(self.C_ini)
blocks_xi = [self.xi.flatten()[start:end] for start, end in self.index_pairs]
blocks_eta = [self.eta.flatten()[start:end] for start, end in self.index_pairs]
result_xi = ray.get([newton_root.remote(block_xi, block_eta, self.h_g, H_g, C_g)
                     for block_xi, block_eta in zip(blocks_xi, blocks_eta)])

# each task returns four partial arrays; gather them by position
partial_xi = [res[0] for res in result_xi]
partial_eta = [res[1] for res in result_xi]
partial_eta_h = [res[2] for res in result_xi]
partial_xi_h = [res[3] for res in result_xi]

result_partial_xi = np.concatenate(partial_xi, axis=0)
result_partial_eta = np.concatenate(partial_eta, axis=0)
results_eta_h = np.concatenate(partial_eta_h, axis=0)
results_xi_h = np.concatenate(partial_xi_h, axis=0)

This means I have to pre-split the arrays into blocks so that each block can be sent to a different CPU. Is this correct? (At least this worked for me.)
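If the index_pairs bookkeeping is not needed elsewhere, np.array_split can do the chunking in one line, since it tolerates array lengths that do not divide evenly. A sketch of the split/submit/concatenate round trip, with a placeholder per-block function applied serially where the Ray calls would go:

```python
import numpy as np

n_blocks = 8  # e.g. the number of CPUs available to Ray
xi = np.random.rand(100)

# split into n_blocks chunks; lengths may differ by one
blocks = np.array_split(xi, n_blocks)

# with Ray this would be something like:
#   results = ray.get([newton_root.remote(b, ...) for b in blocks])
# here a placeholder per-block function stands in for the root solve
results = [np.sqrt(b) for b in blocks]

# order is preserved, so concatenation restores the original layout
xi_np1 = np.concatenate(results, axis=0)
assert xi_np1.shape == xi.shape
```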