Certainly! Here’s a step-by-step example of running offline batch inference with Ray Data and vLLM, where the model is too large for a single GPU and must be sharded across multiple GPUs using tensor and/or pipeline parallelism:
1. Install dependencies:
pip install -U "ray[data, llm]>=2.49.1" vllm==0.7.2
2. Configure vLLM for model parallelism:
You must set tensor_parallel_size and/or pipeline_parallel_size in engine_kwargs. For example, to split the model across 4 GPUs (2 × 2):
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "max_model_len": 16384,
        "tensor_parallel_size": 2,    # 2 GPUs for tensor parallelism
        "pipeline_parallel_size": 2,  # 2 GPUs for pipeline parallelism (total 4 GPUs)
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 2048,
    },
    concurrency=1,
    batch_size=32,
    accelerator_type="L4",  # or your GPU type
)
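For comparison, tensor parallelism alone also works when all 4 GPUs share one node; here is a minimal variant (the name config_tp_only is just illustrative, everything else matches the config above):

config_tp_only = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "max_model_len": 16384,
        "tensor_parallel_size": 4,  # shard across all 4 GPUs, no pipeline stages
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 2048,
    },
    concurrency=1,
    batch_size=32,
    accelerator_type="L4",
)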
3. Build the processor and run inference:
import ray

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a bot that responds with haikus."},
            {"role": "user", "content": row["item"]},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=250,
        ),
    ),
    postprocess=lambda row: dict(
        answer=row["generated_text"],
        **row,
    ),
)

ds = ray.data.from_items(["Start of the haiku is: Complete this for me..."])
ds = processor(ds)
ds.show(limit=1)
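ds.show(limit=1) only prints one row to stdout; to keep the outputs, you can use any Ray Data sink (the output path below is just an example):

ds.write_parquet("/tmp/haiku_outputs")  # or: results = ds.take_all()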
4. What happens under the hood:
- Ray Data starts a pool of vLLM engine actors (one replica here, since concurrency=1) and streams batches of rows through them.
- Each actor needs tensor_parallel_size × pipeline_parallel_size GPUs, so it is scheduled on a node with enough GPUs of the requested accelerator_type (here, 4 GPUs per actor; see the sketch below).
- Inside each actor, vLLM handles the model sharding and inter-GPU communication; Ray manages the actor lifecycle and the data flow.
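As a rough, optional sanity check (not required by the API), you can confirm the cluster reports enough GPUs before launching the job:

import ray

ray.init()  # or ray.init(address="auto") to attach to a running cluster

gpus_per_replica = 2 * 2              # tensor_parallel_size * pipeline_parallel_size
required_gpus = gpus_per_replica * 1  # * concurrency
available_gpus = int(ray.cluster_resources().get("GPU", 0))
print(f"Need {required_gpus} GPUs, cluster reports {available_gpus}")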
5. Notes:
- engine_kwargs are forwarded to the vLLM engine, so any other vLLM engine argument can be tuned there as well.
- Parallelism splits the weights across a replica's GPUs; the model plus KV cache must still fit in their combined memory.
Would you like a more advanced example (e.g., multi-node or with DeepSpeed/FSDP)?
Hint: Mention @RunLLM in the post for followups.