Resource allocation during Serve deployment

Hi,
I have one model which is around 1 GB, and one GPU with 16 GB of memory.
In the Serve deployment I set num_replicas=4 and ray_actor_options={"num_cpus": 2, "num_gpus": 0.25},
but I cannot see any improvement when I hit this deployment with HTTP requests.
May I know where it's going wrong?
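
For reference, the deployment options I mean look roughly like this (a sketch; the real model-loading code is omitted):

from ray import serve

# Sketch of the settings above: 4 replicas, each reserving 2 CPUs and a
# quarter of the GPU (4 * 0.25 = 1 full GPU on the single 16 GB card).
@serve.deployment(
    num_replicas=4,
    ray_actor_options={"num_cpus": 2, "num_gpus": 0.25},
)
class MyModel:
    def __init__(self):
        ...  # load the ~1 GB model here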

Hi @riyaj8888, how are you determining the improvement? Is it the number of requests per second?

Could you share your email? I would like to share the code.
I am using model composition with three deployments: one for preprocessing, one for model prediction, and one for post-processing.
It's a BERT-based model.
The preprocessing deployment uses the tokenizer, the prediction deployment holds the BERT model for text classification, and the post-processing deployment just maps predictions to category names.
I am using batch prediction logic inside the composed model.

from typing import List

import json
import numpy as np
import ray
import torch
import torch.nn as nn
from ray import serve
from starlette.requests import Request
from transformers import AutoTokenizer, XLMRobertaModel

device = "cuda" if torch.cuda.is_available() else "cpu"

class XLMRobertaSubject(nn.Module):
    def __init__(self):
        super(XLMRobertaSubject, self).__init__()

        self.robertamodel = XLMRobertaModel.from_pretrained(
            "xlm-roberta-base", output_hidden_states=True,output_attentions=True)

        self.dropout = nn.Dropout(p=0.2)
        self.classifier = nn.Linear(3072, 46)
    
    
    def forward(
        self,
        input_ids,
        attention_mask
    ):
        outputs = self.robertamodel(input_ids, attention_mask)

        hidden_states = outputs[2]
        concat_hidden = torch.cat([hidden_states[i] for i in [-1, -2, -3, -4]], dim=-1)

        out = torch.mean(concat_hidden, 1)

        x = self.dropout(out)
        logits = self.classifier(x)

        return logits


@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.5})
class PreProcess_Model:
    def __init__(self):

        self.bert_tokenizer =  AutoTokenizer.from_pretrained('xlm-roberta-base')
        
    def pack_samples(self,ids,masks):
        data = []
        for idx ,mask in zip(ids,masks):
            sample = idx+mask
            data.append(sample)
        return data
    
    def preprocessing(self, text: List[str]):

        # tokenize the whole batch at once; pad/truncate to a fixed length of 512
        token_output = self.bert_tokenizer(
            text,
            add_special_tokens=True,
            max_length=512,
            truncation=True,
            padding="max_length",
        )
        ids = token_output["input_ids"]
        mask = token_output["attention_mask"]

        data = self.pack_samples(ids, mask)

        return data
    


@serve.deployment(num_replicas=3, ray_actor_options={"num_gpus": 0.25})
class Predict_Subjects:
    
    def __init__(self):
        # .half() should be called before .to(device)
        with torch.no_grad():
            self.bert_model = XLMRobertaSubject().half().to(device)
            self.bert_model.load_state_dict(
                torch.load("subject_xlmr_checkpoint.pt", map_location=device)
            )
            self.bert_model.eval()
            
    def unpack_samples(self, data):
        # pack_samples concatenated ids + mask, so the first half of each
        # sample is the input_ids and the second half is the attention_mask
        ids = [d[:len(d) // 2] for d in data]
        mask = [d[len(d) // 2:] for d in data]

        return [ids, mask]
    
    def classification(self,tokenized_input):
        
        ids , mask = self.unpack_samples(tokenized_input)

        ids =  torch.tensor(np.array(ids))
        mask = torch.tensor(np.array(mask))
        input_ids = ids.squeeze(1)

        logits = self.bert_model(input_ids.to(device),mask.to(device))
        
        subject_prob = nn.functional.softmax(logits, dim=-1)
        subject_prob = subject_prob.detach().cpu().numpy().tolist()

        return subject_prob


@serve.deployment
class PostProcess_Model:
    def __init__(self,cat2sub_map,top_n,sub2lab):

        self.category_mapping = json.load(open(cat2sub_map,"r"))
        self.id2sub =  {val:key for key,val in json.load(open(sub2lab,"r")).items()}
        self.top_n = top_n
        self.num_requests = 1

    def round_probs(self,x):
        return round(x,4)

    def pack_results(self,data1,data2):
        data = []
        for d1 ,d2 in zip(data1,data2):
            sample = d1+d2
            data.append(sample)
        return data

    def postprocessing(self, subject_prob):

        # use the top_n stored on this deployment, not an undefined global
        top_n_preds_int = np.argsort(np.array(subject_prob), axis=1)[:, -self.top_n:]
        top_n_preds_probs = np.sort(np.array(subject_prob), axis=1)[:, -self.top_n:]

        top_subjects_probs_round = []
        top_subjects_names = []
        top_subjects_category = []

        for prob, idx in zip(top_n_preds_probs, top_n_preds_int):

            round_probs = list(map(self.round_probs, prob))

            top_subjects_probs_round.append(round_probs)

            top_subjects_names.append([self.id2sub[i] for i in idx])

            top_subjects_category.append([self.category_mapping[self.id2sub[i]] for i in idx])

        R = self.pack_results(top_subjects_names, top_subjects_probs_round)

        R = self.pack_results(R, top_subjects_category)

        return R

@serve.deployment(route_prefix="/compose")
class compose_models:

    def __init__(self,PreProcess_Model,Predict_Subjects,PostProcess_Model):

        self.PreProcess_Model = PreProcess_Model
        self.Predict_Subjects = Predict_Subjects
        self.PostProcess_Model = PostProcess_Model

    @serve.batch(max_batch_size=6)
    async def handle_batch(self, english_text:List[str])->List:
        
        print("Our input array has length:", len(english_text),english_text)

        token_output_ref = await self.PreProcess_Model.preprocessing.remote(english_text)

        token_output = ray.get(token_output_ref)

        subject_ref = await self.Predict_Subjects.classification.remote(token_output)

        probs = ray.get(subject_ref)
        
        results_ref = await self.PostProcess_Model.postprocessing.remote(probs)
        
        results = ray.get(results_ref)

        # print("results",results)

        return results
    
    

    # async def __call__(self, request: Request) -> List:
    #     return await self.handle_batch(request.query_params["text"])

    async def __call__(self, request: Request) -> List:

        text = await request.json()

        return await self.handle_batch(text)



top_n = 1
sub2lab = "d1.json"
cat2sub_map = "d2.json"



deployment_graph = compose_models.bind(
    PreProcess_Model.bind(),
    Predict_Subjects.bind(),
    PostProcess_Model.bind(cat2sub_map, top_n, sub2lab),
)


# serve run graph_deploy_batch:deployment_graph   --port 8088  #--host 0.0.0.0
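
For reference, a minimal client call for this graph could look like the sketch below (assuming the service runs locally on port 8088, as in the serve run command above, and that each request body is a single JSON-encoded string, matching what __call__ passes to handle_batch):

import requests

# Each request carries one text; @serve.batch in compose_models groups
# up to 6 concurrent requests into a single batch.
resp = requests.post(
    "http://127.0.0.1:8088/compose",
    json="Some example sentence to classify.",
)
print(resp.json())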

I am facing two undesired issues:

1. One model takes around 1613 MB of GPU memory, and in my deployment I have created 3 replicas of the GPU deployment, so the total is 3 * 1613 MB.
   But when I hit the deployment with a single request containing one sample, memory increases by roughly 600 MB.
   The same with 2 samples: memory again increases by roughly 500-600 MB.
   It keeps increasing proportionally with the number of samples per request (see the memory-check sketch after this list).

2. Even though I am using batching in the deployment, my requests are handled sequentially once max_batch_size requests arrive. Is there any issue in my code above?
   Even if I create multiple replicas of the preprocessing deployment, I feel it is still processing them sequentially.
   Where is the issue in the above deployment graph?
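
For issue 1, a rough way to watch the per-batch GPU memory growth is a hypothetical helper like this (it only uses torch.cuda.memory_allocated and is not part of the deployment code above):

import torch

def log_gpu_memory(tag: str) -> None:
    # torch.cuda.memory_allocated() reports the bytes currently held by tensors
    # on the default GPU; calling it before and after the forward pass shows
    # how much each batch of samples adds.
    allocated_mb = torch.cuda.memory_allocated() / 1024 ** 2
    print(f"[{tag}] allocated: {allocated_mb:.1f} MB")

# e.g. inside Predict_Subjects.classification:
#   log_gpu_memory("before forward")
#   logits = self.bert_model(input_ids.to(device), mask.to(device))
#   log_gpu_memory("after forward")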

thanks

When I tried to load test with Locust, I got only around 2 RPS, which is not what I was expecting.
Could you test the above code as-is so you can get an idea of what I mean?
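
A minimal Locust script along these lines might look like the sketch below (assuming the /compose route on port 8088 and one JSON-encoded string per request):

from locust import HttpUser, task

class ComposeUser(HttpUser):
    # run with: locust -f locustfile.py --host http://127.0.0.1:8088
    @task
    def classify(self):
        # each simulated user posts one text per request; concurrent requests
        # are what @serve.batch can group into a batch on the server side
        self.client.post("/compose", json="Some example sentence to classify.")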