Using initialized Ray actors with a Serve deployment

I have a Model class that creates a different pipeline based on the attribute it receives in the constructor.

"""
#Pipeline.py
Pipeline Constructor for Model

Methods:
--------
run (Class: Classifier): runs the pipeline on an image, its words, and the word bounding boxes.
"""

import os
import torch

from transformers import BertTokenizer

from .preprocessor import Preprocessor
from .predictor import Predictor

import ray

@ray.remote(num_cpus=1)
class NER:
    """
    NER Class for constructing the pipeline composed of the run method provided by the module
    """
    def __init__(self, region):
        self.region = region
        self.labels = open("classes.txt").read().strip().split("\n")
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.tokenizer = BertTokenizer.from_pretrained('')
        self.preprocessor = Preprocessor(self.device, self.labels, self.tokenizer, region)
        self.predictor = Predictor(region, self.device, self.labels, self.tokenizer)

    def run(self, _img, _words, bboxes, text_lines):
        """
        Takes in an image along with words and bounding boxes and returns the words and their labels

        Returns:
        --------
            processed_output (dict): output object containing all entities, which can be outputted as JSON.
        """
        encoded_input = self.preprocessor.run(_img, _words, bboxes)
        predictor_output = self.predictor.run(encoded_input)

        return predictor_output

#Predictor.py

"""
Predictor Module that Loads the Model and produces output from encoded input provided by the preprocessor
"""

import os
import torch
import numpy as np
import pandas as pd

from .architecture import Model
from timing import Timer


class Predictor:
    """
    Main class that provides the functionality of running the model on an encoded sequence input
    """
    def __init__(self, region, device, labels, tokenizer):
        """
        Initialization Method

        Arguments:
        ----------
            device (torch.device): torch.device object constructed by the pipeline

            labels (list): list of labels coming from the pipeline for classification
            
            tokenizer: tokenizer loaded from transformers, loaded in pipeline
        """
        self.tokenizer = tokenizer
        self.labels = labels
        self.device = device
        self.model = Model(number_of_classes=len(labels))
        self.region = region

        model_path = os.path.join(MODEL_BASE_DIR, "models", str(region) + ".pth" )

        self.model.load_state_dict(
            torch.load(model_path, map_location=device), strict=False
        )
        self.model.to(self.device)
        self.model.eval()

    def run(self, encoded_input):
        """
        Main run method, self-contained with the model object, tokenizer object and labels for producing predictions
        """
        # Assumption: the preprocessor returns the keyword arguments the model expects,
        # so the encoded input can be unpacked directly into the forward pass.
        outputs = self.model(**encoded_input)
        return self.format_output(encoded_input, (outputs[0].cpu(), outputs[1].cpu()))
        

#__init__.py
from .pipeline import NER
regions = ["a", "b"]
model_dict = {}
for reg in regions:
    model_dict[reg] = NER.remote(reg)

The Predictor.py file uses the architecture built in architecture.py to return predictions after running the input through it. Every time a model for a new region is trained, this is the class used for loading and serving that model. The Pipeline.py class is used as an actor and is initialized by the __init__.py file shown above.

The initialized actors are imported into the Serve file, and Ray then starts running the actors' __init__() repeatedly on every process until the PC crashes. It worked perfectly when the actors were initialized in the constructor of the ModelComp Serve class. I don't understand the reason for the infinite initialization of the package.

from io import BytesIO

from PIL import Image
from ray import serve

from package import model_dict
@serve.deployment(max_concurrent_queries=10, route_prefix="/Model")
class ModelComp:
    def __init__(self):
        self.ner = model_dict
        
    async def __call__(self, starlette_request):
        data = await starlette_request.form()
        image_payload_bytes = await data["file"].read()
        pil_image = Image.open(BytesIO(image_payload_bytes))

        #Feed to Model and return output

I'm wondering if this is caused by the num_cpus=1. By default, Serve also allocates 1 CPU for each deployment replica. Could you try explicitly setting num_cpus=0.6 for both the deployment and the NER actor?
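For example, something like this (a sketch that keeps the rest of your code the same):

@ray.remote(num_cpus=0.6)
class NER:
    ...

@serve.deployment(
    max_concurrent_queries=10,
    route_prefix="/Model",
    ray_actor_options={"num_cpus": 0.6},
)
class ModelComp:
    ...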

If that doesn’t work, could you check whether __init__.py runs as a standalone script (without being imported)?
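One way to rule out import-time side effects is to wrap the actor creation in a function so that nothing is spawned just by importing the package; a minimal sketch of the __init__.py (the create_actors name is just illustrative):

#__init__.py
from .pipeline import NER

regions = ["a", "b"]

def create_actors():
    # The NER actors are only created when this is called explicitly,
    # not as a side effect of importing the package.
    return {reg: NER.remote(reg) for reg in regions}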

Could you also post the code where it works when the actors get initialized in the deployment constructor?

This is the case where it does work:

from io import BytesIO

from PIL import Image
from ray import serve

from package.Pipeline import NER
@serve.deployment(max_concurrent_queries=10, route_prefix="/Model")
class ModelComp:
    def __init__(self):
        self.regions = ['a', 'b']
        self.ner = {reg: NER.remote(reg) for reg in self.regions} 
        
    async def __call__(self, starlette_request):
        data = await starlette_request.form()
        image_payload_bytes = await data["file"].read()
        pil_image = Image.open(BytesIO(image_payload_bytes))

        #Feed to Model and return output

Changing the num_cpus did not make a difference.

BTW I would like to point out that the problem occurs before the deployment class is even reached. I am running this via a .ipynb, and the issue appears before I run the @serve.deployment class cell; simply importing the package into the notebook results in this wild spawning of processes. I am using the Hugging Face transformers library in the predictor to load a pretrained model, so when I import the NER pipeline via the __init__.py it starts downloading. In a matter of seconds, multiple parallel processes are spawned, each downloading from the very start.

Do you have any idea how I would go about using the NER class as a @serve.deployment instead, but launch different instances of it based on the region configuration? Launching different instances with different configurations works for the actor, but I don't know if I could do something similar with a deployment.

I am adding a snapshot of the Ray dashboard after simply importing this module into a .ipynb, and the hell that breaks loose.

Got it, thanks for the extra detail. Here’s an example NER deployment implementation:

@serve.deployment(
    ray_actor_options={"num_cpus": 1}
)
class NER:
    """
    NER Class for constructing the pipeline composed of the run method provided by the module
    """
    def __init__(self, region):
        self.region = region
        self.labels = open("classes.txt").read().strip().split("\n")
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.tokenizer = BertTokenizer.from_pretrained('')
        self.preprocessor = Preprocessor(self.device, self.labels, self.tokenizer, region)
        self.predictor = Predictor(region, self.device, self.labels, self.tokenizer)

    def run(self, _img, _words, bboxes, text_lines):
        """
        Takes in an image along with words and bounding boxes and returns the words and their labels

        Returns:
        --------
            processed_output (dict): output object containing all entities, which can be outputted as JSON.
        """
        encoded_input = self.preprocessor.run(_img, _words, bboxes)
        predictor_output = self.predictor.run(encoded_input)

        return predictor_output

NER.options(init_args=[region]).deploy()  # Pass in the region here 

Note that the only differences between this and the actor are

  1. The decorator
  2. The deploy statement at the end

One hacky solution for the initial actor implementation is to use ray.put(). Instead of taking in just region in NER's __init__() function, you can take in labels, device, tokenizer, preprocessor, and predictor directly.

Then, in the __init__.py file, you can calculate these values for each region. After getting the values, you can put them in the Ray object store with ray.put(), and you can pass the references returned by ray.put() into a new NER actor's __init__() function. In that function, you can set all the attributes (such as self.preprocessor) by calling ray.get() on these references.
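A rough sketch of what that could look like, assuming the same constructors as above (the refs dict and its keys are just illustrative):

#__init__.py (sketch of the ray.put() approach)
import ray
import torch

from transformers import BertTokenizer

from .preprocessor import Preprocessor
from .predictor import Predictor
from .pipeline import NER

regions = ["a", "b"]
model_dict = {}

for reg in regions:
    # Do the heavy lifting (tokenizer download, model loading) once per region,
    # outside of any actor.
    labels = open("classes.txt").read().strip().split("\n")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    tokenizer = BertTokenizer.from_pretrained('')  # same pretrained name as in Pipeline.py
    preprocessor = Preprocessor(device, labels, tokenizer, reg)
    predictor = Predictor(reg, device, labels, tokenizer)

    # Put the prebuilt objects into the Ray object store. The references are
    # wrapped in a dict so Ray passes them through unresolved and the actor
    # can call ray.get() on them itself.
    refs = {
        "labels": ray.put(labels),
        "device": ray.put(device),
        "tokenizer": ray.put(tokenizer),
        "preprocessor": ray.put(preprocessor),
        "predictor": ray.put(predictor),
    }
    model_dict[reg] = NER.remote(reg, refs)

#Pipeline.py (sketch of the modified constructor; run() stays the same)
@ray.remote(num_cpus=1)
class NER:
    def __init__(self, region, refs):
        self.region = region
        self.labels = ray.get(refs["labels"])
        self.device = ray.get(refs["device"])
        self.tokenizer = ray.get(refs["tokenizer"])
        self.preprocessor = ray.get(refs["preprocessor"])
        self.predictor = ray.get(refs["predictor"])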

This isn't all that clean, but by moving the downloading outside the actors it may solve the issue where lots of processes are being spawned.

NER.options(init_args=["A"]).deploy()  # Pass in the region here 
NER.options(init_args=["B"]).deploy()  # Pass in the region here 

Will the above result in 2 instances of the NER class deploying with the init regions? Secondly, how would I get a handle to either of them? And how would I differentiate between the handles I get back?

Good question. You also need to add a name for each deployment:

# NER deployment instance with "A" passed in as region
NER.options(name="region_A_deployment", init_args=["A"]).deploy()

# NER deployment instance with "B" passed in as region
NER.options(name="region_B_deployment", init_args=["B"]).deploy()

This name is also how you can get a handle to the deployment:

# Get deployment_A's handle:
deployment_A = serve.get_deployment("region_A_deployment")
handle_to_A = deployment_A.get_handle()

# Call deployment_A's run function:
ray.get(handle_to_A.run.remote("example_img", "example_words", "example_bboxes", "example_text_lines"))

# Get deployment B's handle:
handle_to_B = serve.get_deployment("region_B_deployment").get_handle()

For more info on this, check out Serve’s documentation about deployments.

Thanks @shrekris. You have been a great help. :smiley:

You’re welcome, glad I could help!