Parallelizing a "for loop" using Ray adds huge overhead in Python

I want to parallelize a "for loop" iteration using Ray in Python. The code is shown below. "idxes" holds 1024 indices; for each index i, the loop does an array access at self._storage[i] and unpacks the result from data.

This is the original code:

import numpy as np

def _encode_sample(self, idxes):
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in idxes:
        # Fetch one stored transition and split it into its five fields.
        data = self._storage[i]
        obs_t, action, reward, obs_tp1, done = data
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)

Since idxes contains 1024 random indices in a list, I want the work split four ways: the first worker should run iterations 0 to 255, the second 256 to 511, the third 512 to 767, and the fourth 768 to 1023, and then the results should be gathered.

I also tried to modify the code to do some parallelization using Ray and found that it takes way longer than the original implementation.

This is my code:

def _encode_sample(self, idxes):

    split_idxes = [idxes[0:256], idxes[256:512], idxes[512:768], idxes[768:1024]]
   
    futures = [_encode_sample_helper.remote(self._storage, split_idxes[idx]) for idx in range(4)]

    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    outputs = ray.get(futures)

    for output in outputs:
        # each output is a 5-tuple of lists: (obses_t, actions, rewards, obses_tp1, dones)
        obses_t.extend(output[0])
        actions.extend(output[1])
        rewards.extend(output[2])
        obses_tp1.extend(output[3])
        dones.extend(output[4])

    return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)

This function is inside a class, so I wrote my helper function outside the class. It is essentially the original loop turned into a Ray remote function that handles one chunk of indices, roughly like this:
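
import numpy as np
import ray

@ray.remote
def _encode_sample_helper(storage, idxes):
    # Same unpacking as the original loop, applied to one chunk of indices.
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in idxes:
        obs_t, action, reward, obs_tp1, done = storage[i]
        obses_t.append(np.array(obs_t, copy=False))
        actions.append(np.array(action, copy=False))
        rewards.append(reward)
        obses_tp1.append(np.array(obs_tp1, copy=False))
        dones.append(done)
    return obses_t, actions, rewards, obses_tp1, dones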

The whole code block is from the MADDPG algorithm; this is the link to the relevant lines: https://github.com/openai/maddpg/blob/master/maddpg/trainer/replay_buffer.py#L34

I increased the number of agents from 3 to 6 and then to 12 (linear growth), but the time spent in this function grows super-linearly, so I wanted to optimize it.
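
For reference, this is roughly how the function can be timed in isolation (a sketch; it assumes the ReplayBuffer from the linked file, with the usual baselines-style add(obs_t, action, reward, obs_tp1, done) signature, and dummy transitions):

import random
import time

import numpy as np
from maddpg.trainer.replay_buffer import ReplayBuffer

# Fill a buffer with dummy transitions, then time a 1024-index sample.
buffer = ReplayBuffer(int(1e6))
for _ in range(2000):
    buffer.add(np.zeros(64), np.zeros(5), 0.0, np.zeros(64), False)

idxes = [random.randint(0, 1999) for _ in range(1024)]

t0 = time.perf_counter()
buffer._encode_sample(idxes)
print(f"_encode_sample took {time.perf_counter() - t0:.4f} s")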

Any help and inputs would be appreciated.

I deleted the duplicate version of the question and this is the updated one.

@Jules_Damji, this is the question. Any help from you would be appreciated.

Hi @gogineni_kailashnath,

Welcome to the forum.

The main thing I see that might help is passing a handle to the data to the remote calls instead of the data itself. Passing self._storage as an argument serializes it again for every task, whereas ray.put stores it in the object store once and all tasks share that copy. Something like this:

    data_handle = ray.put(self._storage)
    futures = [_encode_sample_helper.remote(data_handle, idxs) for idxs in split_idxes]
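
Here is a minimal, self-contained sketch of the difference (touch is just a made-up stand-in task and the sizes are arbitrary; timing the two halves should show the gap):

    import numpy as np
    import ray

    ray.init()

    @ray.remote
    def touch(storage, idxs):
        # Trivial work; the interesting cost is how `storage` reaches the task.
        return [storage[i] for i in idxs]

    # Stand-in for self._storage: many small arrays.
    storage = [np.zeros(1000) for _ in range(100_000)]
    chunks = [list(range(k, k + 256)) for k in range(0, 1024, 256)]

    # Without ray.put: `storage` is serialized again for every task.
    ray.get([touch.remote(storage, c) for c in chunks])

    # With ray.put: `storage` enters the object store once and all tasks share it.
    handle = ray.put(storage)
    ray.get([touch.remote(handle, c) for c in chunks])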

Hi @mannyv, I corrected that and it looks like my algorithm started training. Still, the algorithm is training very slowly. Do you think my workload (a for loop in Python) is too small to get a speedup from parallelization?