I have 3 serve deployments in a model composition setup. Each deployment has multiple replicas, some using fractional resources.
...
@serve.deployment(
    route_prefix="/ner",
    ray_actor_options={"num_cpus": 1},
    max_concurrent_queries=10,
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 2,
    },
)
@serve.ingress(app)
class Chain:
    """Ingress deployment that chains Tokenisation -> Pipeline.

    NOTE(review): the traceback shows the failing call was
    ``self.pipeline_handle.predict.remote(...)``. Accessing a method
    attribute on a handle (or calling ``handle.options(...)``) builds a
    *new* handle, and each new handle starts a metrics-pusher thread —
    under sustained load that leaks threads until
    ``RuntimeError: can't start new thread``. Keep the handles created
    once in ``__init__`` and call them directly, as below.
    """

    def __init__(self,
                 tokeniser_handle: RayServeDeploymentHandle,
                 pipeline_handle: RayServeDeploymentHandle):
        # Handles are bound once at deployment time and reused for every
        # request; never re-derive a handle inside the request path.
        self.tokeniser_handle = tokeniser_handle
        self.pipeline_handle = pipeline_handle

    @app.get("/predict")
    async def predict(self, doc: Doc):
        # Await the first handle call to get an ObjectRef, then pass that
        # ref downstream so Ray resolves it to the tokens in-cluster
        # (the original passed the un-awaited call result instead).
        tokens_ref = await self.tokeniser_handle.remote(doc)
        preds_ref = await self.pipeline_handle.remote(tokens_ref)
        # The downstream call yields an ObjectRef; await it for the result.
        return await preds_ref
@serve.deployment(
    ray_actor_options={"num_cpus": 1},
    max_concurrent_queries=10,
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 2,
    },
)
class Tokenisation:
    """CPU-only deployment that turns a document into tokens."""

    ...

    def __call__(self, doc):
        # Tokenisation logic elided in the post; presumably `tokens` is
        # produced from `doc` here — body not shown.
        ...
        return tokens
@serve.deployment(
    ray_actor_options={"num_gpus": 0.25, "num_cpus": 1},
    max_concurrent_queries=10,
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 4,
    },
)
class Pipeline:
    """GPU deployment (4 replicas can share one GPU at 0.25 each)."""

    ...

    def __call__(self, tokens):
        # Inference logic elided in the post; presumably `preds` is
        # computed from `tokens` here — body not shown.
        ...
        return preds
# Build the deployment graph: Chain is the ingress and receives bound
# handles to the Tokenisation and Pipeline deployments.
pipeline = Pipeline.bind()
tokeniser = Tokenisation.bind()
chain = Chain.bind(tokeniser, pipeline)
When load-testing the API with aiohttp using many concurrent connections (~50), the response time progressively increases until a
RuntimeError: can't start new thread
is raised.
In the @serve.deployment
decorators I added max_concurrent_queries=10
, thinking this would limit the number of threads that were created, but the API continues to crash with the same error.
import argparse
import asyncio
import time

import aiohttp
async def fetch_page(url, session):
    """GET *url* using *session* and return the JSON-decoded body."""
    async with session.get(url) as resp:
        payload = await resp.json()
    return payload
async def run_benchmark(url: str, t: int):
    """Hammer *url* from a single connection for *t* seconds.

    Returns the measured queries-per-second for this connection.
    """
    start = time.time()
    count = 0
    # The `loop=` argument the original passed here is deprecated in aiohttp
    # and depended on a module-level `loop` global; the session picks up the
    # running event loop automatically.
    async with aiohttp.ClientSession() as session:
        while time.time() - start < t:  # Run test for t seconds.
            count += 1
            _ = await fetch_page(url, session)
    return count / (time.time() - start)  # Compute queries per second.
async def main(args):
    """Run ``args.num_connections`` benchmarks concurrently.

    Returns the list of per-connection QPS values (the original computed
    them and silently discarded them).
    """
    results = await asyncio.gather(
        *[run_benchmark(args.url, args.time) for _ in range(args.num_connections)]
    )
    return results
if __name__ == "__main__":
    # Command-line entry point for the load test.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--num_connections", "-n", type=int, required=True)
    arg_parser.add_argument("--time", "-t", type=int, default=60)
    arg_parser.add_argument("--url", "-u", type=str)
    cli_args = arg_parser.parse_args()
    # `loop` stays module-level because run_benchmark reads it as a global.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(cli_args))
From the serve logs:
ERROR 2022-11-17 13:44:36,336 Chain Chain#XKLQGv replica.py:438 - Request failed due to RuntimeError:
(ServeReplica:Chain pid=10730) Traceback (most recent call last):
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/_private/replica.py", line 420, in invoke_single
(ServeReplica:Chain pid=10730) result = await method_to_call(*args, **kwargs)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/api.py", line 221, in __call__
(ServeReplica:Chain pid=10730) await self._serve_app(
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/fastapi/applications.py", line 269, in __call__
(ServeReplica:Chain pid=10730) await super().__call__(scope, receive, send)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/applications.py", line 124, in __call__
(ServeReplica:Chain pid=10730) await self.middleware_stack(scope, receive, send)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/middleware/errors.py", line 184, in __call__
(ServeReplica:Chain pid=10730) raise exc
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/middleware/errors.py", line 162, in __call__
(ServeReplica:Chain pid=10730) await self.app(scope, receive, _send)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/middleware/cors.py", line 84, in __call__
(ServeReplica:Chain pid=10730) await self.app(scope, receive, send)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/exceptions.py", line 93, in __call__
(ServeReplica:Chain pid=10730) raise exc
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
(ServeReplica:Chain pid=10730) await self.app(scope, receive, sender)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
(ServeReplica:Chain pid=10730) raise e
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
(ServeReplica:Chain pid=10730) await self.app(scope, receive, send)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/routing.py", line 670, in __call__
(ServeReplica:Chain pid=10730) await route.handle(scope, receive, send)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/routing.py", line 266, in handle
(ServeReplica:Chain pid=10730) await self.app(scope, receive, send)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/starlette/routing.py", line 65, in app
(ServeReplica:Chain pid=10730) response = await func(request)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/fastapi/routing.py", line 227, in app
(ServeReplica:Chain pid=10730) raw_response = await run_endpoint_function(
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
(ServeReplica:Chain pid=10730) return await dependant.call(**values)
(ServeReplica:Chain pid=10730) File "/home/ljbails/nrer/api/./app.py", line 68, in predict_groups
(ServeReplica:Chain pid=10730) ref = await self.pipeline_handle.predict.remote(self.tokeniser_handle.remote(doc))
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/handle.py", line 345, in remote
(ServeReplica:Chain pid=10730) self.handle = handle.options(method_name=self.handle_options.method_name)
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/handle.py", line 206, in options
(ServeReplica:Chain pid=10730) return self.__class__(
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/handle.py", line 152, in __init__
(ServeReplica:Chain pid=10730) self._pusher = start_metrics_pusher(
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/envs/nrer_38/lib/python3.8/site-packages/ray/serve/_private/autoscaling_metrics.py", line 82, in start_metrics_pusher
(ServeReplica:Chain pid=10730) timer.start()
(ServeReplica:Chain pid=10730) File "/home/ljbails/.pyenv/versions/3.8.14/lib/python3.8/threading.py", line 852, in start
(ServeReplica:Chain pid=10730) _start_new_thread(self._bootstrap, ())
(ServeReplica:Chain pid=10730) RuntimeError: can't start new thread
Can someone shed some light on this error?