I want to parallelize a for loop using Ray in Python. The code is shown below. The loop iterates over “idxes” 1024 times; each iteration just picks an index (i), reads self._storage[i], and appends the fields of that entry to the output lists.
This is the original code:
    def _encode_sample(self, idxes):
        obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
        for i in idxes:
            data = self._storage[i]
            obs_t, action, reward, obs_tp1, done = data
            obses_t.append(np.array(obs_t, copy=False))
            actions.append(np.array(action, copy=False))
            rewards.append(reward)
            obses_tp1.append(np.array(obs_tp1, copy=False))
            dones.append(done)
        return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
Since idxes is a list of 1024 random indices, I want to split the work across four threads: the first thread should handle iterations 0 to 255, the second 256 to 511, the third 512 to 767, and the fourth 768 to 1023, after which the results should be gathered.
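To make the intended split-and-gather pattern concrete, here is a minimal sketch that uses the standard library's ThreadPoolExecutor as a stand-in for Ray. The names encode_chunk and encode_sample_chunked, and the plain-list storage, are made up for this illustration; this is not the MADDPG code and not my Ray attempt.

```python
from concurrent.futures import ThreadPoolExecutor


def encode_chunk(storage, idxes):
    """Collect the five fields for one chunk of indices (hypothetical helper)."""
    obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
    for i in idxes:
        obs_t, action, reward, obs_tp1, done = storage[i]
        obses_t.append(obs_t)
        actions.append(action)
        rewards.append(reward)
        obses_tp1.append(obs_tp1)
        dones.append(done)
    return obses_t, actions, rewards, obses_tp1, dones


def encode_sample_chunked(storage, idxes, num_chunks=4):
    """Split idxes into contiguous chunks, process each in a thread, gather in order."""
    # Ceiling division so that e.g. 1024 indices and 4 chunks give slices of 256.
    chunk_size = max(1, -(-len(idxes) // num_chunks))
    chunks = [idxes[s:s + chunk_size] for s in range(0, len(idxes), chunk_size)]

    with ThreadPoolExecutor(max_workers=num_chunks) as pool:
        # pool.map returns results in the same order as the input chunks.
        outputs = list(pool.map(lambda chunk: encode_chunk(storage, chunk), chunks))

    # Gather: concatenate the per-chunk lists field by field.
    merged = ([], [], [], [], [])
    for output in outputs:
        for dst, src in zip(merged, output):
            dst.extend(src)
    return merged
```

Because of CPython's global interpreter lock, threads are not expected to speed up a pure-Python loop like this one; the sketch only shows the chunking and the ordered gather.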
I also tried to modify the function to parallelize it with Ray, but found that the code takes far longer than the original implementation.
This is my code:
    def _encode_sample(self, idxes):
        split_idxes = [idxes[0:256], idxes[256:512], idxes[512:768], idxes[768:1024]]
        futures = [_encode_sample_helper.remote(self._storage, split_idxes[idx]) for idx in range(4)]
        obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], []
        outputs = ray.get(futures)
        for output in outputs:
            #print(len(a), len(b), len(c))
            obses_t.extend(output[0])
            actions.extend(output[1])
            rewards.extend(output[2])
            obses_tp1.extend(output[3])
            dones.extend(output[4])
        return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones)
This function is inside a class. I wrote my helper function outside the class, which is shown below:
The whole code block is from the MADDPG algorithm, and this is the link to the line of code: https://github.com/openai/maddpg/blob/master/maddpg/trainer/replay_buffer.py#L34
I increased the number of agents from 3 to 6 and then to 12 (linear growth), but the time spent in this function grows super-linearly, so I want to optimize it.
Any help or input would be appreciated.