How to prioritise certain method calls in a shared object?

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty in completing my task, but I can work around it.

I am training a DQN in a distributed setup, but the distributed version runs massively slower than the single-process version, even though the two are essentially the same (except for how the data is collected).

In the non-distributed setting the agent takes an environment step, adds it to the buffer, then learns by sampling from the buffer and performing an SGD step. In the distributed setting I delegate the data collection to workers, which push the data to a shared buffer. I am thinking this must be what is slowing the process down, since the learning code is exactly the same. Could the fact that all these workers are interacting with the buffer be slowing down the whole process? I’m very new to Ray, but as I understand it, only a single process can access a Python object at a time, so if these workers are constantly pushing data to the buffer, does that mean the learning method has to ‘wait’ until the buffer is free before it can be sampled from? If so, is there a way around this, e.g. by giving the learning method a higher priority than the workers?

Here is the code for my replay buffer:

import random
from collections import deque

import ray


@ray.remote
class ReplayBuffer:
    def __init__(self, capacity, batch_size=128):
        # Fixed-capacity FIFO buffer; old transitions are evicted automatically.
        self.buffer = deque(maxlen=capacity)
        self.batch_size = batch_size

    def push(self, data):
        for d in data:
            self.buffer.append(d)

    def sample(self):
        return random.sample(self.buffer, self.batch_size)

    def __len__(self):
        return len(self.buffer)
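
For context, the workers and the learner interact with this actor roughly as below (a simplified sketch continuing from the class above; the Collector actor and the dummy transitions are illustrative, not my real collection code):

import time

import ray

ray.init()

buffer = ReplayBuffer.remote(capacity=100_000)


@ray.remote
class Collector:
    # Illustrative worker: the real workers step their own copy of the
    # environment and periodically push batches of transitions.
    def run(self, buffer, n_steps):
        transitions = [("obs", "action", 0.0, "next_obs", False)] * n_steps
        buffer.push.remote(transitions)


collectors = [Collector.remote() for _ in range(4)]
for c in collectors:
    c.run.remote(buffer, n_steps=1000)

time.sleep(2.0)  # crude: give the collectors time to fill the buffer past batch_size

# Learner loop: every sample is a round trip to the buffer actor.
for step in range(10):
    batch = ray.get(buffer.sample.remote())
    # ... compute the DQN loss on `batch` and take an SGD step ...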

I want to make it so that if there is, e.g., a queue of push calls waiting to be executed and a sample call comes in, the sample call takes priority over the queued push calls.

By default, a Ray actor is single-threaded and method calls are executed in the order they are received, so queued push calls can block a sample call that arrives later.

For your case, you can probably use concurrency groups for the actor (see Limiting Concurrency Per-Method with Concurrency Groups — Ray 2.3.0): assign different concurrency groups to push and sample.
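
A minimal sketch of what that could look like, assuming the concurrency-group API from the linked docs (the group names and sizes are arbitrary, and since the actor then runs more than one thread, access to the deque is guarded with a lock):

import random
import threading
from collections import deque

import ray


# Separate concurrency groups let a sample call run even while a backlog
# of push calls is queued in the other group.
@ray.remote(concurrency_groups={"push": 1, "sample": 1})
class ReplayBuffer:
    def __init__(self, capacity, batch_size=128):
        self.buffer = deque(maxlen=capacity)
        self.batch_size = batch_size
        # The actor now runs multiple threads, so guard the buffer.
        self.lock = threading.Lock()

    @ray.method(concurrency_group="push")
    def push(self, data):
        with self.lock:
            for d in data:
                self.buffer.append(d)

    @ray.method(concurrency_group="sample")
    def sample(self):
        with self.lock:
            return random.sample(self.buffer, self.batch_size)

    def __len__(self):
        with self.lock:
            return len(self.buffer)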

Thanks @jjyao, I got this to work but it turns out it was not the problem. I profiled my code and there is a huge amount of overhead from ray.get. I’m not sure of a way around this – I tried lengthening the interval at which data from the actors is pushed to the buffer, but this didn’t affect performance. For reference, when doing the sampling as above via ray.get, it takes around 0.001 seconds per sample, whereas in the non-distributed setting it takes ~1e-5 seconds. I get that there is more overhead when sampling from a shared object with ray.get, but surely it shouldn’t cause that much of a slowdown? Buffer size, batch size, etc. are kept constant, and everything is run on the same machine.

1 millisecond per sample seems reasonable to me given the overhead of running the sample method on a remote actor process and fetching the result back.

How often do you call sample? Is it possible to call it less frequently but with a larger batch size to amortize the cost?
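
For example, something along these lines (a sketch, assuming sample were changed to take a batch-size argument; the numbers are illustrative):

# One round trip fetches enough transitions for several local SGD steps,
# so the per-call ray.get overhead is amortised.
n_sgd_steps = 16
minibatch_size = 128

big_batch = ray.get(buffer.sample.remote(n_sgd_steps * minibatch_size))
for i in range(n_sgd_steps):
    minibatch = big_batch[i * minibatch_size:(i + 1) * minibatch_size]
    # ... compute the DQN loss on `minibatch` and take an SGD step ...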


sample can be called somewhere between 1 and 5 million times, so unfortunately the milliseconds add up. However, I’ve changed how the distributed setting works: rather than having the actors continually collect data (which requires a decentralised replay buffer), they now collect a fixed batch of transitions and return it, and then N gradient steps are performed, sampling from a centralised buffer. This gets rid of one round of serialising/de-serialising when pushing to and pulling from the replay buffer and has helped speed things up!
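
Roughly, the new loop looks like this (a simplified sketch; the collectors list, the collect method and the step counts are illustrative, not my exact code):

import random
from collections import deque

import ray

# collectors: list of remote worker actors exposing a collect() method (illustrative).
local_buffer = deque(maxlen=100_000)  # centralised buffer lives in the learner process
batch_size = 128
n_gradient_steps = 100

for iteration in range(1000):
    # One round trip per worker per iteration: each worker returns a fixed
    # chunk of transitions instead of streaming them to a remote buffer.
    rollouts = ray.get([w.collect.remote(n_steps=200) for w in collectors])
    for rollout in rollouts:
        local_buffer.extend(rollout)

    # The N gradient steps sample locally, with no ray.get per SGD step.
    for _ in range(n_gradient_steps):
        batch = random.sample(local_buffer, batch_size)
        # ... compute the DQN loss on `batch` and take an SGD step ...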

Nice, glad to hear that you have a solution.
