How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
I am seeing a memory leak when putting an object (a NumPy array) into the object store via `ray.put()`. I am using `ray==2.8.0` and Python 3.8 on Ubuntu 20.04.
Please find the reproduction script below.
import re
import time

import numpy as np
import ray
from ray.util.queue import Empty
from ray.util.queue import Queue as rQueue
# Start a local Ray instance unless this process is already connected.
# Calling ray.init() again while initialized raises (unless
# ignore_reinit_error=True is passed), so only initialize when needed.
if not ray.is_initialized():
    ray.init()
class Frame:
    """A frame's identity plus a reference to its pixel data.

    In this script, ``PacketDecoder.run`` builds each ``Frame`` with
    ``ref=ray.put(...)``. Only the ``Frame`` (not the array itself) is
    pushed through the queue.

    Attributes:
        src_id: Identifier of the producing source.
        seq_id: Per-source sequence number, or None.
        ref: Reference to the frame data, or None.
    """

    def __init__(self, src_id, seq_id=None, ref=None):
        self.src_id = src_id
        self.seq_id = seq_id
        self.ref = ref

    def __repr__(self):
        return (
            f"{type(self).__name__}(src_id={self.src_id!r}, "
            f"seq_id={self.seq_id!r}, ref={self.ref!r})"
        )
@ray.remote
class PacketDecoder:
    """Producer actor that synthesizes frames and pushes them onto a queue.

    Each frame is a random ``uint8`` array of shape (1920, 1080, 3) placed
    in the object store with ``ray.put``. Only a ``Frame`` holding the
    resulting reference is pushed to the queue.
    """

    def __init__(self, actor_name="packet_decoder") -> None:
        """Initialize the decoder.

        Args:
            actor_name: Actor label. A trailing run of ASCII digits (e.g. the
                ``3`` in ``"decoder_3"``) becomes ``actor_id``. Names without
                trailing digits, including the default, get ``actor_id=None``.
        """
        self.actor_name = actor_name
        # Names such as the default "packet_decoder" have no trailing digits;
        # fall back to None instead of failing with IndexError.
        match = re.search(r"[0-9]+$", actor_name)
        self.actor_id = int(match.group()) if match else None
        self.frame_count = 0

    async def run(self, frame_holder):
        """Generate frames forever and push them onto ``frame_holder``.

        Args:
            frame_holder: Queue (``ray.util.queue.Queue`` in this script)
                that receives one ``Frame`` per generated frame.
        """
        print(f"{self.actor_name} Packet Decoder started.")
        while True:
            try:
                # NOTE: randint's `high` is exclusive, so pixel values are 0..254.
                bgr_frame = np.random.randint(
                    low=0, high=255, size=(1920, 1080, 3), dtype=np.uint8
                )
                frame_obj = Frame(
                    src_id=self.actor_id,
                    seq_id=self.frame_count,
                    ref=ray.put(bgr_frame),
                )
                # Simulated blocking work. NOTE(review): time.sleep blocks this
                # async actor's event loop; use `await asyncio.sleep(...)` if
                # the intent is only to pace the producer.
                time.sleep(0.04)
                # Presumably raises (ray.util.queue.Full) if the queue stays
                # full for 1 s. The frame is then dropped and seq_id is not
                # advanced, because frame_count only increments after a
                # successful put.
                await frame_holder.put_async(frame_obj, timeout=1)
                self.frame_count += 1
            except Exception as ex:
                print(f"Error: {self.actor_name}: {ex}")
@ray.remote
class ODIActor:
    """Consumer actor that drains frame queues and frees each frame's object."""

    def __init__(self, actor_name="odi"):
        self.actor_name = actor_name

    async def run(self, frame_holder, src_id, timeout=2):
        """Pull frames from ``frame_holder`` forever and free their objects.

        Args:
            frame_holder: Queue (``ray.util.queue.Queue`` in this script)
                yielding ``Frame`` objects.
            src_id: Label used only in log messages.
            timeout: Seconds to wait for a frame before logging and retrying.
        """
        while True:
            try:
                try:
                    frame_obj = await frame_holder.get_async(
                        block=True, timeout=timeout
                    )
                except Empty:
                    # Only a timeout means "no frame". Other errors go to the
                    # outer handler. Cancellation/KeyboardInterrupt are
                    # BaseException subclasses and are no longer swallowed.
                    print(f"[{src_id}]: No frame for last {timeout} seconds.")
                    continue
                ray.internal.free(frame_obj.ref)
            except Exception as ex:
                print(f"[{src_id}]: Error: {ex}.")
def main():
    """Create the decoder actors, the consumer actor and per-camera queues, then run.

    Seven ``PacketDecoder`` actors (keys ``cam_0`` .. ``cam_6``) each get
    their own queue. A single ``ODIActor`` is started on every queue before
    the decoders start. Then this function blocks on the decoders' ``run``
    calls. Those loop indefinitely, so in practice the final print is not
    reached.
    """
    cam_prefix = "cam_"

    # Camera key -> decoder actor handle; actor names are "decoder_<n>".
    decode_actors = {}
    for idx in range(7):
        actor_label = f"decoder_{idx}"
        configured = PacketDecoder.options(
            name=actor_label, namespace="test", num_cpus=0.1
        )
        decode_actors[f"{cam_prefix}{idx}"] = configured.remote(actor_label)

    odi = ODIActor.options(name="odi", namespace="test", num_cpus=0.2).remote("odi")

    # One named queue actor per camera, keyed like decode_actors.
    frame_holders = {}
    for cam_key in decode_actors:
        frame_holders[cam_key] = rQueue(
            actor_options={"num_cpus": 0.01, "namespace": "test", "name": f"{cam_key}_fh"}
        )

    # Consumers first, so every queue is being drained before frames arrive.
    for cam_key, holder in frame_holders.items():
        odi.run.remote(holder, cam_key)

    decoder_refs = [
        actor.run.remote(frame_holders[cam_key])
        for cam_key, actor in decode_actors.items()
    ]
    ray.get(decoder_refs)
    print("------------------------- END -------------------------")
if __name__ == "__main__":
    try:
        main()
    except Exception as ex:
        print(f"Error: {ex}")
        # Re-raise so the traceback is shown and the process exits non-zero
        # instead of reporting success after a failure.
        raise