Ray.put slows down over time

I have a simple setup with two actors. The first actor places raw and preprocessed images in shared memory, and the second actor runs predictions on the preprocessed images.

import ray
import time
import numpy as np
from tqdm import tqdm
from ray.util import ActorPool
from ray.util.metrics import Count, Gauge

CAMERA_COUNT = 3

@ray.remote
class Camera:
    def __init__(self):
        self.camera_acquired_frame_counter = Count(name="camera_acquired_count")
        self.camera_acquire_time = Gauge(name='camera_acquire', tag_keys=("frame_order",))
        self.preprocess_time = Gauge(name='camera_preprocess', tag_keys=("frame_order",))
        self.ray_frame_set_put_time = Gauge(name='camera_frame_ray_put')
        self.ray_preprocessed_frame_set_put_time = Gauge(name='camera_preprocessed_ray_put')

    def get(self):
        frame_set_acquire_start_time = time.time()
        camera_count = CAMERA_COUNT
        for cam_idx in range(camera_count):
            frame_acquire_start_time = time.time()
            time.sleep(0.01)
            self.camera_acquire_time.record(time.time() - frame_acquire_start_time, tags={"frame_order": str(cam_idx)})
        self.camera_acquired_frame_counter.record(1.0)
        self.camera_acquire_time.record(time.time() - frame_set_acquire_start_time, tags={"frame_order": "frame_set"})
        frame_set_preprocess_start_time = time.time()
        for cam_idx in range(camera_count):
            frame_preprocess_start_time = time.time()
            time.sleep(0.01)
            self.preprocess_time.record(time.time() - frame_preprocess_start_time, tags={"frame_order": str(cam_idx)})
        self.preprocess_time.record(time.time() - frame_set_preprocess_start_time, tags={"frame_order": "frame_set"})

        ray_frame_set_put_stime = time.time()
        frame_ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        self.ray_frame_set_put_time.record(time.time() - ray_frame_set_put_stime)
        ray_preprocessed_frame_set_put_stime = time.time()
        preprocessed_frame_ref = ray.put(np.zeros((3, 512, 512, 1)))
        self.ray_preprocessed_frame_set_put_time.record(time.time() - ray_preprocessed_frame_set_put_stime)

        return frame_ref, preprocessed_frame_ref

@ray.remote
class Classifier:
    def __init__(self):
        self.classifier_infer_time = Gauge(name='classifier_infer')

    def predict(self, preprocessed_frame):  # Ray resolves the passed ObjectRef to the array before the call
        predict_stime = time.time()
        time.sleep(0.03)
        self.classifier_infer_time.record(time.time() - predict_stime)
        return np.zeros(1000)


if __name__ == '__main__':
    print(ray.init(_metrics_export_port=58391))
    print(ray.cluster_resources())
    camera = Camera.remote()
    classifier = Classifier.remote()
    for ridx in tqdm(range(1000000), desc="Main loop"):
        frame_ref, preprocessed_frame_ref = ray.get(camera.get.remote())
        prediction = ray.get(classifier.predict.remote(preprocessed_frame_ref))

I notice that the ray.put() call
frame_ref = ray.put(np.zeros((3, 3000, 5000, 1)))
starts to slow down over time.

This trend looks a bit concerning. Any ideas about what’s happening here?
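
For what it's worth, here is a minimal standalone sketch I can use to check whether ray.put alone degrades, independent of the actors and the metrics code (the array shape mirrors the raw frame set above; the iteration counts are arbitrary):

import time

import numpy as np
import ray

ray.init()

put_times = []
for _ in range(500):
    arr = np.zeros((3, 3000, 5000, 1))  # ~360 MB, same shape as the raw frame set
    start = time.perf_counter()
    ref = ray.put(arr)  # copy the array into the shared-memory object store
    put_times.append(time.perf_counter() - start)
    del ref  # drop the only reference so the object can be evicted

# compare early vs. late iterations to see whether puts are degrading
print("mean of first 50 puts:", np.mean(put_times[:50]))
print("mean of last 50 puts:", np.mean(put_times[-50:]))

If the per-put time climbs here as well, the actors and the metrics code can be ruled out as the cause.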


Let’s communicate through the GitHub issue for this: ray.put() slows down over time. · Issue #13612 · ray-project/ray · GitHub