1. Severity of the issue: (select one)
[+] Medium: Significantly affects my productivity but can find a workaround.
2. Environment:
- Ray version: 2.43.0
- Python version: 3.10.6
- OS: EulerOSv2R10
3. What happened vs. what you expected:
- Expected: Frames and their detected bounding boxes reach the object tracker in order, with no frames lost.
- Actual: I previously asked how to keep frames and detected bounding boxes in order for the object tracker. After implementing ordering by buffering frames by frame index, I ran into another problem.
While developing it, I recorded the index of each frame in the "Tracker" deployment and printed the collected indices every 30 seconds. Unfortunately, only about half of the expected frames actually arrive within 30 seconds.
The setup has two RTSP streams at 25-30 FPS and several deployments. The pipeline is:
video decode (plain Python class, multi-threaded/multi-process) -> skip_frame (plain Python class) -> shared_queue -> fetch (plain Python class) -> model_service.process (Ray Serve deployment)
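One way to check whether frames are already lost before they reach Ray Serve is to count drops at the shared queue. The sketch below is hypothetical: it assumes `shared_queue` is a bounded `queue.Queue` and that each frame is tagged with `(stream_id, frame_index)`. The helpers `skip_frame` and `enqueue` only mirror the stage names in the pipeline description; they are not the actual implementation.

```python
import queue
from typing import Iterable, Iterator

SKIP_INTERVAL = 5  # assumption: keep every 5th decoded frame, as in this issue


def skip_frame(frame_indices: Iterable[int], interval: int = SKIP_INTERVAL) -> Iterator[int]:
    """Yield only the frame indices that survive frame skipping."""
    for idx in frame_indices:
        if idx % interval == 0:
            yield idx


def enqueue(shared_queue: queue.Queue, stream_id: str,
            indices: Iterable[int], drop_when_full: bool) -> int:
    """Put (stream_id, index) pairs on the queue and return how many were dropped."""
    dropped = 0
    for idx in indices:
        item = (stream_id, idx)
        if drop_when_full:
            try:
                shared_queue.put_nowait(item)
            except queue.Full:
                # A drop like this is invisible downstream unless it is counted.
                dropped += 1
        else:
            shared_queue.put(item)  # blocks until the consumer (fetch) makes room
    return dropped


if __name__ == "__main__":
    # 30 s of a 25 FPS stream = 750 decoded frames -> 150 after skipping.
    # A stalled consumer is simulated by a 10-slot queue that is never drained.
    q = queue.Queue(maxsize=10)
    lost = enqueue(q, "stream_one", skip_frame(range(750)), drop_when_full=True)
    print(f"queued={q.qsize()} dropped={lost}")  # queued=10 dropped=140
```

If the real pipeline uses a non-blocking put (or a queue that discards old items), logging the drop count per stream shows whether the missing indices disappear here rather than inside Ray Serve.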
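model_service:
```python
from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment(num_replicas=1, name="ModelService")
class ModelService:
    # Downstream handles are injected when the app is built, e.g.
    # ModelService.bind(Preprocessor.bind(), Inference.bind(), Postprocessor.bind(), Tracker.bind())
    def __init__(self, preprocessor: DeploymentHandle, inference: DeploymentHandle,
                 postprocessor: DeploymentHandle, track: DeploymentHandle):
        self.preprocessor = preprocessor
        self.inference = inference
        self.postprocessor = postprocessor
        self.track = track

    async def process(self, image_list):
        input_data_ref = self.preprocessor.preprocess_image.remote(image_list)
        result_ref = await self.inference.infer.remote(input_data_ref)
        processed_ref = self.postprocessor.postprocess.remote(image_list, result_ref)
        await self.track.update.remote(image_list, processed_ref)
```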
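Ordering can break even when nothing is dropped: an async deployment replica can run several `process` calls concurrently (up to `max_ongoing_requests`), so a frame whose inference finishes quickly can reach the tracker before an earlier, slower frame. The pure-asyncio sketch below imitates that with hypothetical per-frame latencies. It does not use Ray; `fake_process` and `LATENCY` are illustrative stand-ins for `ModelService.process` and real preprocessing/inference time.

```python
import asyncio
from typing import Dict, List

# Hypothetical per-frame latencies (seconds) standing in for preprocess + infer time.
LATENCY: Dict[int, float] = {0: 0.03, 5: 0.01, 10: 0.02, 15: 0.04}


async def fake_process(index: int, arrivals: List[int]) -> None:
    """Stand-in for ModelService.process: wait, then 'deliver' the frame to the tracker."""
    await asyncio.sleep(LATENCY[index])
    arrivals.append(index)


async def main() -> List[int]:
    arrivals: List[int] = []
    # Frames are submitted in order 0, 5, 10, 15, but all run concurrently.
    await asyncio.gather(*(fake_process(i, arrivals) for i in sorted(LATENCY)))
    return arrivals


if __name__ == "__main__":
    order = asyncio.run(main())
    print(order)  # [5, 10, 0, 15]: completion order follows latency, not frame index
```

Every frame still arrives here; only the order changes. That matches the "unordered but complete" case, which a reorder buffer in the tracker can handle, whereas missing indices point to a drop somewhere else.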
Other deployments:
```python
@serve.deployment(num_replicas=1, name="Preprocessor")
class Preprocessor:
    ...


@serve.deployment(num_replicas=1, name="Inference")
class Inference:
    ...


@serve.deployment(num_replicas=1, name="Postprocessor")
class Postprocessor:
    ...


@serve.deployment(num_replicas=1, name="Tracker")
class Tracker:
    ...
```
With a skip interval of 5, the scheduled task prints the frame indices collected in self.track every 30 seconds:
Order in which the indices actually arrived:
rtsp stream one:
[0, 10, 5, 15, 125, 20, 25, 230, 160, 65, 210, 250, 30, 95, 280, 200, 35, 140, 40, 225, 255, 220, 45, 115, 325, 50, 205, 135, 295, 55, 360, 380, 60, 260, 410, 70, 130, 480, 365, 75, 425, 450, 80, 335, 375, 85, 415, 340, 580, 90, 175, 545, 100, 105, 215, 515, 600, 620, 275, 370, 385, 110, 690, 120, 490, 265, 745, 145, 150, 455, 670, 270, 640, 730]
length = 74 (expected: 149)
rtsp stream two:
[0, 5, 10, 15, 145, 110, 20, 155, 25, 240, 95, 185, 30, 35, 40, 115, 280, 45, 245, 190, 50, 140, 165, 335, 55, 235, 325, 345, 60, 65, 100, 415, 285, 70, 355, 450, 175, 75, 400, 420, 80, 130, 200, 485, 470, 85, 535, 90, 305, 395, 220, 480, 510, 505, 545, 590, 620, 105, 340, 655, 350, 390, 570, 120, 405, 555, 660, 125, 230, 665, 490, 605, 725, 135, 435, 690, 265, 710, 150, 460, 790]
length = 81 (expected: 158)
The same indices, sorted manually:
rtsp stream one:
[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110, 115, 120, 125, 130, 135, 140, 145, 150, 160, 175, 200, 205, 210, 215, 220, 225, 230, 250, 255, 260, 265, 270, 275, 280, 295, 325, 335, 340, 360, 365, 370, 375, 380, 385, 410, 415, 425, 450, 455, 480, 490, 515, 545, 580, 600, 620, 640, 670, 690, 730, 745]
length = 74 (expected: 149)
rtsp stream two:
[0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60, 65, 70, 75, 80, 85, 90, 95, 100, 105, 110, 115, 120, 125, 130, 135, 140, 145, 150, 155, 165, 175, 185, 190, 200, 220, 230, 235, 240, 245, 265, 280, 285, 305, 325, 335, 340, 345, 350, 355, 390, 395, 400, 405, 415, 420, 435, 450, 460, 470, 480, 485, 490, 505, 510, 535, 545, 555, 570, 590, 605, 620, 655, 660, 665, 690, 710, 725, 790]
length = 81 (expected: 158)
As I understand Ray Serve, each deployment decorated with @serve.deployment has a request queue. My guess is that when a burst of frames arrives, the upstream deployment drops frames at the front of its queue once the queue is full. As a result, the current deployment receives frame indices that are not consecutive (e.g. 0, 10, 5, 15, 125 in RTSP stream one), and the number of frames received is far below the expected count. The crucial problem is the lost frames. However, I do not know how to tackle this with Ray Serve's features. Could anyone give me some advice? Thanks in advance!
My goal is to keep frames in order while making sure that every frame index arrives within 5 seconds.
In other words, I can accept a complete but unordered sequence such as [0, 10, 5, 15, 25, 20], but not [0, 10, 25, 20], in which frame indices are missing.
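Given a complete but unordered sequence, the tracker side only needs a small reorder buffer per stream. This is a hypothetical sketch, not the buffering code from this issue: it assumes indices are multiples of `step` starting at 0, and it gives up on a frame that is still missing after `max_wait` seconds (5 s, matching the goal stated above) so that one lost frame cannot stall the tracker forever. `ReorderBuffer` and its `clock` parameter are illustrative names.

```python
import time
from typing import Any, Callable, Dict, List, Optional, Tuple


class ReorderBuffer:
    """Release (index, payload) pairs for one stream in ascending index order."""

    def __init__(self, step: int = 5, max_wait: float = 5.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.step = step          # frame-skip interval; indices are multiples of it
        self.max_wait = max_wait  # seconds to wait for a missing index before skipping it
        self.clock = clock
        self.next_index = 0
        self._pending: Dict[int, Any] = {}
        self._waiting_since: Optional[float] = None

    def push(self, index: int, payload: Any) -> List[Tuple[int, Any]]:
        """Buffer one frame and return every frame that can now be released in order."""
        if index >= self.next_index:  # late frames whose slot was already skipped are ignored
            self._pending[index] = payload
        return self.pop_ready()

    def pop_ready(self) -> List[Tuple[int, Any]]:
        """Release in-order frames; call periodically too, so timeouts fire without new input."""
        ready: List[Tuple[int, Any]] = []
        while self._pending:
            if self.next_index in self._pending:
                ready.append((self.next_index, self._pending.pop(self.next_index)))
                self.next_index += self.step
                self._waiting_since = None
                continue
            now = self.clock()
            if self._waiting_since is None:
                self._waiting_since = now  # a gap just appeared: start its timer
                break
            if now - self._waiting_since >= self.max_wait:
                self.next_index = min(self._pending)  # give up on the gap
                self._waiting_since = None
                continue
            break
        return ready


if __name__ == "__main__":
    buf = ReorderBuffer(step=5, max_wait=5.0)
    released = []
    for idx in [0, 10, 5, 15, 25, 20]:
        released += [i for i, _ in buf.push(idx, f"frame-{idx}")]
    print(released)  # [0, 5, 10, 15, 20, 25]
```

In a tracker this would typically be one buffer per `stream_id` (e.g. a `defaultdict(ReorderBuffer)`), with `update` feeding each released frame to the tracking step in order.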
I have tried setting `max_ongoing_requests=100, max_queued_requests=300`, but it does not help. My StreamIndexPrinter is:
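```python
import threading
from collections import defaultdict


class StreamIndexPrinter:
    """Collects frame indices per stream and prints them every `interval` seconds."""

    def __init__(self, stage, interval=30):
        self.stage = stage
        self.interval = interval
        self.stream_data = defaultdict(list)
        self._lock = threading.Lock()  # add_index and the timer thread share stream_data
        self.color_codes = {
            "Track": "34",  # ANSI blue
        }
        self.color = self.color_codes.get(stage, "35")  # ANSI magenta by default
        self._start_timer()

    def _start_timer(self):
        self.timer = threading.Timer(self.interval, self._print_and_clear)
        self.timer.daemon = True  # do not keep the process alive on shutdown
        self.timer.start()

    def _print_and_clear(self):
        # Swap out the collected data under the lock, then print outside it.
        with self._lock:
            snapshot = self.stream_data
            self.stream_data = defaultdict(list)
        for stream_id, indices in snapshot.items():
            if indices:
                print(f"\033[{self.color}m{self.stage}\033[0m : stream_id: {stream_id}, index: {indices}")
        self._start_timer()

    def add_index(self, stream_id, index):
        with self._lock:
            self.stream_data[stream_id].append(index)
```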