RuntimeError: can't start new thread

I have 3 serve deployments in a model composition setup. Each deployment has multiple replicas, some using fractional resources.

...

@serve.deployment(
    route_prefix="/ner",
    ray_actor_options={"num_cpus": 1},
    max_concurrent_queries=10,
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 2,
    }
)
@serve.ingress(app)
class Chain:
    """Ingress deployment that chains the Tokenisation and Pipeline deployments."""

    def __init__(self,
                 tokeniser_handle: RayServeDeploymentHandle,
                 pipeline_handle: RayServeDeploymentHandle):
        # Handles to the downstream deployments, injected via Chain.bind(...).
        self.tokeniser_handle = tokeniser_handle
        self.pipeline_handle = pipeline_handle

    @app.get("/predict")
    async def predict(self, doc: Doc):
        """Tokenise *doc*, feed the tokens to the pipeline, return its output."""
        # Passing the tokeniser's (un-awaited) result straight into the
        # pipeline handle lets Serve resolve it on the pipeline replica
        # without round-tripping the tokens through this replica.
        task = self.pipeline_handle.remote(self.tokeniser_handle.remote(doc))
        # First await yields an ObjectRef; second await yields the value.
        ref = await task
        result = await ref
        return result


@serve.deployment(
    ray_actor_options={"num_cpus": 1},
    max_concurrent_queries=10,
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 2,
    }
)
class Tokenisation:
    """CPU-only deployment that converts a document into tokens (body elided)."""
    ...
    def __call__(self, doc):
        # Implementation elided in the post; produces `tokens` from `doc`.
        ...
        return tokens


@serve.deployment(
    # Fractional GPU: up to four replicas can share a single GPU.
    ray_actor_options={"num_gpus": 0.25, "num_cpus": 1},
    max_concurrent_queries=10,
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 4,
    }
)
class Pipeline:
    """GPU deployment that runs inference over tokens (body elided)."""
    ...
    def __call__(self, tokens):
        # Implementation elided in the post; produces `preds` from `tokens`.
        ...
        return preds
  

# Build the deployment graph: Chain is the ingress and fans out to the
# Tokenisation and Pipeline deployments via the bound handles.
pipeline = Pipeline.bind()
tokeniser = Tokenisation.bind()
chain = Chain.bind(tokeniser, pipeline)

When testing the API using aiohttp with many connections to the API (~50), the response time progressively increases until RuntimeError: can't start new thread occurs.
In the @serve.deployment decorators I added max_concurrent_queries=10, thinking this would limit the number of threads that were created, but the API continues to crash with the same error.

import argparse
import asyncio
import time

import aiohttp

async def fetch_page(url, session):
    """Issue a GET to *url* on *session* and return the decoded JSON body."""
    async with session.get(url) as resp:
        payload = await resp.json()
        return payload

async def run_benchmark(url: str, t: int):
    """Hammer *url* from a single sequential connection for *t* seconds.

    Returns the achieved queries per second.
    """
    start = time.time()
    count = 0
    # Fix: the original passed ClientSession(loop=loop), which (a) relied on a
    # module-level `loop` global defined later in the file and (b) uses the
    # `loop` kwarg that aiohttp deprecated and removed; the session picks up
    # the running event loop automatically.
    async with aiohttp.ClientSession() as session:
        while time.time() - start < t:  # Run the test for t seconds.
            count += 1
            _ = await fetch_page(url, session)
        return count / (time.time() - start)  # Compute queries per second.


async def main(args):
    """Fan out ``args.num_connections`` concurrent benchmark loops.

    Returns the list of per-connection queries-per-second figures
    (previously computed but silently discarded).
    """
    results = await asyncio.gather(
        *[run_benchmark(args.url, args.time) for _ in range(args.num_connections)]
    )
    return results

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_connections", "-n", type=int, required=True)
    parser.add_argument("--time", "-t", type=int, default=60)
    parser.add_argument("--url", "-u", type=str)
    args = parser.parse_args()

    # NOTE(review): get_event_loop() is deprecated in favour of asyncio.run(),
    # but run_benchmark above reads this module-level `loop` via
    # ClientSession(loop=loop), so the global must stay until that kwarg is
    # removed — confirm before modernising.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(args))

From the serve logs:

ERROR 2022-11-17 13:44:36,336 Chain Chain#XKLQGv replica.py:438 - Request failed due to RuntimeError:
(ServeReplica:Chain pid=10730) Traceback (most recent call last):
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/_private/replica.py", line 420, in invoke_single
(ServeReplica:Chain pid=10730)     result = await method_to_call(*args, **kwargs)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/api.py", line 221, in __call__
(ServeReplica:Chain pid=10730)     await self._serve_app(
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/fastapi/applications.py", line 269, in __call__
(ServeReplica:Chain pid=10730)     await super().__call__(scope, receive, send)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/applications.py", line 124, in __call__
(ServeReplica:Chain pid=10730)     await self.middleware_stack(scope, receive, send)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/middleware/errors.py", line 184, in __call__
(ServeReplica:Chain pid=10730)     raise exc
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/middleware/errors.py", line 162, in __call__
(ServeReplica:Chain pid=10730)     await self.app(scope, receive, _send)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/middleware/cors.py", line 84, in __call__
(ServeReplica:Chain pid=10730)     await self.app(scope, receive, send)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/exceptions.py", line 93, in __call__
(ServeReplica:Chain pid=10730)     raise exc
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
(ServeReplica:Chain pid=10730)     await self.app(scope, receive, sender)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
(ServeReplica:Chain pid=10730)     raise e
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
(ServeReplica:Chain pid=10730)     await self.app(scope, receive, send)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/routing.py", line 670, in __call__
(ServeReplica:Chain pid=10730)     await route.handle(scope, receive, send)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/routing.py", line 266, in handle
(ServeReplica:Chain pid=10730)     await self.app(scope, receive, send)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/routing.py", line 65, in app
(ServeReplica:Chain pid=10730)     response = await func(request)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/fastapi/routing.py", line 227, in app
(ServeReplica:Chain pid=10730)     raw_response = await run_endpoint_function(
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
(ServeReplica:Chain pid=10730)     return await dependant.call(**values)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/nrer/api/./app.py", line 68, in predict_groups
(ServeReplica:Chain pid=10730)     ref = await self.pipeline_handle.predict.remote(self.tokeniser_handle.remote(doc))
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/handle.py", line 345, in remote
(ServeReplica:Chain pid=10730)     self.handle = handle.options(method_name=self.handle_options.method_name)
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/handle.py", line 206, in options
(ServeReplica:Chain pid=10730)     return self.__class__(
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/handle.py", line 152, in __init__
(ServeReplica:Chain pid=10730)     self._pusher = start_metrics_pusher(
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/_private/autoscaling_metrics.py", line 82, in start_metrics_pusher
(ServeReplica:Chain pid=10730)     timer.start()
(ServeReplica:Chain pid=10730)   File "/home/ljbails/.pyenv/versions/3.8.14/lib/python3.8/threading.py", line 852, in start
(ServeReplica:Chain pid=10730)     _start_new_thread(self._bootstrap, ())
(ServeReplica:Chain pid=10730) RuntimeError: can't start new thread

Can someone shed some light on this error?

Hi @Lewis_Bails1, thank you for trying Serve! For your case, what is your end-to-end latency? Does the error only happen when you have 50 connections? And does that mean you get roughly 50x the QPS compared with a single connection?

During your testing, can you also take a look at the Ray dashboard to double-check the CPU usage of your hardware and whether new replicas are able to be provisioned based on your autoscaling config?

Latency for a single request is ~180ms. The replicas appear to be provisioned properly.
I think I may be misunderstanding aiohttp.ClientSession, in that even when num_connections is 1 it may still be sending hundreds of concurrent requests, so perhaps those requests are getting backed up to the point that no more threads can be created on the replicas.