Optimal way to handle for loop with multiple await calls

How severely does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Hi, I am trying out Ray Serve for a deployment scenario where I make a prediction on each sentence within a paragraph. Given an input paragraph, the expected output is the following:

[
  {
    "sentence": "",
    "news_type": "hoax",
    "score": 0.33035558462142944
  },
  {
    "sentence": "",
    "news_type": "hoax",
    "score": 0.4125480353832245
  }
]

I have three internal deployments, all async, that are used within the actual prediction endpoint:

from typing import List, Tuple

from ray import serve


@serve.deployment(version="v0")
class AutoTokenizerDeployment:
    def __init__(self, model_name: str) -> None:
        ...

    async def __call__(self, sentence: str):
        # return tokenized sentence
        ...


@serve.deployment(version="v0")
class SplitSentencesDeployment:
    async def __call__(self, text: str) -> List[str]:
        # split the paragraph into sentences
        ...


@serve.deployment(num_replicas=4, version="v0")
class ModelDeployment:
    def __init__(self, model_path: str) -> None:
        ...

    async def __call__(self, preprocessed) -> Tuple[float, Category]:
        # return prediction based on tokenized sentence
        ...

Question:
In the actual /predict endpoint, I get the handles of the previous deployments to make inferences. After referring to the documentation and a couple of examples such as this one, I have noticed that I need to call await (await remote()) multiple times in each loop iteration to achieve the desired output. Is this the correct way to do so, or is there a better way?

@serve.deployment(route_prefix="/")
@serve.ingress(app)
class NLPClassifierComposite:
    def __init__(self) -> None:
        self.tokenizer = AutoTokenizerDeployment.get_handle(sync=False)
        self.sentence_splitter = SplitSentencesDeployment.get_handle(sync=False)
        self.model = ModelDeployment.get_handle(sync=False)

    @app.post(
        "/predict",
        # response_model=List[CompositeModelOutput],
        # throws an error due to bug in Ray
        status_code=status.HTTP_200_OK,
    )
    async def predict(self, payload: ModelInput):
        text = payload.text
        sentences = await (await self.sentence_splitter.remote(text))
        results = []
        for sentence in sentences:
            tokenized_sentence = await (await self.tokenizer.remote(sentence))
            probability, class_name = await (
                await self.model.remote(tokenized_sentence)
            )
            results.append(
                {
                    "sentence": sentence,
                    "news_type": class_name,
                    "score": float(probability),
                }
            )
        return results

I suspect that performing a double await within the same deployment might be slowing things down. I understand that the inner await gets the result of the remote handle call to the other deployments. What's the reason for the outer await in all the calls inside NLPClassifierComposite?

Could you move one of the await calls into the other deployments? That would require changing the return statements of AutoTokenizerDeployment, SplitSentencesDeployment, and ModelDeployment from:

return some_value

to

return await some_value

By outer await, do you mean the one in sentences = await (await self.sentence_splitter.remote(text))? If so, based on the documentation, this is equivalent to sentences = ray.get(await self.sentence_splitter.remote(text)).

I tried that, but I don't think it helps, as I would still need to await the ObjectRef either way. Also, isn't return await meant for awaiting a coroutine and returning its result from within another coroutine? In this case, I am returning the value directly, not a coroutine.
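
For reference, here is a minimal sketch of how the two awaits relate when using an async handle (sync=False), matching the equivalence mentioned above (handle, text, and call_deployment are just placeholder names):

import ray


async def call_deployment(handle, text):
    coro = handle.remote(text)  # calling .remote() on an async handle returns a coroutine
    obj_ref = await coro        # inner await: resolve the coroutine into an ObjectRef
    value = await obj_ref       # outer await: fetch the result without blocking the event loop
    # blocking equivalent of the outer await: value = ray.get(obj_ref)
    return value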

OK, so I think I have improved the code to avoid most of the unnecessary blocking (the multiple awaits in the for loop). I referred to this article: Scaling Python Asyncio with Ray.

Previously, I ran await (await remote_func) inside the for loop for each sentence in the list of sentences, which blocked other code twice per sentence.

for sentence in sentences:
    tokenized_sentence = await (await self.tokenizer.remote(sentence))
    probability, class_name = await (
        await self.model.remote(tokenized_sentence)
    )

Instead, I generated a list of the coroutines to run and then used asyncio.gather to run them all concurrently, like so:

tokenized_sentence_ids = [
    self.tokenizer.remote(sentence) for sentence in sentences
]
# tokenized_sentence_ids = [<coroutine object RayServeHandle.remote at 0x7ff5d4e1b5c0>, <coroutine object RayServeHandle.remote at 0x7ff5d4e50240>]
tokenized_sentence_refs = await asyncio.gather(*tokenized_sentence_ids)
# tokenized_sentence_refs = [ObjectRef(dca25e4878ed59e0b81e14c8e3a69bffbe53f3560100000002000000), ObjectRef(05b9c30025b245d1b81e14c8e3a69bffbe53f3560100000002000000)]

Another thing that I noticed is that await (await remote_func) != ray.get(await remote_func). await (await remote_func) only works on a single ObjectRef; if you run it on a List[ObjectRef], it will throw an error. So I had to run predictions = ray.get(await asyncio.gather(*output_ids)) to get the final predictions from output_ids, which was of type List[ObjectRef].
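
To illustrate (fetch_all and refs are placeholder names; refs is assumed to be a List[ObjectRef]):

import asyncio

import ray


async def fetch_all(refs):
    # values = await refs               # TypeError: a list of ObjectRefs is not awaitable
    # values = ray.get(refs)            # works, but blocks the event loop thread
    return await asyncio.gather(*refs)  # awaits each ObjectRef concurrently without blocking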

Would appreciate some feedback on this updated answer, @shrekris.

Updated code:

@serve.deployment(route_prefix="/")
@serve.ingress(app)
class NLPClassifierComposite:
    def __init__(self) -> None:
        self.tokenizer = AutoTokenizerDeployment.get_handle(sync=False)
        self.sentence_splitter = SplitSentencesDeployment.get_handle(sync=False)
        self.model = ModelDeployment.get_handle(sync=False)

    @app.post(
        "/predict",
        # response_model=List[CompositeModelOutput],
        # throws an error due to bug in Ray
        status_code=status.HTTP_200_OK,
    )
    async def predict(
        self,
        payload: ModelInput,
    ) -> List[CompositeModelOutput]:
        text = payload.text
        sentences = await (await self.sentence_splitter.remote(text))
        tokenized_sentence_ids = [
            self.tokenizer.remote(sentence) for sentence in sentences
        ]
        tokenized_sentence_refs = await asyncio.gather(*tokenized_sentence_ids)
        output_ids = [
            self.model.remote(tokenized_sentence)
            for tokenized_sentence in tokenized_sentence_refs
        ]
        predictions = ray.get(await asyncio.gather(*output_ids))
        return [
            {
                "sentence": sentence,
                "news_type": class_name,
                "score": float(
                    probability,
                ),
            }
            for sentence, (probability, class_name) in zip(sentences, predictions)
        ]

Great work on the updates! Yes, the asyncio.gather() looks like the right approach.

Another thing that I noticed is that await (await remote_func) != ray.get(await remote_func). await (await remote_func) only works on a single ObjectRef; if you run it on a List[ObjectRef], it will throw an error. So I had to run predictions = ray.get(await asyncio.gather(*output_ids)) to get the final predictions from output_ids, which was of type List[ObjectRef].

I think one other approach you could try here is a nested asyncio.gather(). Instead of running

predictions = ray.get(await asyncio.gather(*output_ids))

you could try

predictions = await asyncio.gather(*(await asyncio.gather(*output_ids)))

That way, you never block the execution thread with ray.get().
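
For example, the end of predict() could look something like this (same names as the updated code above):

output_ids = [
    self.model.remote(tokenized_sentence)
    for tokenized_sentence in tokenized_sentence_refs
]
# inner gather: resolve the handle coroutines into ObjectRefs
output_refs = await asyncio.gather(*output_ids)
# outer gather: await the ObjectRefs for the (probability, class_name) results
predictions = await asyncio.gather(*output_refs)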


Thanks! Is there any place in the current documentation where I could find out more about this?

Yeah, the Ray documentation has some info about using asyncio with actors:
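
For example, here is a minimal sketch of an async actor (the actor and method names are just illustrative):

import asyncio

import ray


@ray.remote
class AsyncActor:
    # methods defined with async def run on the actor's asyncio event loop,
    # so multiple calls can be processed concurrently by a single actor process
    async def run_task(self, i):
        await asyncio.sleep(1)
        return i


actor = AsyncActor.remote()
# the four calls interleave on the actor's event loop instead of running serially
results = ray.get([actor.run_task.remote(i) for i in range(4)])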
