I have a simple setup with two actors: the first actor places raw and preprocessed images in shared memory, and the second actor runs predictions on the preprocessed images.
import ray
import time
import numpy as np
from tqdm import tqdm
from ray.util import ActorPool
from ray.util.metrics import Count, Histogram, Gauge
CAMERA_COUNT = 3
@ray.remote
class Camera:
    def __init__(self):
        self.camera_acquired_frame_counter = Count(name="camera_acquired_count")
        self.camera_acquire_time = Gauge(name='camera_acquire', tag_keys=("frame_order",))
        self.preprocess_time = Gauge(name='camera_preprocess', tag_keys=("frame_order",))
        self.ray_frame_set_put_time = Gauge(name='camera_frame_ray_put')
        self.ray_preprocessed_frame_set_put_time = Gauge(name='camera_preprocessed_ray_put')

    def get(self):
        # Acquire one frame per camera (simulated with sleep).
        frame_set_acquire_start_time = time.time()
        camera_count = CAMERA_COUNT
        for cam_idx in range(camera_count):
            frame_acquire_start_time = time.time()
            time.sleep(0.01)
            self.camera_acquire_time.record(time.time() - frame_acquire_start_time, tags={"frame_order": str(cam_idx)})
            self.camera_acquired_frame_counter.record(1.0)
        self.camera_acquire_time.record(time.time() - frame_set_acquire_start_time, tags={"frame_order": "frame_set"})

        # Preprocess each frame (simulated with sleep).
        frame_set_preprocess_start_time = time.time()
        for cam_idx in range(camera_count):
            frame_preprocess_start_time = time.time()
            time.sleep(0.01)
            self.preprocess_time.record(time.time() - frame_preprocess_start_time, tags={"frame_order": str(cam_idx)})
        self.preprocess_time.record(time.time() - frame_set_preprocess_start_time, tags={"frame_order": "frame_set"})

        # Put the raw and preprocessed frame sets into the object store (shared memory).
        ray_frame_set_put_stime = time.time()
        frame_ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        self.ray_frame_set_put_time.record(time.time() - ray_frame_set_put_stime)
        ray_preprocessed_frame_set_put_stime = time.time()
        preprocessed_frame_ref = ray.put(np.zeros((3, 512, 512, 1)))
        self.ray_preprocessed_frame_set_put_time.record(time.time() - ray_preprocessed_frame_set_put_stime)
        return frame_ref, preprocessed_frame_ref
@ray.remote
class Classifier:
    def __init__(self):
        self.classifier_infer_time = Gauge(name='classifier_infer')

    def predict(self, preprocessed_frame_ref):
        # Run inference on the preprocessed frames (simulated with sleep).
        predict_stime = time.time()
        time.sleep(0.03)
        self.classifier_infer_time.record(time.time() - predict_stime)
        return np.zeros(1000)
if __name__ == '__main__':
    print(ray.init(_metrics_export_port=58391))
    print(ray.cluster_resources())
    camera = Camera.remote()
    classifier = Classifier.remote()
    for ridx in tqdm(range(1000000), desc="Main loop"):
        frame_ref, preprocessed_frame_ref = ray.get(camera.get.remote())
        prediction = ray.get(classifier.predict.remote(preprocessed_frame_ref))
I notice that the ray.put() call

frame_ref = ray.put(np.zeros((3, 3000, 5000, 1)))

starts to slow down over time. This trend looks concerning. Any ideas about what's happening here?
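To isolate the slowdown from the rest of the pipeline, here is a minimal sketch I intend to run that only times ray.put() on an array of the same shape as my raw frame set (float64, roughly 360 MB per put). The loop length and the explicit del of the returned ref are assumptions for the test, not part of my real code:

import time

import numpy as np
import ray

ray.init()

put_durations = []
for i in range(500):
    arr = np.zeros((3, 3000, 5000, 1))  # same shape as the raw frame set, ~360 MB of float64
    start = time.time()
    ref = ray.put(arr)
    put_durations.append(time.time() - start)
    del ref  # drop the reference so the object can be freed from the object store
    if (i + 1) % 50 == 0:
        print(f"puts {i + 1}: mean of last 50 = {np.mean(put_durations[-50:]):.4f} s")

If the timings stay flat here but keep growing in the full pipeline, I would suspect the object store filling up with objects whose refs are still alive (or being spilled) rather than ray.put() itself; I also plan to watch object store usage with the ray memory CLI while the real loop runs.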