Sure, here is my working code for Ray Serve. The deployment looks like:
import logging
from typing import AsyncGenerator, Mapping, Optional

from ray import serve
from vllm import SamplingParams
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.engine.async_llm_engine import AsyncLLMEngine
from vllm.inputs import PromptType
from vllm.lora.request import LoRARequest
from vllm.outputs import RequestOutput
from vllm.prompt_adapter.request import PromptAdapterRequest


@serve.deployment(name="AsyncLLMEngine", ray_actor_options={"num_gpus": 1, "num_cpus": 4})
class AsyncLLMEngineDeployment:
    """
    Ray Serve deployment based on vLLM AsyncLLMEngine. We create this
    so that we can easily scale it and wire it up with other serving components.
    """

    def __init__(
        self,
        engine_args: AsyncEngineArgs,
    ):
        # create logger
        self.logger = logging.getLogger("async_llm_support")
        # create engine
        self.engine = AsyncLLMEngine.from_engine_args(engine_args)
        self.logger.debug("Created AsyncLLMEngine")
    ....
    async def generate(
        self,
        prompt: PromptType,
        sampling_params: SamplingParams,
        request_id: str,
        lora_request: Optional[LoRARequest] = None,
        trace_headers: Optional[Mapping[str, str]] = None,
        prompt_adapter_request: Optional[PromptAdapterRequest] = None,
        priority: int = 0,
    ) -> AsyncGenerator[RequestOutput, None]:
        """
        Generate outputs for a request. This method is a coroutine. It adds the
        request to the waiting queue of the LLMEngine and streams the outputs
        from the LLMEngine to the caller.

        :param prompt: the prompt to the LLM. See :class:`~vllm.inputs.PromptType`
            for more details about the format of each input.
        :param sampling_params: the sampling parameters of the request
        :param request_id: the unique id of the request
        :param lora_request: LoRA request to use for generation, if any
        :param trace_headers: OpenTelemetry trace headers
        :param prompt_adapter_request: Prompt Adapter request to use for generation, if any
        :param priority: the priority of the request; only applicable with priority scheduling
        :return: an async generator streaming RequestOutput objects
        """
        self.logger.info("AsyncLLMEngine - generate request")
        return self.engine.generate(
            prompt=prompt,
            sampling_params=sampling_params,
            request_id=request_id,
            lora_request=lora_request,
            trace_headers=trace_headers,
            prompt_adapter_request=prompt_adapter_request,
            priority=priority,
        )
    ....
Now I can use this deployment as:
results_generator = engine.options(stream=True).generate.remote(
    prompt=prompt,
    sampling_params=sampling_params,
    request_id=request_id,
    lora_request=lora_request,
    trace_headers=trace_headers,
    prompt_adapter_request=prompt_adapter_request,
    priority=priority,
)
final_output = None
async for request_output in results_generator:
    final_output = request_output
prompt = final_output.prompt
text_outputs = [prompt + output.text for output in final_output.outputs]
return text_outputs
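For completeness, engine above is the Serve deployment handle. I obtain it roughly like this (the application name is just a placeholder, and this assumes a Ray Serve version where serve.run returns a handle that can be called directly):

app = AsyncLLMEngineDeployment.bind(engine_args)
# serve.run deploys the application and returns a handle to the deployment
engine = serve.run(app, name="llm_app")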
I cannot do the same thing with plain Ray actors, since options(stream=True)
is deployment-specific.
The main thing I need is to return a generator from the actor and then iterate it on the caller side to get the execution results.
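To make the goal concrete, this is roughly the shape I am after with a plain actor. It is only a sketch under my assumptions: the RemoteLLMEngine actor name is made up, and it relies on Ray streaming generator returns, i.e. a generator actor method whose .remote() call returns an ObjectRefGenerator that the caller iterates.

import ray

@ray.remote(num_gpus=1)
class RemoteLLMEngine:
    def __init__(self, engine_args: AsyncEngineArgs):
        self.engine = AsyncLLMEngine.from_engine_args(engine_args)

    async def generate(self, prompt, sampling_params, request_id: str):
        # Defining the method as a generator streams each RequestOutput
        # back to the caller as a separate ObjectRef.
        async for request_output in self.engine.generate(
            prompt=prompt, sampling_params=sampling_params, request_id=request_id
        ):
            yield request_output

# caller side
actor = RemoteLLMEngine.remote(engine_args)
gen = actor.generate.remote(prompt, sampling_params, request_id)
final_output = None
async for ref in gen:
    final_output = await ref

If plain Ray can do it this way, that would solve my problem, but I am not sure whether this is the intended pattern or whether the vLLM generator needs to be wrapped differently.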