I have a class for a Model which creates a different pipeline based on the attribute it receives in the constructor.
"""
#Pipeline.py
Pipeline Constructor for Model
Methods:
--------
run (Class: NER): runs the pipeline on an image, its words, and the word bounding boxes.
"""
import os
import torch
import ray
from transformers import BertTokenizer
from .preprocessor import Preprocessor
from .predictor import Predictor

@ray.remote(num_cpus=1)
class NER:
    """
    NER class for constructing the pipeline, composed of the run method provided by the module
    """
    def __init__(self, region):
        self.region = region
        with open("classes.txt") as f:
            self.labels = f.read().strip().split("\n")
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.tokenizer = BertTokenizer.from_pretrained('')
        self.preprocessor = Preprocessor(self.device, self.labels, self.tokenizer, region)
        self.predictor = Predictor(region, self.device, self.labels, self.tokenizer)

    def run(self, _img, _words, bboxes, text_lines):
        """
        Takes in an image along with its words and bounding boxes and returns the words with their labels.
        Returns:
        --------
        processed_output (dict): output object containing all entities, which can be output as JSON.
        """
        encoded_input = self.preprocessor.run(_img, _words, bboxes)
        predictor_output = self.predictor.run(encoded_input)
        return predictor_output
"""
#Predictor.py
Predictor module that loads the model and produces output from the encoded input provided by the preprocessor.
"""
import os
import torch
import numpy as np
import pandas as pd
from .architecture import Model
from timing import Timer

# Assumed: base directory for the trained weights; adjust to your layout.
MODEL_BASE_DIR = os.environ.get("MODEL_BASE_DIR", ".")
class Predictor:
    """
    Main class that provides the functionality of running the model on an encoded sequence input
    """
    def __init__(self, region, device, labels, tokenizer):
        """
        Initialization method
        Arguments:
        ----------
        region: region identifier used to select the trained weights
        device (torch.device): torch.device object constructed by the pipeline
        labels (list): list of labels coming from the pipeline for classification
        tokenizer: tokenizer loaded from transformers, loaded in the pipeline
        """
        self.tokenizer = tokenizer
        self.labels = labels
        self.device = device
        self.model = Model(number_of_classes=len(labels))
        self.region = region
        model_path = os.path.join(MODEL_BASE_DIR, "models", str(region) + ".pth")
        self.model.load_state_dict(
            torch.load(model_path, map_location=device), strict=False
        )
        self.model.to(self.device)
        self.model.eval()
    def run(self, encoded_input):
        """
        Main run method that is self-contained with the model object, tokenizer object, and labels for producing predictions
        """
        with torch.no_grad():
            # Assumed: the preprocessor returns a dict of tensors that can be
            # unpacked as keyword arguments into the model's forward pass.
            outputs = self.model(**encoded_input)
        return self.format_output(encoded_input, (outputs[0].cpu(), outputs[1].cpu()))
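For reference, this is a minimal sketch of the kind of dict-of-tensors the model call above assumes the preprocessor produces; the checkpoint name and example words are hypothetical, and the real Preprocessor also handles the image and boxes:

# Hypothetical sketch of the encoded_input assumed by self.model(**encoded_input).
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")  # assumed checkpoint
encoded_input = tokenizer(
    ["invoice", "total", "42.00"],  # placeholder OCR words
    is_split_into_words=True,       # the words are already split
    padding="max_length",
    truncation=True,
    max_length=512,
    return_tensors="pt",            # PyTorch tensors for the model
)
# A dict-like BatchEncoding holding input_ids, token_type_ids and attention_mask,
# which is why it can be unpacked as keyword arguments into the model.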
#__init__.py
from .pipeline import NER
regions = ["a", "b"]
model_dict = {}
for reg in regions:
    model_dict[reg] = NER.remote(reg)
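For context, each handle in model_dict is then used like any other Ray actor handle; a sketch with placeholder inputs:

# Sketch: invoking one of the actors created above (img, words, bboxes and
# text_lines are placeholders for whatever the OCR step produces).
import ray

handle = model_dict["a"]
result_ref = handle.run.remote(img, words, bboxes, text_lines)  # async call
output = ray.get(result_ref)  # blocks until the actor returns the entity dict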
The predictor.py file uses the architecture built in architecture.py to return predictions after running the input through it. Every time a model for a new region is trained, this is the class used for loading and serving that model. The pipeline.py class is being used as an Actor and is initialized by the __init__.py file shown above.
The initialized actors are imported into the serve file, and Ray then starts running the actors' __init__ infinitely on every process until the PC crashes. It worked perfectly when the Actors were initialized in the constructor of the ModelComp serve class. I don't understand the reason for the infinite initialization of the package:
from io import BytesIO

from PIL import Image
from ray import serve

from package import model_dict

@serve.deployment(max_concurrent_queries=10, route_prefix="/Model")
class ModelComp:
    def __init__(self):
        self.ner = model_dict

    async def __call__(self, starlette_request):
        data = await starlette_request.form()
        image_payload_bytes = await data["file"].read()
        pil_image = Image.open(BytesIO(image_payload_bytes))
        # Feed to Model and return output
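For reference, this is roughly the variant that did work, with the actors created inside the deployment constructor instead of at import time (a minimal sketch; it assumes the module-level model_dict loop is removed from __init__.py):

# Sketch of the working variant: actor creation happens once per replica,
# inside the deployment's constructor, not as a side effect of an import.
from ray import serve
from package.pipeline import NER

@serve.deployment(max_concurrent_queries=10, route_prefix="/Model")
class ModelComp:
    def __init__(self):
        regions = ["a", "b"]
        self.ner = {reg: NER.remote(reg) for reg in regions}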