1. Severity of the issue: (select one)
None: I’m just curious or want clarification.
Low: Annoying but doesn’t hinder my work.
Medium: Significantly affects my productivity but can find a workaround.
[x] High: Completely blocks me.
2. Environment:
- Ray version: 2.44.1
- Python version: 3.10.12
- OS: Ubuntu 22.04.1 LTS
- Cloud/Infrastructure: on-premises
- Other libs/tools (if relevant): CatBoost regression, gRPC
3. What happened vs. what you expected:
- Expected: gRPC inference latency < 5 ms per request in single-threaded mode
- Actual: latency > 10 ms per request, whether the client runs on the same machine as the server or on a different one
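For reference, timing the raw CatBoost `predict()` outside of Ray Serve (a rough baseline sketch, not part of the repro below; it assumes the same `catboost_model.cbm` and three-feature frame) can help isolate how much of the latency comes from the model itself versus Serve/gRPC overhead:

```python
# Baseline sketch: raw CatBoost predict latency, no Ray Serve / gRPC involved.
import time

import pandas as pd
from catboost import CatBoostRegressor

model = CatBoostRegressor()
model.load_model("catboost_model.cbm")  # same model file as serve_model.py below
df = pd.DataFrame([[0.5, 1.2, -0.7]], columns=["f1", "f2", "f3"])

model.predict(df)  # warm-up call
N = 1000
start = time.perf_counter()
for _ in range(N):
    model.predict(df)
print(f"raw predict latency: {(time.perf_counter() - start) / N * 1000:.3f} ms")
```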
serve_model.py
```python
import datetime

import pandas as pd
import ray
from ray import serve
from catboost import CatBoostRegressor
from fastapi import Request

import inference_pb2
import inference_pb2_grpc


# @serve.deployment(num_replicas=8, ray_actor_options={"num_cpus": 2.0, "num_gpus": 0})
@serve.deployment
class CatBoostModel:
    def __init__(self):
        self.model = CatBoostRegressor()
        self.model.load_model("catboost_model.cbm")

    # HTTP entry point (route_prefix /app1)
    async def __call__(self, request: Request):
        start_time = datetime.datetime.now()
        body = await request.json()
        features = pd.DataFrame([body["features"]], columns=["f1", "f2", "f3"])
        pred = self.model.predict(features)[0]
        resp = {"prediction": pred}
        end_time = datetime.datetime.now()
        dur = int((end_time - start_time).microseconds / 1000)
        print(f"REST inference: {dur}")
        return resp

    # gRPC entry point (InferenceService.Predict)
    async def Predict(self, request):
        start_time = datetime.datetime.now()
        features = list(request.features)
        df = pd.DataFrame([features], columns=["f1", "f2", "f3"])
        pred = self.model.predict(df)[0]
        resp = inference_pb2.PredictResponse(result=pred)
        end_time = datetime.datetime.now()
        dur = int((end_time - start_time).microseconds / 1000)
        print(f"gRPC inference: {dur}")
        return resp


g = CatBoostModel.bind()
app1 = "app1"
serve.run(target=g, name=app1, route_prefix=f"/{app1}")
```
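One caveat on the timing in both handlers above: `timedelta.microseconds` only holds the sub-second component, so anything at or above one second would be under-reported. A `time.perf_counter()` based variant (a sketch, not what the deployment above actually runs) avoids that:

```python
# Sketch of the same per-request timing with perf_counter instead of datetime.
import time

start = time.perf_counter()
# ... self.model.predict(df) and response construction go here ...
dur_ms = (time.perf_counter() - start) * 1000
print(f"gRPC inference: {dur_ms:.2f} ms")
```

For the ~10 ms requests reported here the two approaches agree; the difference only matters for slow outliers.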
inference.proto
```protobuf
syntax = "proto3";

package inference;

service InferenceService {
  rpc Predict (PredictRequest) returns (PredictResponse);
}

message PredictRequest {
  repeated float features = 1;
}

message PredictResponse {
  float result = 1;
}
```
Generate the gRPC stubs:
```bash
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. inference.proto
```
Start Serve with the gRPC servicer registered, then deploy the model:
```bash
serve start --grpc-port 9000 --grpc-servicer-functions inference_pb2_grpc.add_InferenceServiceServicer_to_server
python serve_model.py
```
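Before running the load test, the application and gRPC proxy can be sanity-checked from Python (assuming the `serve.status()` API available in recent Ray releases; the field access below follows that API and is not part of the repro):

```python
# Sanity check: the app should be RUNNING and the proxies healthy before load testing.
from ray import serve

status = serve.status()
print(status.applications["app1"].status)  # expected: RUNNING
print(status.proxies)                      # per-node proxy health
```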
Client (grpc_client.py):
```python
import datetime
import threading

import grpc

import inference_pb2
import inference_pb2_grpc

NUM_THREADS = 1
REQUESTS_PER_THREAD = 1000
# NUM_THREADS = 8
# REQUESTS_PER_THREAD = 125

channel = grpc.insecure_channel("localhost:9000")
stub = inference_pb2_grpc.InferenceServiceStub(channel)
features = [0.5, 1.2, -0.7]


def worker(thread_id):
    for i in range(REQUESTS_PER_THREAD):
        start_time = datetime.datetime.now()
        response = stub.Predict(inference_pb2.PredictRequest(features=features))
        end_time = datetime.datetime.now()
        dur = int((end_time - start_time).microseconds / 1000)
        print(f"[Thread {thread_id}] Response: {response.result} | Latency: {dur:.2f} ms")


# Launch threads
threads = []
for t_id in range(NUM_THREADS):
    t = threading.Thread(target=worker, args=(t_id,))
    threads.append(t)
    t.start()

# Wait for all threads to complete
for t in threads:
    t.join()

print("Load test completed.")
```
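Not the client used for the numbers below, but a variant like this sketch (illustrative counts and output format) warms up the channel with `grpc.channel_ready_future` and reports percentiles, so connection setup and the first call don't skew the average:

```python
# Sketch: single-threaded latency measurement with channel warm-up and percentiles.
import time

import grpc

import inference_pb2
import inference_pb2_grpc

channel = grpc.insecure_channel("localhost:9000")
grpc.channel_ready_future(channel).result(timeout=10)  # block until connected
stub = inference_pb2_grpc.InferenceServiceStub(channel)

req = inference_pb2.PredictRequest(features=[0.5, 1.2, -0.7])
stub.Predict(req)  # warm-up request, not timed

lat = []
for _ in range(1000):
    t0 = time.perf_counter()
    stub.Predict(req)
    lat.append((time.perf_counter() - t0) * 1000)

lat.sort()
print(f"avg {sum(lat) / len(lat):.2f} ms | "
      f"p50 {lat[len(lat) // 2]:.2f} ms | "
      f"p95 {lat[int(len(lat) * 0.95)]:.2f} ms")
```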
Computing the average latency from the client output:
```bash
python grpc_client.py | grep Latency | awk '{s+=$7; c++} END {print s/c}' | bc -l
```
10.113 ms