How severely does this issue affect your experience of using Ray?
- High: It blocks me from completing my task.
ray version: 2.31
I have an async Camera actor which continuously puts NumPy arrays (frames) into a fixed-size circular buffer (implemented with a deque). There is a `read` function which returns the first frame in the deque, or None if the deque is empty.
I have test code which spawns this actor and starts the continuous buffer filling (capturing is toggled on and off at 30-second intervals).
After that, the `read` function is called continuously to get the next buffered frame from the actor, and I explicitly free each returned object from the object store.
I am seeing a steady increase in the Camera actor's memory utilization over time.
I have attached scripts for Camera Actor & the test code with the memory utilization graph.
stream.py
import asyncio
import time
import traceback
from collections import deque
from typing import Optional, Tuple

import numpy as np
from typing_extensions import Deque
MODULE_NAME = 'CAMERA'


class Camera:
    """Asynchronous dummy camera that keeps the newest frames in a bounded buffer.

    ``start()`` runs a capture loop that generates a synthetic frame roughly
    every 10 ms and appends it to ``frames``. ``read()`` pops the oldest
    buffered frame. ``stop()`` ends the loop and empties the buffer.

    Args:
        max_buffer_length: Maximum number of frames kept in the buffer. When
            the buffer is full the oldest frame is discarded on append.
        frame_shape: Shape of every generated frame. Frames are float64, so
            the default shape costs 1920 * 1080 * 3 * 8 bytes (~47 MiB) each.

    Raises:
        ValueError: If ``max_buffer_length`` is smaller than 1.
    """

    def __init__(
        self,
        max_buffer_length: int = 3,
        frame_shape: Tuple[int, ...] = (1920, 1080, 3),
    ) -> None:
        if max_buffer_length < 1:
            raise ValueError("max_buffer_length must be at least 1")
        self.__max_buffer_length = max_buffer_length
        self.__frame_shape = tuple(frame_shape)
        self.__stop_capturing = False
        # Defined up front so stop() can be called safely before start().
        self.job_active = False
        # deque(maxlen=...) drops the oldest entry on overflow, so the buffer
        # really holds up to max_buffer_length frames (the manual trimming it
        # replaces capped the buffer at max_buffer_length - 1).
        self.frames: Deque[np.ndarray] = deque(maxlen=max_buffer_length)
        self.job_stop_event = asyncio.Event()

    async def __start(self) -> None:
        """Generate frames and append them to the buffer until the job is stopped."""
        dummy_frame = np.random.normal(size=self.__frame_shape)
        try:
            while self.job_active:
                try:
                    # Skip frame generation entirely while capturing is paused.
                    if not self.__stop_capturing:
                        self.frames.append(dummy_frame + np.random.uniform(0, 1))
                except Exception as ex:
                    error_str = ''.join(traceback.format_exception(None, ex, ex.__traceback__))
                    print(f"{MODULE_NAME}, error reading frames, {error_str}")
                # Yield to the event loop so other coroutines can run.
                await asyncio.sleep(0.01)
        finally:
            # Always signal completion, even if the loop exits through
            # cancellation, so a pending stop() cannot wait forever.
            self.job_active = False
            print(f"{MODULE_NAME}, stopped reading frames")
            self.job_stop_event.set()

    def start_capturing(self) -> None:
        """Resume adding frames to the buffer."""
        self.__stop_capturing = False

    def stop_capturing(self) -> None:
        """Pause adding frames to the buffer; the capture loop keeps running."""
        self.__stop_capturing = True

    def read(self) -> Optional[np.ndarray]:
        """Remove and return the oldest buffered frame, or None if the buffer is empty."""
        try:
            return self.frames.popleft()
        except IndexError:
            return None

    async def start(self) -> None:
        """Run the capture loop; returns only after ``stop()`` has ended it."""
        # Reset the completion signal so a restarted job can be awaited again.
        self.job_stop_event.clear()
        self.job_active = True
        await self.__start()

    async def stop(self) -> None:
        """Stop the capture loop (if it is running) and clear the buffer."""
        if self.job_active:
            self.job_active = False
            await self.job_stop_event.wait()
        # Without a running loop there is nothing to wait for; waiting on the
        # event here would block forever when stop() precedes start().
        self.frames.clear()
        print(f"{MODULE_NAME}, Exiting !!!")
test.py
import ray
import time
import asyncio
import ray._private.internal_api
from stream import Camera
async def main() -> None:
    """Spawn the Camera actor and poll it, toggling capture every 30 seconds.

    Each polled frame reference is awaited and then freed from the object
    store. The polling loop only ends on KeyboardInterrupt, after which the
    actor is stopped following a 10 second pause.
    """
    name = "CAM"
    remote_camera_cls = ray.remote(Camera)
    camera = remote_camera_cls.options(
        name=f"{name}#TEST",
        num_cpus=0.1,
    ).remote()
    # Fire-and-forget: the actor's capture loop runs until stop() is called.
    camera.start.remote()

    window_start = time.time()
    pause_next = True  # True -> the next toggle pauses capturing
    while True:
        try:
            ref = camera.read.remote()
            _ = await ref
            ray._private.internal_api.free(ref)
        except KeyboardInterrupt:
            break

        elapsed = time.time() - window_start
        if elapsed >= 30:
            if pause_next:
                toggle, label = camera.stop_capturing, "Stopped capturing"
            else:
                toggle, label = camera.start_capturing, "Started capturing"
            toggle.remote()
            print(time.time(), label)
            pause_next = not pause_next
            window_start = time.time()

        await asyncio.sleep(0.005)

    await asyncio.sleep(10)
    await camera.stop.remote()


if __name__ == "__main__":
    asyncio.run(main())
Memory Profile
Ray Core Uncategorized Monitoring & Debugging Debugging and performance tuning #memory #leak #ram