Can I have two Ray actors in the same process?

Currently I do something like this:

memory = ReplayBuffer.remote()
workers = [Worker.remote(memory) for _ in range(10)]
[w.fill_memory.remote() for w in workers]
while True:
    ray.get(memory.sample.remote())

Essentially, I have some workers that fill a data storage buffer (memory). I start them running with their fill_memory method, which just populates the buffer. While they are filling the buffer, however, I would also like to be sampling from it. This causes a bit of a bottleneck, because the sampled data now has to be sent from the process where the memory actor lives to the main process. Is there a way to keep this memory in the same process as the main process, so that the workers can still populate it in the background but sampling incurs no transfer overhead?

Thanks!

Is your memory just a pure bytearray? What’s the type of the buffer?

Thanks for the reply @sangcho! It’s a list/deque of tuples of numpy arrays. The definition is given by:

import random
from collections import deque

import ray


@ray.remote
class ReplayBuffer(object):
    def __init__(self, max_size, batch_size):
        # Oldest entries are evicted automatically once max_size is reached.
        self.memory = deque(maxlen=max_size)
        self.batch_size = batch_size

    def generate_batches(self):
        # Uniform random sample, without replacement.
        return random.sample(self.memory, self.batch_size)

    def push(self, data):
        for d in data:
            self.memory.append(d)

edit: I think what I may want is some kind of shared memory?

In this case, when you call ray.get from your driver script, the objects are zero-copied, so you don’t need to worry about the overhead (or the overhead is very small).
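
A quick way to see this for yourself (a minimal sketch, assuming a local Ray cluster started with ray.init()): numpy arrays fetched from the object store come back as read-only views over shared memory rather than as copies.

import numpy as np
import ray

ray.init()

arr = np.zeros(10_000_000)  # ~80 MB of float64
ref = ray.put(arr)
out = ray.get(ref)

# The retrieved array is backed by Ray's shared-memory object store,
# not copied into the driver's heap, which is why it is read-only.
print(out.flags.writeable)  # False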

How did you verify the overhead? Can you tell me a bit about the methodology you used?

Thanks again for the reply @sangcho!
I profiled my code (using PyCharm) and can see that the bottleneck comes from the sampling. Below I’ll copy some code that you can run to see what I mean (it is imperfect, since it uses time.time() for the measurements, but you can profile it properly if you want).

On my machine, sampling this data from the remote buffer takes around 0.002 minutes, vs. 0.00001 minutes from the buffer stored locally. That may seem small, but I will be doing this sampling 1M or more times during training, where the difference compounds: 0.002 minutes per sample over 1M samples is roughly 33 hours, versus about 10 minutes for the local buffer. So, is there any way you know of to store the replay buffer more locally, so that the sampling times are comparable?

Code:

import ray
from collections import deque
import random
import numpy as np
import time


DATA_SIZE = 40
N_WORKERS = 2


@ray.remote
class ReplayBuffer(object):
    def __init__(self, max_size, batch_size):
        self.memory = deque(maxlen=max_size)
        self.batch_size = batch_size

    def generate_batches(self):
        return random.sample(self.memory, self.batch_size)

    def push(self, data):
        for d in data:
            self.memory.append(d)

    def return_buffer(self):
        # Hand the whole deque to the caller (used below to build a local copy).
        return self.memory


@ray.remote
class ContWorker:
    def __init__(self, memory):
        self.data_size = DATA_SIZE
        self.memory = memory

    def get_data(self):
        memory = []
        count = 0
        total = 0
        while True:
            state = np.random.normal(size=200)
            action = np.random.randint(0, 3, size=38)
            reward = np.random.normal(size=1)
            next_state = np.random.normal(size=200)
            terminated = False
            count += 1
            total += 1
            memory.append((state, action, reward, next_state, terminated))
            if count == self.data_size:
                # Ship a chunk of transitions to the buffer actor.
                self.memory.push.remote(memory)
                memory = []
                count = 0
            if total > 10000:
                # count resets every chunk, so a separate running total
                # is needed to terminate after ~10k transitions.
                return


if __name__ == '__main__':
    ray.init()

    replay_buffer = ReplayBuffer.remote(100000, 256)

    # Start the workers filling the buffer in the background.
    workers = [ContWorker.remote(replay_buffer) for _ in range(N_WORKERS)]
    [w.get_data.remote() for w in workers]
    time.sleep(10)  # give the workers time to fill the buffer so it's non-empty
    # Time sampling through the remote actor (one round-trip per batch).
    times = []
    for i in range(101):
        print(i + 1)
        start = time.time()
        data = ray.get(replay_buffer.generate_batches.remote())
        end = time.time()
        times.append((end - start) / 60)
    # The first call is dropped as a warm-up.
    print(f'Avg time taken: {np.mean(times[1:])} minutes')


    # The same buffer, but as a plain object living in the driver process.
    class LocalReplayBuffer(object):
        def __init__(self, max_size, batch_size):
            self.memory = deque(maxlen=max_size)
            self.batch_size = batch_size

        def generate_batches(self):
            return random.sample(self.memory, self.batch_size)

        def push(self, data):
            for d in data:
                self.memory.append(d)

    # Copy the actor's current contents into the local buffer.
    replay_buffer_2 = LocalReplayBuffer(100000, 256)
    replay_buffer_2.memory = ray.get(replay_buffer.return_buffer.remote())
    # Time sampling from the local buffer (no actor round-trip).
    times = []
    for i in range(101):
        print(i + 1)
        start = time.time()
        data = replay_buffer_2.generate_batches()
        end = time.time()
        times.append((end - start) / 60)
    print(f'Avg time taken: {np.mean(times[1:])} minutes')
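
For what it’s worth, one workaround I’ve been sketching (the BatchPrefetcher name and shape are my own, not a Ray API) is to hide the actor round-trip behind a background thread that keeps a small local queue of batches topped up, so the training loop never blocks on ray.get:

import queue
import threading

import ray


class BatchPrefetcher:
    """Keeps a small local queue of batches filled from the buffer actor."""

    def __init__(self, buffer_actor, depth=4):
        self.buffer = buffer_actor
        self.q = queue.Queue(maxsize=depth)
        thread = threading.Thread(target=self._fill, daemon=True)
        thread.start()

    def _fill(self):
        while True:
            # The blocking ray.get happens here, on the background thread.
            batch = ray.get(self.buffer.generate_batches.remote())
            self.q.put(batch)  # blocks when the queue is already full

    def sample(self):
        # Near-instant once the queue is warm.
        return self.q.get()

The trade-off is that queued batches are slightly stale relative to the newest transitions in the buffer, but sampling itself no longer pays the actor round-trip.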

@sangcho did you have any other thoughts on this?