How severe does this issue affect your experience of using Ray?
- Medium: It contributes to significant difficulty to complete my task, but I can work around it.
In an attempt to speed up some RLlib algorithms, I’ve been experimenting with using Ray core to do some decompression work with Ray tasks. The relevant part of my code looks like this:
```python
import queue
from queue import Queue
from threading import Thread
from typing import List

import ray
from ray._raylet import ObjectRef
from ray.rllib.algorithms.dqn.dqn import DQN
from ray.rllib.utils.typing import SampleBatchType


@ray.remote
def decompress(sample_batch: SampleBatchType) -> SampleBatchType:
    sample_batch.decompress_if_needed()
    return sample_batch


class FastDQN(DQN):
    def __init__(self, *args, **kwargs):
        DQN.__init__(self, *args, **kwargs)
        self.decompress_in_queue: Queue[SampleBatchType] = Queue()
        self.decompress_out_queue: Queue[SampleBatchType] = Queue()
        decompress_thread = Thread(
            target=self.run_decompress,
            daemon=True,
        )
        decompress_thread.start()

    def run_decompress(self):
        batch_refs: List[ObjectRef[SampleBatchType]] = []
        while True:
            # Drain everything currently in the queue; block only when
            # there are no tasks in flight to wait on.
            while True:
                try:
                    compressed_batch = self.decompress_in_queue.get(
                        block=len(batch_refs) == 0
                    )
                    batch_refs.append(decompress.remote(compressed_batch))
                except queue.Empty:
                    break
            ready_refs, batch_refs = ray.wait(batch_refs, num_returns=1, timeout=0.1)
            for batch_ref in ready_refs:
                self.decompress_out_queue.put(ray.get(batch_ref))
```
So various sample batches are passed into decompress_in_queue; the decompress thread starts a decompress task for each, waits for each to finish with ray.wait, and puts the results into decompress_out_queue.
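One variation I've been experimenting with is bounding the number of in-flight tasks, so that completed results can't pile up faster than they're consumed. Here's a minimal sketch of that pattern, written with concurrent.futures instead of Ray so it runs standalone; the cap of 8, the worker count, and the decompress_stub function are placeholders, and with Ray you'd swap pool.submit for decompress.remote and concurrent.futures.wait for ray.wait:

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

MAX_IN_FLIGHT = 8  # placeholder cap; with Ray this would bound object-store usage


def decompress_stub(batch):
    # Stand-in for the real remote decompress task.
    return [x * 2 for x in batch]


def run_bounded(batches):
    results = []
    in_flight = set()
    with ThreadPoolExecutor(max_workers=4) as pool:
        for batch in batches:
            # Once the cap is reached, block until at least one task
            # finishes before submitting more -- the role ray.wait plays
            # for a list of ObjectRefs.
            while len(in_flight) >= MAX_IN_FLIGHT:
                done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
                results.extend(f.result() for f in done)
            in_flight.add(pool.submit(decompress_stub, batch))
        # Drain whatever is still running.
        done, _ = wait(in_flight)
        results.extend(f.result() for f in done)
    return results
```

The key point is that submission stalls until a slot frees up, so at most MAX_IN_FLIGHT results ever exist at once.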
This works fine except that it constantly spills from the object store:
(raylet) Spilled 6168 MiB, 4 objects, write throughput 143 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.
(raylet) Spilled 15128 MiB, 6 objects, write throughput 342 MiB/s.
(raylet) Spilled 15691 MiB, 7 objects, write throughput 347 MiB/s.
(raylet) Spilled 16944 MiB, 58 objects, write throughput 361 MiB/s.
(raylet) Spilled 34301 MiB, 270 objects, write throughput 517 MiB/s.
(raylet) Spilled 66594 MiB, 627 objects, write throughput 632 MiB/s.
(raylet) Spilled 133549 MiB, 1096 objects, write throughput 800 MiB/s.
Note that this is hundreds of gigabytes, and it eventually reaches multiple terabytes. I've looked in the directory inside /tmp where the spilling happens, and the spilled objects do appear to be the SampleBatch objects that decompress consumes and produces. The problem also goes away when I run the decompression without Ray, so the decompress tasks definitely seem to be the source.
Can anyone help me figure out why this code spills so much data from the object store? Once a decompress task finishes and its result has been retrieved with ray.get, I'd expect the SampleBatch objects to be cleaned out of the object store. Are there any tools that could help determine why they aren't?
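So far the only tool I've found for this is the ray memory CLI, which dumps a table of every object reference in the cluster along with which worker holds it and why the object is pinned (run against the live cluster while the spilling is happening):

```shell
# List all object references in the cluster: call site, size, and
# reference type (local ref, pinned in memory, pending task, etc.).
ray memory
```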