How can I optimize the following process?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Hello Ray Community,

I am seeking assistance optimizing a process where I run inference with a Hugging Face model on a Databricks cluster with Ray installed. The goal is to identify whether a company name contains a person's name. The input column consists of company names, which are short strings.

Cluster Configuration:

  • Nodes: 3 worker nodes, 1 head node
  • Resources per Node: 1 GPU, 4 CPUs, 28 GiB memory
  • Machine Type: Standard_NC4as_T4_v3

Current Setup:

For a dataset with 16k records:

  • Ray Data Partition: 4
  • Concurrency: 4
  • Batch Size: 4100
  • Performance: The process completes in 1 minute.

For a dataset with 800k records:

  • Ray Data Partition: 40
  • Batch Size: 200k
  • Performance: The process takes around 55 minutes.

Additionally, with the same 800k records and a batch size of 20k, the process still takes around 55 minutes. The dashboard shows all GPUs at 65% utilization or higher.
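To make the batch arithmetic concrete, here is a plain-Python sketch (no Ray) of how the 800k-record run splits into batches and how those batches spread across 4 concurrent actors; the helper name is mine, for illustration only:

```python
import math

def num_batches(num_records: int, batch_size: int) -> int:
    """How many batches a dataset of this size produces under map_batches."""
    return math.ceil(num_records / batch_size)

records = 800_000
batches = num_batches(records, 20_000)  # 40 batches of 20k records
per_actor = math.ceil(batches / 4)      # ~10 batches per actor at concurrency=4
print(batches, per_actor)               # 40 10
```

So with a 20k batch size each of the 4 GPUs processes roughly 10 batches end to end.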

I would like to know whether this code can be optimized further, or whether adding more resources is the only way to scale the process.

Code:

from typing import Dict

import numpy as np
import pandas as pd
import ray
import ray.data
from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline


@ray.remote
def ray_data_task(pandas_df: pd.DataFrame):
    # Create a Ray dataset from the Pandas DataFrame
    ray_df = ray.data.from_pandas(pandas_df)
    # Repartition the dataset to ensure multiple blocks
    ray_df = ray_df.repartition(num_blocks=40, shuffle=False)
    return ray_df

# Execute the task
result = ray.get(ray_data_task.remote(pandas_df))

# Define a predictor class for NER inference
class HuggingFaceNERPredictor:
    def __init__(self):
        model_name = "Jean-Baptiste/roberta-large-ner-english"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForTokenClassification.from_pretrained(model_name)
        self.ner_pipeline = pipeline(
            "ner",
            model=self.model,
            tokenizer=self.tokenizer,
            aggregation_strategy="simple",
            device="cuda:0",
        )

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        texts = list(batch["norm_legal_business_name"])
        predictions = self.ner_pipeline(texts)
        batch["ner_output"] = predictions
        return batch

# Run batched inference over the dataset
predictions = result.map_batches(
    HuggingFaceNERPredictor,
    num_gpus=1,         # one GPU per actor
    batch_size=21000,   # adjust batch size as needed
    concurrency=4,      # one actor per available GPU
)

res = predictions.to_pandas()
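For reference, the per-text output can then be reduced to a boolean flag. This is a minimal sketch assuming the aggregated output format (with aggregation_strategy="simple", each entity is a dict carrying an "entity_group" label, and this model tags person names as "PER"); `contains_person` is an illustrative name, not part of my pipeline code:

```python
from typing import Dict, List

def contains_person(entities: List[Dict]) -> bool:
    # With aggregation_strategy="simple", each entity dict has an
    # "entity_group" key; this model labels person names as "PER".
    return any(e.get("entity_group") == "PER" for e in entities)

# Example: the shape the pipeline returns for a single input text
sample = [{"entity_group": "PER", "word": "John Smith", "score": 0.99,
           "start": 0, "end": 10}]
print(contains_person(sample))  # True
print(contains_person([]))      # False
```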