Streaming support for Ray actors

How severe does this issue affect your experience of using Ray?

  • Medium: It contributes to significant difficulty to complete my task, but I can work around it.

Ray provides great generator support (Ray Generators — Ray 2.42.1), which allows running code like the following:

import asyncio
import time

import ray


@ray.remote
def task():
    for i in range(5):
        time.sleep(1)
        yield i


async def main():
    # task.remote() returns an ObjectRefGenerator that can be iterated with async for
    async for ref in task.remote():
        print(await ref)

asyncio.run(main())

Ray also provides dynamic generators (Dynamic generators — Ray 2.42.1), but they are not supported by actors and do not provide async support.
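
For context, a minimal sketch of the dynamic generator pattern from that page, assuming a simple task that yields a few values (the task name chunked_task and the values are illustrative):

import ray

# Dynamic generator: the number of return values is not known up front.
# The task returns a single ObjectRef that resolves to an ObjectRefGenerator.
@ray.remote(num_returns="dynamic")
def chunked_task(n):
    for i in range(n):
        yield i

gen_ref = chunked_task.remote(5)   # ObjectRef to the generator
ref_generator = ray.get(gen_ref)   # resolve to the ObjectRefGenerator itself
for ref in ref_generator:
    print(ray.get(ref))            # get each yielded value

As noted above, this applies to tasks only and the iteration is synchronous.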

Ray Serve provides streaming responses (Serve a Chatbot with Request and Response Streaming — Ray 2.42.1), which is exactly what I am looking for.

Is there a chance to add streaming support for Ray actors, or can you suggest a way to implement such functionality?

Hmm, it should be supported in actors, right?

https://docs.ray.io/en/latest/ray-core/ray-generator.html#generator-from-actor-tasks

Generators are, but dynamic generators are not. As a result, streaming similar to Ray Serve is not supported.

Can you help define what "dynamic generators" are?

Hi @blublinsky,

The streaming generator is supported by actors: Ray Generators — Ray 2.42.1

The documentation reference is above: Dynamic generators — Ray 2.42.1. In simple terms, it allows returning an AsyncGenerator that can be used outside of the actor that received it.

Dynamic generator != Ray generator

@blublinsky Can you share an exact code snippet, specifically in the context of an actor, for what you want to achieve? Pseudocode/code that doesn't work is fine.

I think the confusion on our side is that we haven't intentionally named things differently to distinguish between a generator and a dynamic generator.

Sure, here is my working code for Ray Serve. The deployment looks like:

@serve.deployment(name="AsyncLLMEngine", ray_actor_options={"num_gpus": 1, "num_cpus": 4})
class AsyncLLMEngineDeployment:
    """
    Ray serve deployment based on VLLM AsyncLLMEngine. We create this
    SO that we can easily scale this and wire it with other serving components
    """
    def __init__(
            self,
            engine_args: AsyncEngineArgs,
    ):
        # create logger
        self.logger = logging.getLogger("async_llm_support")
        # create engine
        self.engine = AsyncLLMEngine.from_engine_args(engine_args)
        self.logger.debug("Created AsyncLLMEngine")
....
    async def generate(
            self,
            prompt: PromptType,
            sampling_params: SamplingParams,
            request_id: str,
            lora_request: Optional[LoRARequest] = None,
            trace_headers: Optional[Mapping[str, str]] = None,
            prompt_adapter_request: Optional[PromptAdapterRequest] = None,
            priority: int = 0,
    ) -> AsyncGenerator[RequestOutput, None]:
        """
        Generate outputs for a request. This method is a coroutine. It adds the
        request into the waiting queue of the LLMEngine and streams the outputs
        from the LLMEngine to the caller.
        :param prompt: The prompt to the LLM. See :class:`~vllm.inputs.PromptType`
            for more details about the format of each input.
        :param sampling_params: the sampling parameters of the request.
        :param request_id: the unique id of the request
        :param lora_request: LoRA request to use for generation, if any
        :param trace_headers: OpenTelemetry trace headers
        :param prompt_adapter_request: Prompt Adapter request to use for generation, if any
        :param priority: the priority of the request. Only applicable with priority scheduling.
        :return:
        """
        self.logger.info("AsyncLLMEngine - generate request")
        return self.engine.generate(prompt=prompt, sampling_params=sampling_params, request_id=request_id,
                                    lora_request=lora_request, trace_headers=trace_headers,
                                    prompt_adapter_request=prompt_adapter_request, priority=priority)
....

Now I can use this deployment as:

results_generator = engine.options(stream=True).generate.remote(
    prompt=prompt,
    sampling_params=sampling_params,
    request_id=request_id,
    lora_request=lora_request,
    trace_headers=trace_headers,
    prompt_adapter_request=prompt_adapter_request,
    priority=priority,
)
final_output = None
async for request_output in results_generator:
    final_output = request_output

prompt = final_output.prompt
text_outputs = [prompt + output.text for output in final_output.outputs]
return text_outputs

I cannot do the same thing for plain Ray actors, as options(stream=True) is deployment-specific.

The main thing that I need to do is return a generator from the actor and then iterate it to get the execution results.

They are very different things. A generator is a Ray actor method implementing the Python generator interface, while a dynamic generator returns a generator object through which you can get data (bypassing the actor).
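
For reference, a minimal sketch of the first case, based on the streaming-generator docs linked above: a plain actor whose method yields values, with the caller consuming each ObjectRef as it arrives (the StreamingWorker actor and its generate method are illustrative names, not the vLLM code above):

import asyncio

import ray


@ray.remote
class StreamingWorker:
    # An async generator method: Ray turns each yielded value into an
    # ObjectRef that the caller can consume while the method is still running.
    async def generate(self, n: int):
        for i in range(n):
            await asyncio.sleep(1)
            yield i


async def main():
    worker = StreamingWorker.remote()
    # .remote() on a generator method returns an ObjectRefGenerator
    async for ref in worker.generate.remote(5):
        print(await ref)

asyncio.run(main())

Whether this covers the dynamic-generator case described above (handing the generator object itself to another consumer, bypassing the actor) is the open question here.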

@rliaw, @jjyao Is my explanation good enough? Any implementation suggestions?

Is the difference here just that in the latter, you don’t need to call ray.get()?