- Medium: It contributes significant difficulty to completing my task, but I can work around it.
Hello Ray Community,
I am seeking help to optimize a process where I run inference with a Hugging Face model on a Databricks cluster with Ray installed. The goal is to identify whether a company name contains a person's name. The input column consists of company names, which are short strings.
Cluster Configuration:
- Nodes: 3 worker nodes, 1 head node
- Resources per Node: 1 GPU, 4 CPUs, 28 GiB memory
- Machine Type: Standard_NC4as_T4_v3
Current Setup:
For a dataset with 16k records:
- Ray Data Partition: 4
- Concurrency: 4
- Batch Size: 4100
- Performance: The process completes in 1 minute.
For a dataset with 800k records:
- Ray Data Partition: 40
- Batch Size: 200k
- Performance: The process takes around 55 minutes.
Additionally, with the same 800k records and a batch size of 20k, the process still takes around 55 minutes. The dashboard indicates that all GPUs are utilized at 65% or higher.
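A quick back-of-the-envelope calculation from the timings above (pure arithmetic, no Ray involved) shows both runs land near the same overall rate, which would be consistent with the job being GPU-bound rather than partition-bound; the per-GPU figure assumes all 4 GPUs are busy, which is an inference from the dashboard, not a measurement:

```python
# Throughput implied by the numbers in this post
small_rate = 16_000 / 60          # 16k records in 1 minute  -> ~267 records/s
large_rate = 800_000 / (55 * 60)  # 800k records in 55 min   -> ~242 records/s
per_gpu = large_rate / 4          # assuming 4 GPUs share the work -> ~61 records/s each
print(small_rate, large_rate, per_gpu)
```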
I would like to know whether this code can be optimized further, or whether adding more resources is the only way to scale the process.
Code:
from typing import Dict

import numpy as np
import ray
import ray.data
from transformers import AutoModelForTokenClassification, AutoTokenizer, pipeline

@ray.remote
def ray_data_task(pandas_df):
    # Create a Ray dataset from the Pandas DataFrame
    ray_df = ray.data.from_pandas(pandas_df)
    # Repartition the dataset to ensure multiple blocks
    ray_df = ray_df.repartition(num_blocks=40, shuffle=False)
    return ray_df

# Execute the task
result = ray.get(ray_data_task.remote(pandas_df))
# Define a Predictor class for NER inference
class HuggingFaceNERPredictor:
    def __init__(self):
        model_name = "Jean-Baptiste/roberta-large-ner-english"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForTokenClassification.from_pretrained(model_name)
        self.ner_pipeline = pipeline(
            "ner",
            model=self.model,
            tokenizer=self.tokenizer,
            aggregation_strategy="simple",
            device="cuda:0",
        )

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, list]:
        texts = list(batch["norm_legal_business_name"])
        predictions = self.ner_pipeline(texts)
        batch["ner_output"] = predictions
        return batch
# Optimize Ray Dataset map_batches
predictions = result.map_batches(
    HuggingFaceNERPredictor,
    num_gpus=1,        # Assuming each worker uses 1 GPU
    batch_size=21000,  # Adjust batch size as needed
    concurrency=4,     # Adjust concurrency based on the number of GPUs
)
res = predictions.to_pandas()
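For context on the goal (flagging company names that contain a person's name), here is a minimal post-processing sketch. The function and variable names are illustrative, not from my actual pipeline; with aggregation_strategy="simple", this model's predictions come back as lists of dicts with an "entity_group" key, and person entities are tagged "PER":

```python
# Flag rows whose NER output contains a person ("PER") entity
def contains_person(ner_output):
    return any(ent.get("entity_group") == "PER" for ent in ner_output)

# Toy outputs shaped like the pipeline's aggregated predictions
sample = [
    [{"entity_group": "PER", "word": "John Smith", "score": 0.99}],
    [{"entity_group": "ORG", "word": "Acme Corp", "score": 0.98}],
]
flags = [contains_person(p) for p in sample]  # [True, False]
```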