Unstable actors on GPU

Hey folks,

I’ve been trying to solve a simple batch inference problem using ray actors:

  1. Create 8 actors, each owning a separate GPU
  2. Get a list of files
  3. Submit a ray task that accepts a batch of files, loads them, and then uses one of the actors to calculate embeddings for each of the files.

I can get it to work more or less reliably with 1-2 actors. Whenever I increase the number of actors, things become very shaky and fail non-deterministically. The only trend I see is that the more actors I add, the sooner the failure happens.

With 8, it fails almost immediately; with 4 actors, it fails after processing a couple hundred thousand files.

I’m running ray locally on a single node with 8 x A100 80GB GPUs. The model that generates embeddings doesn’t take more than a couple of GB of memory. So, I’m really puzzled.

Can anyone suggest what I’m doing wrong here?

Here is the most common error message, about the actor being unavailable:

ray::process_batch() (pid=3819074, ip=10.18.105.70)
  File "/nfs/zhanibek/actor_embeddings/batch_inference_v2.py", line 56, in process_batch
    embeddings = ray.get(actor.get_embeddings.remote(texts))
ray.exceptions.ActorUnavailableError: The actor b6099cf974c635fbd9a61ef601000000 is unavailable: The actor is temporarily unavailable: RpcError: RPC Error message: Socket closed; RPC Error details: . The task may or may not have been executed on the actor.
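
I haven’t actually tried this yet, but I assume I could at least keep the job alive by retrying around the actor call when this exception shows up; a rough sketch (helper name is made up, retry/backoff values are arbitrary):

import time
import ray
from ray.exceptions import ActorUnavailableError

# Sketch only: retry the actor call a few times when the actor is
# temporarily unavailable (e.g. while Ray restarts it after a crash).
def get_embeddings_with_retry(actor, texts, retries=3, backoff_s=5.0):
    for attempt in range(retries):
        try:
            return ray.get(actor.get_embeddings.remote(texts))
        except ActorUnavailableError:
            if attempt == retries - 1:
                raise
            time.sleep(backoff_s)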

Here is the code snippet:

import ray
import gc
import torch
import numpy as np
import os
import time
import pandas as pd

from sentence_transformers import SentenceTransformer

# Been trying to increase timeout related parameters, but doesn't seem to help.
ray.init(num_gpus=8, 
    _system_config={
        "gcs_rpc_server_connect_timeout_s": 60, 
        "gcs_server_request_timeout_seconds": 120, 
        "py_gcs_connect_timeout_s": 120,
        "health_check_timeout_ms": 200000,
    }
)

NUM_ACTORS = 8

@ray.remote(num_gpus=1, max_restarts=5)
class EmbeddingInferenceActor:
    def __init__(self, gpu_id):
        # self.model = SentenceTransformer("dunzhang/stella_en_400M_v5", trust_remote_code=True, device="cuda")
        self.model = SentenceTransformer("jinaai/jina-embeddings-v3", trust_remote_code=True, device="cuda")
        self.task = "retrieval.passage"

    def get_embeddings(self, texts: list[str]) -> np.ndarray:
        embeddings = self.model.encode(texts, task=self.task, prompt_name=self.task)
        return embeddings

# Generator function to find all files recursively using os.scandir
def find_all_files(directory, pattern=".txt"):
    for entry in os.scandir(directory):
        if entry.is_dir():
            yield from find_all_files(entry.path, pattern)
        else:
            if entry.is_file() and entry.name.endswith(pattern):
                yield entry.path

# Function to process a batch of files
@ray.remote(num_cpus=1, num_gpus=0)
def process_batch(file_paths, actor):
    print("Processing batch of files len:", len(file_paths))
    texts = []
    paths = []
    for file_path in file_paths:
        with open(file_path, 'r') as file:
            texts.append(file.read())
    embeddings = ray.get(actor.get_embeddings.remote(texts))
    results = list(zip(file_paths, embeddings))
    return results

def main():
    num_actors = NUM_ACTORS
    actors = [EmbeddingInferenceActor.remote(i) for i in range(num_actors)]
    
    directory = '/nfs/zhanibek/results-txt'
    print("instantiating file generator...")
    batch_size = 32
    results = []
    batch_results = []
    batch_files = []
    actor_index = 0
    cnt = 0

    print("Starting file processing...")
    for file_path in find_all_files(directory):
        batch_files.append(file_path)
        if len(batch_files) >= batch_size:
            # Distribute the batch to the next actor in round-robin fashion
            print("Processing submitting for remote execution...")
            batch_result = process_batch.remote(batch_files, actors[actor_index])
            batch_results.append(batch_result)
            batch_files = []
            actor_index = (actor_index + 1) % num_actors

        if len(batch_results) >= NUM_ACTORS:
            print("Collecting results...")
            collected_results = ray.get(batch_results.pop(0))
            if collected_results is not None:
                results.extend(collected_results)

            if len(results) >= 10000:
                print("Saving results...")
                df = pd.DataFrame(results, columns=['file_path', 'embedding'])
                df.to_csv(f'results_{cnt}.csv')
                cnt += 1  # advance the chunk index so earlier CSVs aren't overwritten
                results = []

    # Process any remaining files
    if batch_files:
        batch_result = process_batch.remote(batch_files, actors[actor_index])
        batch_results.append(batch_result)

    # Collect any remaining results
    if batch_results:
        collected_results = ray.get(batch_results)
        for result in collected_results:
            results.extend(result)

    # Save any remaining results
    if results:
        df = pd.DataFrame(results, columns=['file_path', 'embedding'])
        df.to_csv(f'results_final.csv')


if __name__ == "__main__":
    main()
    ray.shutdown()

@rliaw @Sam_Chan any pointers?

Hmm, some follow-up questions:

  • Ray / Python versions?
  • Could you share your cluster config details, like how you initialize the cluster?
  • Does the problem replicate on a cloud setup (just for repro)? Go with some cheap A10s to start (they’re also easier to get).

  • Python 3.11, Ray 2.37.
  • It’s just local testing, no special cluster setup, i.e. I’m just running ray.init() from my Python script.
  • I haven’t tried any cloud setup for this job yet.

Additional details: I have tried various models and various ways of running inference on each actor, but I still hit this issue.

Hey Zhanibek, could you provide a sample directory (or a script that generates such a directory) so I can try running this?
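
Even something quick and dirty along these lines would be enough (just my guess at the layout: nested folders of small random .txt files; names and sizes are arbitrary):

import os
import random
import string

# Hypothetical sample-data generator: nested folders containing small
# random .txt files, roughly matching the directory layout described above.
def make_sample_dir(root="sample-txt", n_dirs=10, files_per_dir=100, words_per_file=200):
    for d in range(n_dirs):
        sub = os.path.join(root, f"dir_{d}")
        os.makedirs(sub, exist_ok=True)
        for f in range(files_per_dir):
            words = ("".join(random.choices(string.ascii_lowercase, k=8))
                     for _ in range(words_per_file))
            with open(os.path.join(sub, f"doc_{f}.txt"), "w") as fh:
                fh.write(" ".join(words))

if __name__ == "__main__":
    make_sample_dir()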