Hi guys, I’m new to Ray and I’m running into a concurrency issue with Ray Data’s map_batches. I’m trying to launch 8 vLLM instances in parallel. At first, 8 actors start up and model loading begins on each of the GPUs. Once the models are fully loaded, all actors except one die spontaneously (see the actor-state check below), and the whole dataset is then processed on that single actor. There are no errors in the logs and no out-of-memory issues. I have also tried the exact code from the Ray docs, and it shows the same problem for any concurrency > 1.

My code is pretty straightforward and based on the example in the Ray docs. My environment is:
8x T4 (15 GB) GPUs; 96 vCPUs
Ray version: 2.49.1
vLLM: 0.7.2
torch: 2.5.1+cu124
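For what it’s worth, here’s how I confirm the actors are actually dying. While the job is running, I snapshot actor states with Ray’s state API from a separate shell (just a quick check; the dashboard shows the same picture):

from ray.util.state import list_actors

# Print every actor's class, id, and state. In my runs, all but one of
# the Ray Data map actors flip to DEAD right after the models finish loading.
for actor in list_actors():
    print(actor.class_name, actor.actor_id, actor.state)

And here is my full script: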
import ray
from datasets import load_dataset
from vllm import LLM, SamplingParams
import numpy as np
import re
class Model:
    def __init__(self):
        # One vLLM engine per actor; gpu_memory_utilization is kept low
        # so the engine fits comfortably on a 15 GB T4.
        self.llm = LLM(
            model="Qwen/Qwen2.5-0.5B-Instruct",
            dtype="half",
            gpu_memory_utilization=0.6,
            tensor_parallel_size=1,
            distributed_executor_backend="mp",
            enable_chunked_prefill=True,
            enable_prefix_caching=True,
            max_model_len=2048,
        )
        self.tokenizer = self.llm.get_tokenizer()
        self.sampling_params = SamplingParams(
            temperature=0.8,
            top_p=0.95,
            max_tokens=512,
        )
    def extract_solution(self, answer):
        # GSM8K answers end with "#### <number>"; pull out the number and
        # drop thousands separators. Fall back to "0" if no match is found.
        match = re.search(r"#### (-?[0-9.,]+)", answer)
        if match is None:
            return "0"
        return match.group(1).replace(",", "")
    def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        questions = batch["question"].tolist()
        answers = batch["answer"].tolist()
        ground_truths = [self.extract_solution(answer) for answer in answers]
        prompts = [
            self.tokenizer.apply_chat_template(
                [
                    {"role": "system", "content": "Let's think step by step and output the final answer after '####'"},
                    {"role": "user", "content": question},
                ],
                tokenize=False,
                add_generation_prompt=True,
            )
            for question in questions
        ]
        outputs = self.llm.generate(prompts, sampling_params=self.sampling_params)
        batch["generated_text"] = np.array([out.outputs[0].text for out in outputs])
        batch["model_answer"] = np.array([self.extract_solution(out.outputs[0].text) for out in outputs])
        batch["ground_truth"] = np.array(ground_truths)
        return batch
def main():
    # Initialize Ray with all 8 GPUs (96 vCPUs available; capping at 64)
    ray.init(num_cpus=64, num_gpus=8)
    print("Ray cluster resources:", ray.cluster_resources())

    ds_test = ray.data.from_huggingface(load_dataset("openai/gsm8k", "main", split="test"))

    predictions = ds_test.map_batches(
        Model,
        batch_size=128,
        num_cpus=4,
        num_gpus=1,  # each actor gets 1 GPU with reduced memory utilization
        concurrency=8,
    )
    predictions.show(limit=1)  # triggers execution of the lazy pipeline

if __name__ == "__main__":
    main()
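To help isolate whether this is Ray Data’s actor pool or vLLM, here’s a stripped-down sketch with no vLLM at all that I’d expect to fan out across all 8 actors. DummyModel is just a stand-in I wrote for this post; it sleeps to mimic a slow model load and tags each row with the id of the actor that processed it:

import time
import numpy as np
import ray

class DummyModel:
    def __init__(self):
        # Mimic a slow model load so all 8 actors are up before work starts.
        time.sleep(30)

    def __call__(self, batch: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        # Tag each row with the id of the actor that processed it.
        actor_id = ray.get_runtime_context().get_actor_id()
        batch["actor_id"] = np.array([actor_id] * len(batch["x"]))
        return batch

ray.init(num_cpus=64, num_gpus=8)
ds = ray.data.from_items([{"x": i} for i in range(10_000)])
out = ds.map_batches(DummyModel, batch_size=128, num_gpus=1, concurrency=8)
# If the pool fans out correctly, this should print close to 8.
print(out.to_pandas()["actor_id"].nunique())

Any pointers on why the actor pool collapses to a single actor would be appreciated.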