[Serve,High] Stream-to-stream grpc methods do not work

Any attempt to send chunks in a thread gets the following error:

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Unexpected <class 'TypeError'>: 'NoneType' object is not callable"
debug_error_string = "UNKNOWN:Error received from peer {created_time:"2024-01-18T19:50:11.488132856+05:00", grpc_status:2, grpc_message:"Unexpected <class 'TypeError'>: 'NoneType' object is not callable"}"

I have not been able to catch the exception anywhere in the service, so I cannot tell where the problem occurs.
When I change the method to unary-stream, the requests at least reach the method in the service.

Versions / Dependencies

grpcio-tools==1.59.3 (I’ve tried different versions)
ray[serve]==2.8.1
ray[serve-grpc]==2.8.1
python==3.11.5

Reproduction script

# test.proto
syntax = "proto3";

message Config {
    string language_code    = 1;
    string domain           = 2;
}

message Request {
    oneof streaming_request {
        Config config = 1;
        bytes context = 2;
    }
    string id = 3;
}

message Response {
    string result  = 1; 
}

service MyService {
    rpc Streaming(stream Request) returns (stream Response);
}
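# test.py
import grpc
import test_pb2
import test_pb2_grpc

# Assumption: address of the Serve gRPC proxy; the original post does not give it.
ADDRESS = "localhost:9000"

def generate_requests():
    yield test_pb2.Request(id="123")
    yield test_pb2.Request(id="456")

def test_getting_response_via_stream():
    with grpc.insecure_channel(ADDRESS) as channel:
        stub = test_pb2_grpc.MyServiceStub(channel)
        # The "application" metadata routes the call to the Serve app named "test".
        for response in stub.Streaming(generate_requests(), metadata=(("application", "test"),)):
            print(response)

test_getting_response_via_stream()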
# service.py
from ray import serve
import test_pb2

@serve.deployment(
    ray_actor_options={"num_cpus": 4, "num_gpus": 0.2},
    max_concurrent_queries=5,
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 6,
        "upscale_delay_s": 360,
        "downscale_delay_s": 600,
    },
)
class MyService:
    # Named to match the `Streaming` rpc declared in the .proto file above.
    async def Streaming(
        self,
        request_iterator,
    ):
        request_with_config = await anext(request_iterator)
        print(request_with_config)
        yield test_pb2.Response(
                result="pong",
        )

app = MyService.bind()
serve.run(app, name="test", route_prefix="/test")

High: It blocks me from completing my task.

@psydok yes, that’s correct. Currently we only support unary-unary and unary-stream request responses. We will do some research on how to make stream-stream work and track it in this issue: [Serve] Support gRPC stream to stream request response (ray-project/ray issue #42488).
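
For illustration only, the supported unary-stream shape is a method that takes a single request and yields several responses. The sketch below does not involve Ray Serve or gRPC at all: `Request` and `Response` are hypothetical dataclass stand-ins for the generated `test_pb2` classes, and the fixed chunk size and the `collect` / `run_demo` helpers are assumptions made up for the example. It only shows what the handler looks like when the whole payload arrives in one message instead of a request stream.

```python
import asyncio
from dataclasses import dataclass


# Hypothetical stand-ins for the generated test_pb2.Request / test_pb2.Response.
@dataclass
class Request:
    id: str = ""
    context: bytes = b""


@dataclass
class Response:
    result: str = ""


class MyService:
    # Unary-stream shape: one request in, an async stream of responses out.
    async def Streaming(self, request: Request):
        # Assumption: the full payload is sent in one message and is
        # split into fixed-size chunks on the server side.
        chunk_size = 4
        for start in range(0, len(request.context), chunk_size):
            chunk = request.context[start:start + chunk_size]
            yield Response(result=f"{request.id}:{len(chunk)}")


async def collect(request: Request) -> list:
    # Drain the async generator the way a caller would consume the stream.
    service = MyService()
    return [response async for response in service.Streaming(request)]


def run_demo() -> list:
    return asyncio.run(collect(Request(id="123", context=b"0123456789")))


if __name__ == "__main__":
    for response in run_demo():
        print(response)
```

In a real deployment the method would receive the generated protobuf request and yield generated response messages; whether sending the full payload in one message is acceptable depends on the use case.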

Thank you so much for the quick response and your work!

In Ray 2.6.3 this was still supported and everything worked… Now I cannot upgrade and start the service in a cluster that has version 2.8.1 installed.

Can you please tell me in which version support was dropped?

Hmm, Serve’s gRPC support was only added in Ray 2.7.0 (see the Ray-2.7.0 release notes in ray-project/ray on GitHub) :sweat_smile: Stream-stream is a new feature that we haven’t explored yet.

I was using an experimental version of Serve’s gRPC :smiling_face_with_tear: