Versions / Dependencies
vLLM version: 0.5.5
Ray version: 2.24.0
I am trying to run vLLM inference with Ray Serve on a GCP GKE instance with 4 x L4 GPUs. Here is my RayService config file:
apiVersion: ray.io/v1
kind: RayService
metadata:
  name: text2sql
spec:
  serveConfigV2: |
    applications:
      - name: text2sql
        route_prefix: /
        import_path: serve:model
        deployments:
          - name: VLLMDeployment
            max_ongoing_requests: 100
            autoscaling_config:
              target_ongoing_requests: 1
              min_replicas: 1
              max_replicas: 4
            ray_actor_options:
              num_cpus: 1.0
              num_gpus: 2.0
        runtime_env:
          setup_timeout_seconds: 1800
          pip: ["vllm==0.5.5"]
          env_vars:
            MODEL_LOCAL_PATH: "Final Model"
            TENSOR_PARALLELISM: "2"
  rayClusterConfig:
    rayVersion: '2.24.0'
    enableInTreeAutoscaling: true
    autoscalerOptions:
      upscalingMode: Default
      imagePullPolicy: IfNotPresent
      idleTimeoutSeconds: 300
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        metadata:
          labels:
            ai.gke.io: rayserve
        spec:
          containers:
            - name: ray-head
              image: public.ecr.aws/XXXXXXXX:latest
              resources:
                limits:
                  cpu: "2"
                  memory: "8Gi"
                requests:
                  cpu: "2"
                  memory: "8Gi"
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265
                  name: dashboard
                - containerPort: 10001
                  name: client
                - containerPort: 8000
                  name: serve
    workerGroupSpecs:
      - replicas: 1
        groupName: gpu-group
        rayStartParams: {}
        template:
          metadata:
            labels:
              ai.gke.io: rayserve
          spec:
            containers:
              - name: text2opsql
                image: public.ecr.aws/XXXXXXXX:latest
                resources:
                  limits:
                    cpu: "8"
                    memory: "16Gi"
                    nvidia.com/gpu: "2"
                  requests:
                    cpu: "8"
                    memory: "16Gi"
                    nvidia.com/gpu: "2"
            nodeSelector:
              cloud.google.com/gke-accelerator: nvidia-l4
When I apply this YAML file, I get the error below.
INFO 2024-09-19 08:23:22,415 controller 528 proxy_state.py:758 - Removing drained proxy on node 'd877a59f7f2701f2eb42c80be8074cd75ff9bc6c8a575ca0974daffe'.
INFO 2024-09-19 08:23:42,507 controller 528 deployment_state.py:1844 - Adding 1 replica to Deployment(name='VLLMDeployment', app='text2opsql').
INFO 2024-09-19 08:23:42,508 controller 528 deployment_state.py:401 - Starting Replica(id='352yge88', deployment='VLLMDeployment', app='text2opsql').
INFO 2024-09-19 08:23:53,748 controller 528 proxy_state.py:151 - Starting proxy on node 'd877a59f7f2701f2eb42c80be8074cd75ff9bc6c8a575ca0974daffe' listening on '0.0.0.0:8000'.
ERROR 2024-09-19 08:23:55,732 controller 528 deployment_state.py:681 - Exception in Replica(id='352yge88', deployment='VLLMDeployment', app='text2opsql'), the replica will be stopped.
Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/serve/_private/deployment_state.py", line 677, in check_ready
    _, self._version, self._initialization_latency_s = ray.get(
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/auto_init_hook.py", line 21, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/worker.py", line 2613, in get
    values, debugger_breakpoint = worker.get_objects(object_refs, timeout=timeout)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/_private/worker.py", line 861, in get_objects
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(RuntimeError): ray::ServeReplica:text2opsql:VLLMDeployment.initialize_and_get_metadata() (pid=12819, ip=10.20.3.31, actor_id=b1d2e8c1218640a96c989fca01000000, repr=<ray.serve._private.replica.ServeReplica:text2opsql:VLLMDeployment object at 0x7c67cebcf760>)
  File "/home/ray/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/home/ray/anaconda3/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/serve/_private/replica.py", line 630, in initialize_and_get_metadata
    raise RuntimeError(traceback.format_exc()) from None
RuntimeError: Traceback (most recent call last):
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/serve/_private/replica.py", line 608, in initialize_and_get_metadata
    await self._user_callable_wrapper.initialize_callable()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/serve/_private/replica.py", line 888, in initialize_callable
    await self._call_func_or_gen(
  File "/home/ray/anaconda3/lib/python3.9/site-packages/ray/serve/_private/replica.py", line 854, in _call_func_or_gen
    result = callable(*args, **kwargs)
  File "/src/serve.py", line 68, in __init__
    self.engine = AsyncLLMEngine.from_engine_args(args)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/engine/async_llm_engine.py", line 740, in from_engine_args
    engine = cls(
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/engine/async_llm_engine.py", line 636, in __init__
    self.engine = self._init_engine(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/engine/async_llm_engine.py", line 840, in _init_engine
    return engine_class(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/engine/async_llm_engine.py", line 272, in __init__
    super().__init__(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/engine/llm_engine.py", line 270, in __init__
    self.model_executor = executor_class(
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/executor/multiproc_gpu_executor.py", line 215, in __init__
    super().__init__(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/executor/distributed_gpu_executor.py", line 25, in __init__
    super().__init__(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/executor/executor_base.py", line 46, in __init__
    self._init_executor()
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/executor/multiproc_gpu_executor.py", line 138, in _init_executor
    self._run_workers("load_model",
  File "/home/ray/anaconda3/lib/python3.9/site-packages/vllm/executor/multiproc_gpu_executor.py", line 175, in _run_workers
    raise NotImplementedError(
NotImplementedError: max_concurrent_workers is not supported yet.
Reproduction script
import json
import logging
import os
import time
from typing import AsyncGenerator
from fastapi import BackgroundTasks
from ray import serve
from starlette.requests import Request
from starlette.responses import StreamingResponse, Response, JSONResponse
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.sampling_params import SamplingParams
from vllm.utils import random_uuid
# from vllm.utils import iterate_with_cancellation
# Environment and configuration setup
logger = logging.getLogger("ray.serve")
@serve.deployment(name="VLLMDeployment")
class VLLMDeployment:
    def __init__(self, **kwargs):
        logger.info("Started Loading the model")
        args = AsyncEngineArgs(
            model=str(os.getenv("MODEL_LOCAL_PATH", "Final Model")),
            # Model identifier from Hugging Face Hub or local path.
            dtype=str(os.getenv("MODEL_DTYPE", "auto")),
            # Automatically determine the data type (e.g., float16 or float32) for model weights and computations.
            gpu_memory_utilization=float(os.getenv("GPU_MEMORY_UTILIZATION", "0.9")),
            # Fraction of GPU memory to utilize, reserving some for overhead.
            max_model_len=int(os.getenv("MAX_MODEL_LEN", "4096")),
            # Maximum sequence length (in tokens) the model can handle, including both input and output tokens.
            max_num_seqs=int(os.getenv("MAX_NUM_SEQ", "512")),
            # Maximum number of sequences (requests) to process in parallel.
            max_num_batched_tokens=int(os.getenv("MAX_NUM_BATCHED_TOKENS", "32768")),
            # Maximum number of tokens processed in a single batch across all sequences.
            trust_remote_code=True,  # Allow execution of untrusted code from the model repository (use with caution).
            enable_chunked_prefill=False,  # Disable chunked prefill to avoid compatibility issues with prefix caching.
            max_parallel_loading_workers=int(os.getenv("PARALLEL_LOADING_WORKERS", 2)),
            # Number of parallel workers to load the model concurrently.
            pipeline_parallel_size=int(os.getenv("PIPELINE_PARALLELISM", 1)),
            # Number of pipeline parallelism stages; typically set to 1 unless using model parallelism.
            tensor_parallel_size=int(os.getenv("TENSOR_PARALLELISM", 1)),
            # Number of tensor parallelism stages; typically set to 1 unless using model parallelism.
            enable_prefix_caching=True,  # Enable prefix caching to improve performance for similar prompt prefixes.
            quantization=os.getenv("QUANTIZATION", None),  # Model quantization
            enforce_eager=True,
            disable_log_requests=True,
        )
        self.engine = AsyncLLMEngine.from_engine_args(args)
        self.max_model_len = args.max_model_len
        logger.info("Loaded the VLLM Model")
        logger.info(f"VLLM Engine initialized with max_model_len: {self.max_model_len}")

    async def stream_results(self, results_generator) -> AsyncGenerator[bytes, None]:
        num_returned = 0
        async for request_output in results_generator:
            text_outputs = [output.text for output in request_output.outputs]
            assert len(text_outputs) == 1
            text_output = text_outputs[0][num_returned:]
            ret = {"text": text_output}
            yield (json.dumps(ret) + "\n").encode("utf-8")
            num_returned += len(text_output)

    async def may_abort_request(self, request_id) -> None:
        await self.engine.abort(request_id)

    async def __call__(self, request: Request) -> Response:
        try:
            request_dict = await request.json()
        except json.JSONDecodeError:
            return JSONResponse(status_code=400, content={"error": "Invalid JSON in request body"})

        context_length = request_dict.pop("context_length", 8192)  # Default to 8k
        # Ensure context length is either 8k or 32k
        if context_length not in [8192, 32768]:
            context_length = 8192  # Default to 8k if invalid

        text = request_dict.pop("text")
        stream = request_dict.pop("stream", False)
        default_sampling_params = {
            "temperature": 0,
            "max_tokens": 256,
            "stop": ["}"],
            "include_stop_str_in_output": True,
        }

        # Get model config and tokenizer
        model_config = await self.engine.get_model_config()
        tokenizer = await self.engine.get_tokenizer()
        # input_token_ids = tokenizer.encode(text)
        # input_tokens = len(input_token_ids)
        # max_possible_new_tokens = min(context_length, model_config.max_model_len) - input_tokens
        # max_new_tokens = min(request_dict.get("max_tokens", 8192), max_possible_new_tokens)

        sampling_params = {**default_sampling_params, **request_dict}
        sampling_params = SamplingParams(**sampling_params)
        request_id = random_uuid()
        start_time = time.time()
        logger.info("Started processing request with id: {} and text: {}".format(request_id, text))
        results_generator = self.engine.generate(text, sampling_params, request_id)  # use the parsed "text" field as the prompt
        # results_generator = iterate_with_cancellation(
        #     results_generator, is_cancelled=request.is_disconnected)

        if stream:
            background_tasks = BackgroundTasks()
            # Using background_tasks to abort the request
            # if the client disconnects.
            background_tasks.add_task(self.may_abort_request, request_id)
            return StreamingResponse(
                self.stream_results(results_generator), background=background_tasks
            )

        # Non-streaming case
        final_output = None
        async for request_output in results_generator:
            if await request.is_disconnected():
                # Abort the request if the client disconnects.
                await self.engine.abort(request_id)
                logger.warning(f"Client disconnected for request {request_id}")
                return Response(status_code=499)
            final_output = request_output

        assert final_output is not None
ret = {"results": json.loads(final_output)}
ret = {"results": json.loads(final_output)}
logger.info('Completed processing request with id: {} in {} secs'.format(
request_id,
time.time() - start_time
))
return Response(content=json.dumps(ret))
model = VLLMDeployment.bind()
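For reference, this is roughly how I call the endpoint once the application reports healthy. The host/port and the example payload are just assumptions based on the handler above and the serve port in the config:

import requests

# Hypothetical smoke test against the Serve proxy (route_prefix "/" and port 8000 from the config above).
payload = {
    "text": "List all users created in the last 7 days.",  # consumed by __call__ as the prompt
    "stream": False,  # set True to exercise the streaming path
    "max_tokens": 256,  # forwarded to SamplingParams
}
response = requests.post("http://localhost:8000/", json=payload, timeout=120)
print(response.status_code, response.text)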
The same setup works with TENSOR_PARALLELISM = 1 and num_gpus: 1.0. The issue only appears when I increase tensor parallelism above 1 and num_gpus above 1.0.
I need to resolve this as soon as possible. I have tried multiple vLLM versions with no luck. Can anyone please help me with this issue?
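From the traceback, the replica fails inside vllm/executor/multiproc_gpu_executor.py while loading the model, which makes me suspect that the max_parallel_loading_workers value I pass ends up as the unsupported max_concurrent_workers argument. Below is a minimal sketch of the engine-args change I plan to test next; this is only my assumption, not a confirmed fix, and the distributed_executor_backend option is an alternative I have not yet verified in this setup:

import os

from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine

# Sketch only: with tensor_parallel_size > 1, vLLM 0.5.5 appears to select the
# multiprocessing GPU executor, whose _run_workers() rejects any non-None
# max_concurrent_workers, which max_parallel_loading_workers seems to map to.
args = AsyncEngineArgs(
    model=str(os.getenv("MODEL_LOCAL_PATH", "Final Model")),
    tensor_parallel_size=int(os.getenv("TENSOR_PARALLELISM", "2")),
    max_parallel_loading_workers=None,  # option 1: skip parallel model loading
    # distributed_executor_backend="ray",  # option 2: try the Ray executor instead of multiprocessing
    enforce_eager=True,
    disable_log_requests=True,
)
engine = AsyncLLMEngine.from_engine_args(args)

If anyone knows whether the multiprocessing executor supports parallel loading at all with tp > 1, or whether forcing the Ray backend is the intended path on KubeRay, that would help a lot.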