Hey folks,
I’ve been trying to solve a simple batch inference problem using ray actors:
- Create 8 actors, each owning a separate GPU
- Get a list of files
- Submit a Ray task that accepts a batch of files, loads them, and then uses one of the actors to calculate embeddings for each of the files.
I can get it to work more or less reliably with 1-2 actors. Whenever I increase the number of actors, things become very shaky and fail non-deterministically. The only trend I see is that the more actors I add, the sooner the failure happens.
With 8 actors it fails almost immediately; with 4 actors it fails after processing a couple of hundred thousand files.
I’m running Ray locally on a single node with 8 x A100 80GB GPUs. The model that generates embeddings doesn’t take more than a couple of GB of memory, so I’m really puzzled.
Can anyone suggest what I’m doing wrong here?
Here is the most frequent error message, about the actor being unavailable:
ray::process_batch() (pid=3819074, ip=10.18.105.70)
File "/nfs/zhanibek/actor_embeddings/batch_inference_v2.py", line 56, in process_batch
embeddings = ray.get(actor.get_embeddings.remote(texts))
ray.exceptions.ActorUnavailableError: The actor b6099cf974c635fbd9a61ef601000000 is unavailable: The actor is temporarily unavailable: RpcError: RPC Error message: Socket closed; RPC Error details: . The task may or maynot have been executed on the actor.
Here is the code snippet:
import ray
import gc
import torch
import numpy as np
import os
import time
import pandas as pd
from sentence_transformers import SentenceTransformer
# Number of embedding actors created by main().
NUM_ACTORS = 8

# Timeout-related parameters were increased here while debugging, but that
# doesn't seem to help.
_RAY_SYSTEM_CONFIG = {
    "gcs_rpc_server_connect_timeout_s": 60,
    "gcs_server_request_timeout_seconds": 120,
    "py_gcs_connect_timeout_s": 120,
    "health_check_timeout_ms": 200000,
}

# Start Ray, declaring 8 GPUs and passing the system-config overrides above.
ray.init(num_gpus=8, _system_config=_RAY_SYSTEM_CONFIG)
@ray.remote(num_gpus=1, max_restarts=5)
class EmbeddingInferenceActor:
    """Ray actor that owns a SentenceTransformer model and encodes texts with it.

    The actor is declared with ``num_gpus=1`` and ``max_restarts=5``.
    """

    def __init__(self, gpu_id):
        """Load the embedding model on CUDA.

        Args:
            gpu_id: Accepted from the caller but not used by the constructor.
        """
        # Alternative model that was tried previously:
        # "dunzhang/stella_en_400M_v5"
        model_name = "jinaai/jina-embeddings-v3"
        self.model = SentenceTransformer(
            model_name,
            trust_remote_code=True,
            device="cuda",
        )
        # The same value is passed as both `task` and `prompt_name` to encode().
        self.task = "retrieval.passage"

    def get_embeddings(self, texts: list[str]) -> np.ndarray:
        """Encode ``texts`` with the loaded model and return the result.

        Args:
            texts: Strings to embed.

        Returns:
            Whatever ``self.model.encode`` returns (annotated as ``np.ndarray``).
        """
        return self.model.encode(
            texts,
            task=self.task,
            prompt_name=self.task,
        )
# Generator function to find all files recursively using os.scandir
def find_all_files(directory, pattern=".txt"):
    """Recursively yield paths of files under ``directory`` ending in ``pattern``.

    Args:
        directory: Root directory to walk.
        pattern: Filename suffix a file must end with to be yielded
            (compared with ``str.endswith``, not a glob or regex).

    Yields:
        The ``entry.path`` of every matching file, in the order
        ``os.scandir`` reports entries.
    """
    # Use scandir as a context manager so the directory handle is closed
    # deterministically, even if the consumer abandons the generator early.
    with os.scandir(directory) as entries:
        for entry in entries:
            if entry.is_dir():
                yield from find_all_files(entry.path, pattern)
            elif entry.is_file() and entry.name.endswith(pattern):
                yield entry.path
# Function to process a batch of files
@ray.remote(num_cpus=1, num_gpus=0)
def process_batch(file_paths, actor):
    """Read a batch of files and embed their contents via ``actor``.

    Args:
        file_paths: Paths of the files to read; each file is read in full
            as text.
        actor: Actor handle exposing a ``get_embeddings`` remote method.

    Returns:
        A list of ``(file_path, embedding)`` pairs, pairing each input path
        with the corresponding item of the actor's result.
    """
    print("Processing batch of files len:", len(file_paths))
    texts = []
    for file_path in file_paths:
        with open(file_path, 'r') as file:
            texts.append(file.read())
    # Blocks this task until the actor has finished encoding the batch.
    embeddings = ray.get(actor.get_embeddings.remote(texts))
    return list(zip(file_paths, embeddings))
def main():
    """Embed every file found under the input directory and save results as CSV.

    Creates ``NUM_ACTORS`` embedding actors, walks the input directory in
    batches of 32 files, submits each batch to ``process_batch`` with an
    actor chosen round-robin, and keeps at most ``num_actors`` batches in
    flight. Collected results are flushed to ``results_<n>.csv`` whenever
    10000 or more have accumulated; whatever is left at the end goes to
    ``results_final.csv``.
    """
    num_actors = NUM_ACTORS
    actors = [EmbeddingInferenceActor.remote(i) for i in range(num_actors)]
    directory = '/nfs/zhanibek/results-txt'
    print("instantiating file generator...")
    batch_size = 32
    results = []
    batch_results = []
    batch_files = []
    actor_index = 0
    cnt = 0
    print("Starting file processing...")
    for file_path in find_all_files(directory):
        batch_files.append(file_path)
        if len(batch_files) >= batch_size:
            # Distribute the batch to the next actor in round-robin fashion
            print("Processing submitting for remote execution...")
            batch_result = process_batch.remote(batch_files, actors[actor_index])
            batch_results.append(batch_result)
            batch_files = []
            actor_index = (actor_index + 1) % num_actors
        # Back-pressure: once enough batches are pending, block on the oldest.
        if len(batch_results) >= num_actors:
            print("Collecting results...")
            collected_results = ray.get(batch_results.pop(0))
            if collected_results is not None:
                results.extend(collected_results)
        if len(results) >= 10000:
            print("Saving results...")
            # NOTE(review): embeddings are written as stringified array
            # objects; long arrays may be abbreviated by NumPy's print
            # threshold - confirm the CSV holds the full vectors.
            df = pd.DataFrame(results, columns=['file_path', 'embedding'])
            df.to_csv(f'results_{cnt}.csv')
            # Advance the chunk counter so the next flush does not overwrite
            # the file that was just written.
            cnt += 1
            results = []
    # Process any remaining files
    if batch_files:
        batch_result = process_batch.remote(batch_files, actors[actor_index])
        batch_results.append(batch_result)
    # Collect any remaining results
    if batch_results:
        collected_results = ray.get(batch_results)
        for result in collected_results:
            results.extend(result)
    # Save any remaining results
    if results:
        df = pd.DataFrame(results, columns=['file_path', 'embedding'])
        df.to_csv('results_final.csv')
# Script entry point: run the pipeline, then shut Ray down.
if __name__ == "__main__":
    try:
        main()
    finally:
        # Shut Ray down even when main() raises.
        ray.shutdown()