How severely does this issue affect your experience of using Ray?
- Medium: It contributes to significant difficulty to complete my task, but I can work around it.
In an attempt to speed up some RLlib algorithms, I’ve been experimenting with using Ray core to do some decompression work with Ray tasks. The relevant part of my code looks like this:
```python
import queue
from queue import Queue
from threading import Thread
from typing import List

import ray
from ray._raylet import ObjectRef
from ray.rllib.algorithms.dqn.dqn import DQN
from ray.rllib.utils.typing import SampleBatchType


@ray.remote
def decompress(sample_batch: SampleBatchType) -> SampleBatchType:
    sample_batch.decompress_if_needed()
    return sample_batch


class FastDQN(DQN):
    def __init__(self, *args, **kwargs):
        DQN.__init__(self, *args, **kwargs)
        self.decompress_in_queue: Queue[SampleBatchType] = Queue()
        self.decompress_out_queue: Queue[SampleBatchType] = Queue()
        decompress_thread = Thread(
            target=self.run_decompress,
            daemon=True,
        )
        decompress_thread.start()

    def run_decompress(self):
        batch_refs: List[ObjectRef[SampleBatchType]] = []
        while True:
            # Drain the input queue, blocking only when no tasks are in flight.
            while True:
                try:
                    compressed_batch = self.decompress_in_queue.get(
                        block=len(batch_refs) == 0
                    )
                    batch_refs.append(decompress.remote(compressed_batch))
                except queue.Empty:
                    break
            ready_refs, batch_refs = ray.wait(batch_refs, num_returns=1, timeout=0.1)
            for batch_ref in ready_refs:
                self.decompress_out_queue.put(ray.get(batch_ref))
```
So various sample batches are passed in to `decompress_in_queue`, the decompress thread starts a `decompress` task for each, waits for it to finish with `ray.wait`, and puts the result into `decompress_out_queue`.
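The drain step in `run_decompress` (block only when no tasks are in flight, otherwise poll until the queue is empty) can be sketched in isolation with just the stdlib `queue` module, independent of Ray; `in_flight` here is a stand-in for the list of pending `ObjectRef`s:

```python
from queue import Queue, Empty

q: Queue = Queue()
for i in range(3):
    q.put(i)

in_flight = []  # stand-in for the list of pending ObjectRefs
while True:
    try:
        # Block only when nothing is in flight; otherwise poll and move on.
        item = q.get(block=len(in_flight) == 0)
        in_flight.append(item)
    except Empty:
        break
```

After the loop, everything that was queued has been picked up, and the `Empty` exception is what breaks out of the inner drain loop once the queue runs dry.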
This works fine except that it constantly spills from the object store:
```
(raylet) Spilled 6168 MiB, 4 objects, write throughput 143 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
(raylet) Spilled 15128 MiB, 6 objects, write throughput 342 MiB/s.
(raylet) Spilled 15691 MiB, 7 objects, write throughput 347 MiB/s.
(raylet) Spilled 16944 MiB, 58 objects, write throughput 361 MiB/s.
(raylet) Spilled 34301 MiB, 270 objects, write throughput 517 MiB/s.
(raylet) Spilled 66594 MiB, 627 objects, write throughput 632 MiB/s.
(raylet) Spilled 133549 MiB, 1096 objects, write throughput 800 MiB/s.
```
Note that this is hundreds of gigabytes, and it eventually reaches multiple terabytes. I've looked at the spill directory under `/tmp`, and the spilled objects definitely seem to be the `SampleBatch` objects that `decompress` consumes and produces. The problem also goes away when I run the decompression without Ray, so the Ray tasks do seem to be the cause.
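For reference, the non-Ray version I compared against looks roughly like this: the same producer/consumer thread, but with decompression running inline so no objects ever enter the object store. This is a self-contained sketch, not the real RLlib code; `decompress` and the byte payloads are zlib stand-ins for `SampleBatch.decompress_if_needed()`:

```python
import zlib
from queue import Queue
from threading import Thread


def decompress(blob: bytes) -> bytes:
    # Stand-in for SampleBatch.decompress_if_needed(); runs on the
    # worker thread itself, so nothing touches the Ray object store.
    return zlib.decompress(blob)


in_queue: Queue = Queue()
out_queue: Queue = Queue()


def run_decompress() -> None:
    while True:
        blob = in_queue.get()
        if blob is None:  # sentinel to stop the thread
            break
        out_queue.put(decompress(blob))


worker = Thread(target=run_decompress, daemon=True)
worker.start()

for payload in [b"hello", b"world"]:
    in_queue.put(zlib.compress(payload))
in_queue.put(None)
worker.join()
```

With this inline version, memory usage stays flat, which is what makes the object-store spilling in the Ray version stand out.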
Can anyone help me figure out why this code spills so much data from the object store? It seems like once a `decompress` task is done, Ray should be able to clean its `SampleBatch` objects out of the object store. Are there any tools that could help determine why it isn't?