Keypoint streaming use case

Hi,

Can I use Ray Serve to stream keypoints (for example, from person keypoint detection in machine learning) to 10k to 100k users with high performance? Latency needs to be in the milliseconds.

If yes, what would be the best possible way to achieve it?

Best,
Rakesh

Any help here?

Best,
Rakesh

Yes. Ray Serve itself can deliver single-digit millisecond latency and scales horizontally for high throughput.

However, we need a few more details:

  • Because Ray Serve doesn’t perform model optimization, your model’s latency is likely to become the bottleneck.
  • Does your model support batching? If so, batching can increase throughput without significantly sacrificing latency.
  • What frame rate (frames per second) does each user send?

Happy to chat more about your use case here.

Hey, the model is already highly performant and optimized. I currently run it on-device (client-side), which heats up mobile phones and laptops, so I want to move inference to the server side. Latency is in the milliseconds, the same as MediaPipe Holistic.

I easily get 30 FPS in real time.
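For a sense of scale: at 30 FPS each user produces a frame roughly every 33 ms, so 10k users means 300,000 frames per second reaching the backend and 100k users means 3,000,000. The small sketch below just does that arithmetic. The per-replica throughput (`frames_per_replica_per_s`, and the 1,000 frames/s used in the printout) is a hypothetical placeholder you would have to measure for your own model; it is not a Ray Serve figure.

```python
import math

# Figures taken from this thread: ~30 FPS per user, 10k to 100k users.
FPS_PER_USER = 30


def frame_budget_ms(fps: int) -> float:
    """Time between two consecutive frames of a single user, in milliseconds."""
    return 1000.0 / fps


def total_frames_per_second(users: int, fps: int = FPS_PER_USER) -> int:
    """Aggregate inference requests per second the backend has to absorb."""
    return users * fps


def replicas_needed(users: int, frames_per_replica_per_s: int,
                    fps: int = FPS_PER_USER) -> int:
    """Replicas required if each one sustains `frames_per_replica_per_s`
    (a hypothetical number you would measure for your own model)."""
    return math.ceil(total_frames_per_second(users, fps) / frames_per_replica_per_s)


if __name__ == "__main__":
    print(f"Per-frame budget at {FPS_PER_USER} FPS: {frame_budget_ms(FPS_PER_USER):.1f} ms")
    for users in (10_000, 100_000):
        total = total_frames_per_second(users)
        # 1,000 frames/s per replica is an illustrative placeholder, not a measurement.
        print(f"{users:,} users -> {total:,} frames/s, ~{replicas_needed(users, 1_000)} replicas")
```

Whatever the real per-replica number turns out to be, the total frame rate scales linearly with users, which is why the per-frame model cost and batching matter more than the serving layer here.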

The model supports batching, but I am not sure how that helps speed up inference. We are running in real time, so we never know what the next frame will look like.

Can you please help me with how to implement it? Can I simply use FastAPI as shown in the Ray Serve docs, or is there a better way? Do I need something like Redis?

Best,
Rakesh

Yes, I would recommend starting with our batching tutorial. The batches are formed from frames sent by multiple clients that arrive at around the same time, so you don't need to know what any single user's next frame looks like.
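To illustrate how cross-client batching works: the server waits a few milliseconds after the first frame arrives, groups whatever frames from different users have arrived in that window, and runs the model once on the group. Below is a hand-rolled asyncio sketch of that idea. `MicroBatcher` and `fake_keypoint_model` are hypothetical names, and this is not how Ray Serve implements batching internally; in Ray Serve the `@serve.batch` decorator (with its `max_batch_size` and `batch_wait_timeout_s` arguments) does this for you.

```python
import asyncio
from typing import Any, Callable, List


class MicroBatcher:
    """Hypothetical minimal dynamic batcher: groups requests that arrive within
    `wait_s` seconds of the first one (up to `max_batch_size`) into one model call."""

    def __init__(self, infer: Callable[[List[Any]], List[Any]],
                 max_batch_size: int = 4, wait_s: float = 0.02):
        self.infer = infer
        self.max_batch_size = max_batch_size
        self.wait_s = wait_s
        self.queue: asyncio.Queue = asyncio.Queue()
        self.batch_sizes: List[int] = []  # recorded so we can see the grouping

    async def submit(self, frame: Any) -> Any:
        # Each client call enqueues its frame and waits for its own result.
        fut = asyncio.get_running_loop().create_future()
        await self.queue.put((frame, fut))
        return await fut

    async def run(self) -> None:
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self.queue.get()]  # block until the first frame arrives
            deadline = loop.time() + self.wait_s
            while len(batch) < self.max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), remaining))
                except asyncio.TimeoutError:
                    break
            self.batch_sizes.append(len(batch))
            outputs = self.infer([frame for frame, _ in batch])  # one batched call
            for (_, fut), out in zip(batch, outputs):
                fut.set_result(out)


def fake_keypoint_model(frames: List[str]) -> List[str]:
    # Hypothetical stand-in for a batched keypoint model.
    return [f"keypoints({f})" for f in frames]


async def demo():
    batcher = MicroBatcher(fake_keypoint_model, max_batch_size=4, wait_s=0.02)
    worker = asyncio.create_task(batcher.run())
    # Eight different users each send one frame at about the same moment.
    results = await asyncio.gather(
        *(batcher.submit(f"user{i}-frame0") for i in range(8)))
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return results, batcher.batch_sizes


results, batch_sizes = asyncio.run(demo())
print(batch_sizes)
print(results[:2])
```

The trade-off is the wait window: a larger `wait_s` yields fuller batches but can add up to that much latency to each frame, and that delay has to fit inside the roughly 33 ms between frames at 30 FPS.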

Looks interesting! Will try it and keep you posted here.

By the way, what is the best way to reach you: Discourse, email, Discord, Slack, GitHub, etc.?

Thanks,
Rakesh

Hi,

The server stops abruptly after I run the script. Am I doing something wrong here?

main.py:

from typing import List
import time

import numpy as np
import requests
from starlette.requests import Request

import ray
from ray import serve


@serve.deployment(route_prefix="/adder")
class BatchAdder:
    @serve.batch(max_batch_size=4)
    async def handle_batch(self, numbers: List[int]):
        input_array = np.array(numbers)
        print("Our input array has shape:", input_array.shape)
        # Sleep for 200ms, this could be performing CPU intensive computation
        # in real models
        time.sleep(0.2)
        output_array = input_array + 1
        return output_array.astype(int).tolist()

    async def __call__(self, request: Request):
        return await self.handle_batch(int(request.query_params["number"]))


if __name__=='__main__':
    ray.init(num_cpus=2)
    serve.start()
    BatchAdder.deploy()

Output:
(kps) C:\Users\rakes\gov-ai\keypoint-streaming>python main.py
(ServeController pid=17052) 2022-05-15 03:44:46,079     INFO checkpoint_path.py:15 -- Using RayInternalKVStore for controller checkpoint and recovery.
(ServeController pid=17052) 2022-05-15 03:44:46,189     INFO http_state.py:106 -- Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:PvKuZi:SERVE_PROXY_ACTOR-node:127.0.0.1-0' on node 'node:127.0.0.1-0' listening on '127.0.0.1:8000'
(pid=12632) Stack (most recent call first):
(pid=12632)   File "C:\Users\rakes\anaconda3\envs\kps\lib\site-packages\ray\worker.py", line 165 in current_job_id
(pid=12632)   File "C:\Users\rakes\anaconda3\envs\kps\lib\site-packages\ray\_private\import_thread.py", line 83 in _do_importing
(pid=12632)   File "C:\Users\rakes\anaconda3\envs\kps\lib\site-packages\ray\_private\import_thread.py", line 71 in _run
(pid=12632)   File "C:\Users\rakes\anaconda3\envs\kps\lib\threading.py", line 870 in run
(pid=12632)   File "C:\Users\rakes\anaconda3\envs\kps\lib\threading.py", line 932 in _bootstrap_inner
(pid=12632)   File "C:\Users\rakes\anaconda3\envs\kps\lib\threading.py", line 890 in _bootstrap
2022-05-15 03:44:47,193 INFO api.py:794 -- Started Serve instance in namespace '13d5dace-0af4-452b-90d1-75503ee4541f'.
2022-05-15 03:44:47,193 INFO api.py:615 -- Updating deployment 'BatchAdder'. component=serve deployment=BatchAdder
(HTTPProxyActor pid=11324) INFO:     Started server process [11324]
(ServeController pid=17052) 2022-05-15 03:44:47,304     INFO deployment_state.py:1210 -- Adding 1 replicas to deployment 'BatchAdder'. component=serve deployment=BatchAdder
2022-05-15 03:44:49,202 INFO api.py:630 -- Deployment 'BatchAdder' is ready at `http://127.0.0.1:8000/adder`. component=serve deployment=BatchAdder
(ServeController pid=17052) 2022-05-15 03:44:49,296     INFO deployment_state.py:1236 -- Removing 1 replicas from deployment 'BatchAdder'. component=serve deployment=BatchAdder

(kps) C:\Users\rakes\gov-ai\keypoint-streaming>
(kps) C:\Users\rakes\gov-ai\keypoint-streaming>python main.py
(ServeController pid=19604) 2022-05-15 03:46:15,338     INFO checkpoint_path.py:15 -- Using RayInternalKVStore for controller checkpoint and recovery.
(ServeController pid=19604) 2022-05-15 03:46:15,465     INFO http_state.py:106 -- Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:pwkYtr:SERVE_PROXY_ACTOR-node:127.0.0.1-0' on node 'node:127.0.0.1-0' listening on '127.0.0.1:8000'
2022-05-15 03:46:16,446 INFO api.py:794 -- Started Serve instance in namespace 'ce450fa3-bdd3-43bb-9e72-f268a9cc360a'.
2022-05-15 03:46:16,446 INFO api.py:615 -- Updating deployment 'BatchAdder'. component=serve deployment=BatchAdder
(ServeController pid=19604) 2022-05-15 03:46:16,518     INFO deployment_state.py:1210 -- Adding 1 replicas to deployment 'BatchAdder'. component=serve deployment=BatchAdder
(HTTPProxyActor pid=17908) INFO:     Started server process [17908]
2022-05-15 03:46:18,458 INFO api.py:630 -- Deployment 'BatchAdder' is ready at `http://127.0.0.1:8000/adder`. component=serve deployment=BatchAdder
(ServeController pid=19604) 2022-05-15 03:46:18,549     INFO deployment_state.py:1236 -- Removing 1 replicas from deployment 'BatchAdder'. component=serve deployment=BatchAdder

(kps) C:\Users\rakes\gov-ai\keypoint-streaming>