[High] Why doesn't parallelism work with data preprocessing?

Test:

import base64
import concurrent.futures
import logging
import time

import numpy as np
import requests
import torch
from ray import serve
from torchaudio.transforms import Resample as TResample
from transformers import pipeline

logger = logging.getLogger("ray.serve")

torch.set_num_threads(1)


@serve.deployment(ray_actor_options={"num_gpus": 0.1, "num_cpus": 1}, num_replicas=2)
class Preprocessor:
    def __init__(self):
        self.default_resampler = TResample(
            orig_freq=8000,
            new_freq=16000,
            dtype=torch.float32,
            resampling_method="sinc_interp_kaiser",
        ).to("cuda")

    def __call__(self, audio):
        raw_audio = base64.b64decode(audio)
        samples = np.frombuffer(raw_audio, dtype=np.int16)
        samples = samples.astype(np.float32) / np.iinfo(samples.dtype).max
        with torch.inference_mode():
            samples = torch.from_numpy(samples).to("cuda")
            samples = self.default_resampler(samples)
            samples = samples.to("cpu")
            return samples.numpy()


@serve.deployment(
    ray_actor_options={"num_gpus": 0.33, "num_cpus": 2},
    autoscaling_config={"min_replicas": 2, "max_replicas": 3},
)
class Translator:
    def __init__(self, process):
        self.process = process.options(use_new_handle_api=True)
        self.pipe = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-medium",
            chunk_length_s=30,
            device="cuda",
        )

    async def translate(self, request) -> str:
        samples = await self.process.remote(request["audio"])
        with torch.inference_mode():
            transcription = self.pipe(samples, generate_kwargs={"language": "russian"})[
                "text"
            ]
        return [{"text": transcription}]

    async def __call__(self, request):
        request = await request.json()
        return await self.translate(request)


app = Translator.options(route_prefix="/translate").bind(Preprocessor.bind())
serve.run(app)


def send_request(blob):
    s = time.time()
    resp = requests.post(f"http://127.0.0.1:8000/translate", json={"audio": blob})
    print(resp.json())
    return time.time() - s


with open(
    "/home/max/projects/models_deployment/tests/resources/long_phrase_etalon.wav", "rb"
) as f:
    blob = f.read()
    blob = base64.b64encode(blob).decode("ascii")

print(send_request(blob))
print(send_request(blob))
print(send_request(blob))
print(send_request(blob))

time.sleep(3)
print("TEST")
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = []
    for i in range(2):
        results.append(executor.submit(send_request, blob))
    print("RESULTS")
    for future in concurrent.futures.as_completed(results):
        result = future.result()
        print(result)

output:

2023-11-30 13:00:33,350 WARNING deployment.py:404 -- DeprecationWarning: `route_prefix` in `@serve.deployment` has been deprecated. To specify a route prefix for an application, pass it into `serve.run` instead.
2023-11-30 13:00:34,964 INFO worker.py:1664 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 
(ProxyActor pid=3197416) INFO 2023-11-30 13:00:36,631 proxy 10.80.0.21 proxy.py:1072 - Proxy actor 7f31fcd30dd1ad253d5c382c01000000 starting on node 6bb61fbb55ab6769c9a44e80afde55c19a5a660a435d26bfe02aa3a9.
(ProxyActor pid=3197416) INFO 2023-11-30 13:00:36,635 proxy 10.80.0.21 proxy.py:1257 - Starting HTTP server on node: 6bb61fbb55ab6769c9a44e80afde55c19a5a660a435d26bfe02aa3a9 listening on port 8000
(ProxyActor pid=3197416) INFO:     Started server process [3197416]
(ServeController pid=3197339) INFO 2023-11-30 13:00:36,755 controller 3197339 deployment_state.py:1379 - Deploying new version of deployment Preprocessor in application 'default'.
(ServeController pid=3197339) INFO 2023-11-30 13:00:36,759 controller 3197339 deployment_state.py:1379 - Deploying new version of deployment Translator in application 'default'.
(ServeController pid=3197339) INFO 2023-11-30 13:00:36,862 controller 3197339 deployment_state.py:1668 - Adding 2 replicas to deployment Preprocessor in application 'default'.
(ServeController pid=3197339) INFO 2023-11-30 13:00:36,871 controller 3197339 deployment_state.py:1668 - Adding 2 replicas to deployment Translator in application 'default'.
(ServeReplica:default:Translator pid=3197448) Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
(ServeReplica:default:Preprocessor pid=3197446) INFO 2023-11-30 13:00:45,819 Preprocessor default#Preprocessor#AQQHpC 39ed7197-32fd-4a07-9968-bdb7291cea5f /translate default replica.py:726 - __CALL__ OK 73.2ms
[{'text': ' test'}]
0.9717648029327393
(ServeReplica:default:Translator pid=3197449) INFO 2023-11-30 13:00:46,664 Translator default#Translator#XTFRyI 39ed7197-32fd-4a07-9968-bdb7291cea5f /translate default replica.py:726 - __CALL__ OK 924.5ms
(ServeReplica:default:Preprocessor pid=3197446) INFO 2023-11-30 13:00:46,682 Preprocessor default#Preprocessor#AQQHpC 92b3ae19-ab35-4031-82d9-a3e4c662f743 /translate default replica.py:726 - __CALL__ OK 0.7ms
[{'text': ' test'}]
0.8590915203094482
(ServeReplica:default:Preprocessor pid=3197446) INFO 2023-11-30 13:00:47,537 Preprocessor default#Preprocessor#AQQHpC 4b6625d2-1820-4230-be75-d8c67aa84373 /translate default replica.py:726 - __CALL__ OK 0.7ms
(ServeReplica:default:Translator pid=3197448) INFO 2023-11-30 13:00:47,526 Translator default#Translator#YRwxid 92b3ae19-ab35-4031-82d9-a3e4c662f743 /translate default replica.py:726 - __CALL__ OK 851.7ms
[{'text': ' test'}]
0.7534689903259277
(ServeReplica:default:Translator pid=3197449) INFO 2023-11-30 13:00:48,280 Translator default#Translator#XTFRyI 4b6625d2-1820-4230-be75-d8c67aa84373 /translate default replica.py:726 - __CALL__ OK 747.7ms
(ServeReplica:default:Preprocessor pid=3197447) INFO 2023-11-30 13:00:48,364 Preprocessor default#Preprocessor#SSdZWA b73ebbd5-bee3-426e-9b82-a748fcdb0c1d /translate default replica.py:726 - __CALL__ OK 73.3ms
[{'text': ' test'}]
0.8116359710693359
(ServeReplica:default:Translator pid=3197448) INFO 2023-11-30 13:00:49,092 Translator default#Translator#YRwxid b73ebbd5-bee3-426e-9b82-a748fcdb0c1d /translate default replica.py:726 - __CALL__ OK 805.6ms
TEST
RESULTS
(ServeReplica:default:Preprocessor pid=3197447) INFO 2023-11-30 13:00:52,110 Preprocessor default#Preprocessor#SSdZWA 98dd517a-c5f7-4ff6-a37c-1c61d1f94db9 /translate default replica.py:726 - __CALL__ OK 0.8ms
(ServeReplica:default:Preprocessor pid=3197447) INFO 2023-11-30 13:00:52,111 Preprocessor default#Preprocessor#SSdZWA 461e56eb-6b0b-4799-9af3-30ea17eae4da /translate default replica.py:726 - __CALL__ OK 0.7ms
(ServeReplica:default:Translator pid=3197449) Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
[{'text': ' test'}]
[{'text': ' test'}]
1.2893760204315186
1.2903039455413818
(ServeReplica:default:Translator pid=3197449) INFO 2023-11-30 13:00:53,385 Translator default#Translator#XTFRyI 98dd517a-c5f7-4ff6-a37c-1c61d1f94db9 /translate default replica.py:726 - __CALL__ OK 1280.6ms
(ServeReplica:default:Translator pid=3197448) INFO 2023-11-30 13:00:53,385 Translator default#Translator#YRwxid 461e56eb-6b0b-4799-9af3-30ea17eae4da /translate default replica.py:726 - __CALL__ OK 1280.1ms

If I remove the Preprocessor, the service responds under load at the expected speed (about 0.5-0.8 s per parallel request).
The Preprocessor itself is fast. Can you tell me how to fix this?

I have realised that it is not a matter of preprocessing after all. Nevertheless, it is not clear how the load balancing works.
I started 4 replicas and ran a load test with 5 users. You can see that the load is not evenly distributed across the replicas. I tried to fix this with RAY_scheduler_spread_threshold=0.0, but nothing changed.

If I run multiple Ray containers with 1 replica each and balance them with nginx, everything is fine. But this should be handled by Ray itself, right? Can you tell me how to set up round-robin balancing?

Ray Serve uses power-of-two-choices routing. When a ServeHandle receives a request, it:

  1. Randomly chooses 2 replicas from the requested deployment
  2. Queries the number of requests that each replica is processing
  3. Sends the request to the replica that’s processing fewer requests. If both replicas are already processing max_concurrent_queries requests, then the ServeHandle picks 2 new replicas and repeats the process.

Power-of-two-choices generally does a good job of balancing load. For example, if there’s a slow replica or a replica processing lengthy requests, power-of-two-choices naturally directs requests to other replicas, whereas round-robin keeps sending requests to that replica and risks overloading it.
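
Roughly, the selection logic looks like the following sketch (an illustration of the idea described above, not the actual ServeHandle implementation):

import random

# Minimal sketch of power-of-two-choices selection (illustration only).
# `in_flight` maps replica id -> number of requests that replica is
# currently processing.
def choose_replica(replicas, in_flight, max_concurrent_queries):
    while True:
        # 1. Randomly pick two candidate replicas.
        a, b = random.sample(replicas, 2)
        # 2./3. Route to the less-loaded candidate, unless both are already
        # at max_concurrent_queries, in which case pick two new candidates.
        candidate = min((a, b), key=lambda r: in_flight[r])
        if in_flight[candidate] < max_concurrent_queries:
            return candidate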

The downside is that since the 2 replicas are chosen randomly, if there’s a low number of requests and a low number of replicas, the request distribution will be a bit more uneven. How much traffic do you anticipate receiving in production?

Currently, Ray Serve doesn’t provide a way to do round-robin routing to replicas. If you’re interested in it, could you file a feature request on GitHub?


Thanks for the quick reply!
It would be really great if round-robin support could be added.

In production we expect a constant load of up to 10-20 requests per second, with each request taking up to 800 ms. It is important to respond before the client times out (usually set to 3 s).
Unfortunately, batching is not possible at the moment, and the current balancing does not meet these requirements (most replicas seem to be idle).
I have tried setting max_concurrent_queries=2, but it doesn’t make any difference.
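For reference, I applied it on the deployment decorator, roughly like this (a sketch; the Whisper setup is omitted):

from ray import serve

# Sketch of how max_concurrent_queries was applied; it caps the number of
# requests a single replica accepts at once.
@serve.deployment(
    max_concurrent_queries=2,
    ray_actor_options={"num_gpus": 0.33, "num_cpus": 2},
    autoscaling_config={"min_replicas": 2, "max_replicas": 3},
)
class Translator:
    ...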

Thank you, I’ve created a feature request on GitHub.

Thanks for filing the feature request!

And the current balancing result does not meet the requirements (most replicas seem to be idle).

How many replicas are you running and how many seem idle? How many requests are queued up at the replicas that aren’t idle?

I have 4 replicas running. You can see from the graph that the service balances normally for a while, but then it starts loading only one replica, so 3 replicas are idle.
How can I see how many requests are queued on each replica?

Use the following metrics (docs):

  • ray_serve_replica_processing_queries: The number of requests running on the replica.
  • ray_serve_replica_pending_queries: The number of requests that have been sent to the replica but haven’t started running.
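
If Prometheus isn’t wired up yet, a quick way to eyeball these gauges is to read a node’s metrics endpoint directly. The URL below is an assumption; it must match the metrics export port your Ray node was started with:

import requests

# Hypothetical metrics endpoint -- adjust to your node's metrics-export-port.
METRICS_URL = "http://127.0.0.1:8080"

for line in requests.get(METRICS_URL).text.splitlines():
    if line.startswith(("ray_serve_replica_processing_queries",
                        "ray_serve_replica_pending_queries")):
        print(line)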

You can see from the graph that the service balances normally for a while, but then it starts loading only one replica.

Could you explain this more? How does the graph tell us that only 1 replica is being used?

We know that each replica processes one request in 500-800 ms. The graph shows that under load some requests really are distributed among the replicas, but at some point requests begin to accumulate in the queue of a single replica and the response time degrades significantly; judging by the response times, the requests are being processed sequentially.

I also tried testing the service with 4 instances run via Docker and balanced via nginx. In that case the response time did not degrade under load; the graph stayed flat in all tests up to 5 requests per second.

We have not yet connected Prometheus; I’ll try it and report back.
I also tried setting max_concurrent_queries=2 and max_concurrent_queries=1 (it should not need to be more than 1-2), but the response time did not improve in either case :frowning:

ray_serve_replica_pending_queries is always 0 under any load. I tested 5/10/15/50 requests per second.
ray_serve_replica_processing_queries reaches a value equal to the load-test rate (5/10/15/50 requests per second).

I tested both the production version of the service (over gRPC) and the version from the example.

I tried running 2 replicas on two separate nodes. Surprisingly, balancing between the replicas then worked correctly (2 requests completed in the same time). But when I run the replicas on one node, balancing does not work at all (the service responds to requests sequentially).

Hi @psydok, thanks for reporting some possible issues. I tried to run the code from the top and couldn’t really get it working, so I replaced some things with minimal test code like the below:

# deployment.py
import asyncio
from ray import serve


@serve.deployment(num_replicas=2)
class Preprocessor:
    def __init__(self):
        pass

    async def __call__(self, audio):
        await asyncio.sleep(1)
        return f"from preprocessor: {audio}"


@serve.deployment(
    autoscaling_config={"min_replicas": 2, "max_replicas": 3},
)
class Translator:
    def __init__(self, process):
        self.process = process.options(use_new_handle_api=True)

    async def translate(self, request) -> str:
        samples = await self.process.remote(request["audio"])
        return [{"text": samples}]

    async def __call__(self, request):
        request = await request.json()
        return await self.translate(request)


app = Translator.options(route_prefix="/translate").bind(Preprocessor.bind())

Run serve run deployment:app to start the app

# client.py
import time
import requests
import concurrent.futures


def send_request(blob):
    s = time.time()
    resp = requests.post(f"http://127.0.0.1:8000/translate", json={"audio": blob})
    print(resp.json())
    return time.time() - s


blob = "foobar"
print(send_request(blob))
print(send_request(blob))
print(send_request(blob))
print(send_request(blob))

time.sleep(3)
print("TEST")
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = []
    for i in range(50):
        results.append(executor.submit(send_request, blob))
    print("RESULTS")
    for future in concurrent.futures.as_completed(results):
        result = future.result()
        print(result)

All the results seem to be processed pretty fast?

Can you run this code and let us know if you see the same issue?

PS: I’m running this with Ray on master if that matters

Yes, thank you very much for your reply. I’m now convinced that the problem is not in preprocessing. For some reason I can’t run Whisper with multiple replicas on a single GPU in a way that lets Ray understand it should balance requests between them. Only 1 replica works on each GPU, but this is not optimal utilisation of the GPUs.
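
To be clear, by multiple replicas on one GPU I mean a fractional num_gpus setup like the one in my original code, roughly:

from ray import serve

# Sketch of packing several replicas onto one GPU via fractional num_gpus
# (as in the original deployment above); model setup omitted.
@serve.deployment(ray_actor_options={"num_gpus": 0.33}, num_replicas=3)
class Translator:
    ...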