Offline inference with vLLM: map_batches vs build_llm_processor

@RunLLM let's talk about scaling. vLLM supports tensor parallelism, and I know Ray supports data parallelism. For tensor parallelism in my current workflow I take a direct approach, setting engine params like tensor_parallel_size and the VLLM_WORKER_MULTIPROC_METHOD environment variable. What types of scaling are supported in this integration of Ray and vLLM?

Ray’s integration with vLLM supports both data parallelism (multiple independent model replicas) and model parallelism (tensor parallelism, pipeline parallelism, or both). You can configure tensor parallelism and pipeline parallelism by setting tensor_parallel_size and pipeline_parallel_size in engine_kwargs of your processor or LLM config. Ray will automatically orchestrate cross-node and multi-GPU scaling, including placement group strategies for optimal resource allocation (Ray Data LLM docs, Ray Serve LLM cross-node parallelism).

You do not need to manually set VLLM_WORKER_MULTIPROC_METHOD—Ray handles process orchestration and placement. Both data and model parallelism can be combined: for example, you can run multiple replicas (data parallelism), each using tensor and/or pipeline parallelism across multiple GPUs or nodes.

Would you like a code example for configuring both data and tensor parallelism?

@RunLLM yes, a code example would be helpful. I am also curious about the scaling concerns this integration encapsulates that users would normally have to handle themselves with the map_batches approach. What are some out-of-the-box scaling benefits of this integration?

Here’s a code example for combining data parallelism (multiple replicas) and tensor/pipeline parallelism (multi-GPU per replica) in Ray Data LLM with vLLM:

from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "tensor_parallel_size": 2,      # 2 GPUs per replica (tensor parallelism)
        "pipeline_parallel_size": 2,    # 2 pipeline stages (total 4 GPUs per replica)
        "max_model_len": 16384,
    },
    concurrency=3,                      # 3 replicas (data parallelism)
    batch_size=32,
)
processor = build_llm_processor(
    config,
    # preprocess builds the engine input for each row (e.g. a "messages" list
    # plus "sampling_params"); postprocess reads the "generated_text" column
    # that the engine adds to each row.
    preprocess=lambda row: {...},
    postprocess=lambda row: {...},
)

# Apply the processor to your dataset as usual, e.g. ds = processor(ds)

This will launch 3 independent model replicas, each using 4 GPUs (2 TP × 2 PP), for a total of 12 GPUs if available (Ray Data LLM docs, Ray example: basic_llm_example.py).

Out-of-the-box scaling benefits in Ray Data LLM vs. manual map_batches:

  • Automatic resource orchestration: Ray handles GPU/CPU allocation, placement groups, and parallelism, so you don’t need to manage process spawning or device assignment.
  • Fault tolerance: Failed batches are retried automatically; you don’t need to write custom error handling or checkpointing.
  • Elastic scaling: Easily scale up or down by changing concurrency or parallelism settings—Ray manages cluster resources and scheduling.
  • Efficient batching and scheduling: Ray Data LLM uses continuous batching and intelligent scheduling to maximize GPU utilization, reducing idle time.
  • Cross-node support: Ray can distribute model shards and replicas across multiple nodes, handling placement and communication.
  • Simplified API: No need to manually manage vLLM engine lifecycles, process pools, or inter-process communication.

These features reduce operational complexity and make large-scale, distributed LLM inference production-ready.

@RunLLM

Automatic resource orchestration: I thought this is already supported in the manual approach.
Fault tolerance: Does this support checkpointing of data? Suppose I have a batch of 5 records and it fails during inference after 3 records. Is that progress checkpointed? On the retry attempt, will it run inference only on the remaining 2 records?
Efficient batching and scheduling: Isn't this supported in the manual approach too?
Cross-node support: Tell me more about this. How is this not done in the manual approach?

Automatic resource orchestration and efficient batching/scheduling can be implemented manually, but Ray Data LLM provides these out-of-the-box, including dynamic scaling, actor pool management, and continuous batching with vLLM, without requiring custom code for device assignment, process management, or scheduling (Ray Data LLM docs).

For fault tolerance, Ray Data LLM supports job-level checkpointing: if a batch fails, only unfinished records are retried. Completed records are checkpointed, so on restart, only the remaining records are processed—this is not available in standard manual map_batches, which typically requires restarting the entire job (Anyscale Docs: Ray Data job-level checkpointing, Ray GitHub issue #55008).

Cross-node support: Ray Data LLM and Ray Serve LLM natively support model and data parallelism across multiple nodes, automatically handling placement groups, resource bundling, and inter-node communication. Manual approaches require you to implement distributed process management, device mapping, and communication logic, which is complex and error-prone (Ray Serve LLM cross-node parallelism, Ray PR #56980).

Would you like a step-by-step breakdown of how checkpointing or cross-node placement works?

@RunLLM could you provide an example of cross-node support? How would it be done in the manual approach vs. how is it supported in Ray Data LLM?

In the manual approach, cross-node support for vLLM typically requires you to manually launch distributed vLLM engine processes on each node, manage inter-process communication, set up distributed environment variables (like MASTER_ADDR, RANK, etc.), and coordinate placement and resource allocation—this is complex and error-prone (Ray GitHub issue #53192).
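
For contrast, the manual single-node pattern is roughly the sketch below: a class-based map_batches UDF that owns one vLLM engine per actor (the model name, column names, and sampling settings are illustrative, not from this thread). This sketch stops at one node; spanning a single engine across nodes from here is exactly where you would have to manage placement groups, the distributed executor, and environment variables such as VLLM_WORKER_MULTIPROC_METHOD yourself.

import ray
from vllm import LLM, SamplingParams

class VLLMPredictor:
    def __init__(self):
        # One engine per actor. tensor_parallel_size=2 means this actor needs 2 GPUs,
        # which you must request yourself via num_gpus below; getting vLLM's worker
        # processes to start cleanly inside the actor is where settings like
        # VLLM_WORKER_MULTIPROC_METHOD come into play.
        self.llm = LLM(model="unsloth/Llama-3.1-8B-Instruct", tensor_parallel_size=2)

    def __call__(self, batch):
        params = SamplingParams(temperature=0.3, max_tokens=128)
        outputs = self.llm.generate(list(batch["prompt"]), params)
        batch["generated_text"] = [o.outputs[0].text for o in outputs]
        return batch

ds = ray.data.from_items([{"prompt": "What is the capital of France?"}])
ds = ds.map_batches(VLLMPredictor, concurrency=1, num_gpus=2, batch_size=32)
print(ds.take_all())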

With Ray Data LLM, cross-node parallelism is natively supported: you simply set tensor_parallel_size, pipeline_parallel_size, and distributed_executor_backend="ray" in engine_kwargs, and Ray automatically handles placement, resource bundling, and communication across nodes. Example:

from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={
        "tensor_parallel_size": 2,
        "pipeline_parallel_size": 2,
        "distributed_executor_backend": "ray",  # enables cross-node
        "max_model_len": 16384,
    },
    batch_size=32,
    concurrency=1,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: {...},
    postprocess=lambda row: {...},
)
# Apply processor to your dataset as usual

Ray will automatically distribute the 4 GPUs (2 TP × 2 PP) across nodes as needed (Ray Data LLM docs, Ray example: basic_llm_example.py).

@RunLLM great discussion. One last question: what version of Ray introduced the ray.data.llm integration? We are using 2.43 at the moment.

The ray.data.llm integration was introduced in Ray version 2.44.0. It is not available in 2.43; you will need to upgrade to at least 2.44 to use these APIs (Ray GitHub, Ray Discourse).

Would you like upgrade guidance or details on breaking changes?

@RunLLM let’s continue the discussion on batch inference with a vision-language model using Ray. Can you explain the difference between the following approaches:

processor = build_llm_processor(config, preprocess=vision_preprocess)

# First approach using take_all()
ds = processor(vision_dataset)
result = ds.take_all()

# Second approach using iter_batches()
ds = processor(vision_dataset)
results = [result for result in ds.iter_batches(batch_size=batch_size)]

I am particularly interested in what happens under the hood when a batch is processed. Let’s assume a scenario where preprocessing, inference, and postprocessing occur for batches as they come, similar to a production setup where requests arrive in batches and you only have visibility into the current batch at hand. You mentioned earlier that the Processor does not modify your original dataset in-place. Is this correct? Or does the Processor object, upon execution/call, instantiate replica(s) of the vLLM engine via map_batches under the hood and preprocess/transform the whole dataset first before starting to run inference (so iter_batches would actually be iterating over a preprocessed dataset)?

Can you also explain the difference between batch_size for vLLMEngineProcessorConfig and batch_size for iter_batches()?

Yes, your understanding is correct: the Processor does not modify your original dataset in-place. When you call processor(ds), it returns a new Ray Dataset that represents a pipeline of preprocessing, inference (with vLLM engine replicas via map_batches), and postprocessing. Under the hood, Ray Data LLM orchestrates a three-stage pipeline: preprocessing, inference, and postprocessing, all operating in parallel as data flows through the pipeline (Ray Data LLM docs, Anyscale Docs).

  • take_all(): Triggers execution of the entire pipeline and collects all results into memory at once.
  • iter_batches(): Streams results batch-by-batch, allowing you to process each output batch as soon as it is ready, which is more memory-efficient and production-like. Each batch yielded by iter_batches() is produced after preprocessing, inference, and postprocessing for that batch are complete—not after the entire dataset is preprocessed.

Batch size difference:

  • batch_size in vLLMEngineProcessorConfig controls how many rows are sent to each vLLM inference worker at a time (affecting GPU utilization and throughput).
  • batch_size in iter_batches() controls how many output rows you receive per iteration when consuming the result dataset; it does not affect the internal batching for inference (see the sketch below).
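
A minimal sketch showing the two knobs side by side (the model name, the "prompt" column, and the handle() function are illustrative assumptions, not from the thread):

from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="unsloth/Llama-3.1-8B-Instruct",
    engine_kwargs={"max_model_len": 8192},
    batch_size=64,          # rows handed to each vLLM replica per inference call
    concurrency=1,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[{"role": "user", "content": row["prompt"]}],
        sampling_params=dict(temperature=0.3, max_tokens=128),
    ),
    postprocess=lambda row: dict(answer=row["generated_text"]),
)

ds = processor(input_ds)                       # input_ds: any Ray Dataset with a "prompt" column
for batch in ds.iter_batches(batch_size=16):   # how many finished rows you consume per iteration
    handle(batch)                              # hypothetical downstream step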

Would you like a step-by-step breakdown of the pipeline execution flow?

@RunLLM thank you for these explanations! So if I wanted to achieve something like this with Ray:

all_results = []
for i in range(0, len(dataset), batch_size):
    batch = dataset[i:i + batch_size]
    inputs = preprocess(batch)   
    outputs = llm.generate(inputs, ...)
    all_results.append(outputs)

i.e. running inference on batches, the iter_batches function does not really have any control over how batches arrive at the model (or over preprocessing, inference, and postprocessing), but only over how results are collected (or when they become available)? If I want to run batch inference with Ray using different batch sizes, say 16, 32 and 64, is this value controlled via batch_size in vLLMEngineProcessorConfig?

Correct: in Ray Data LLM, iter_batches() only controls how results are yielded to you—it does not affect how batches are formed or processed internally for preprocessing, inference, or postprocessing. The actual batch size used for LLM inference is set via batch_size in vLLMEngineProcessorConfig. To experiment with different inference batch sizes (e.g., 16, 32, 64), you should set the desired value in vLLMEngineProcessorConfig(batch_size=...) (Ray Data LLM docs).

So, to control how many records are sent to the model per inference call, adjust batch_size in the processor config—not in iter_batches().
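
A rough sketch of such a sweep, building one processor per batch size (illustrative; preprocess_fn and postprocess_fn stand in for the same callables shown in the earlier sketch):

import time
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

for bs in (16, 32, 64):
    config = vLLMEngineProcessorConfig(
        model_source="unsloth/Llama-3.1-8B-Instruct",
        engine_kwargs={"max_model_len": 8192},
        batch_size=bs,                      # inference batch size under test
        concurrency=1,
    )
    processor = build_llm_processor(config, preprocess=preprocess_fn, postprocess=postprocess_fn)
    start = time.time()
    processor(input_ds).materialize()       # run the full pipeline for this setting
    print(f"batch_size={bs}: {time.time() - start:.1f}s")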

@RunLLM thanks again! Regarding how batches are formed or processed internally for preprocessing, inference, or postprocessing, is this pipeline deterministic? For example if I run batch inference following the documentation (Batch inference with vision-language-model (VLM)) multiple times, does the ordering of batches and samples within batches remain the same across repeats?

By default, Ray Data’s batch processing pipeline is not guaranteed to be deterministic—batch and sample ordering may vary between runs due to parallel execution and scheduling. However, you can enable deterministic ordering by setting preserve_order=True in the Ray Data execution context. This ensures that the order of batches and samples remains consistent across repeated runs (Ray Data reproducibility guide, Ray Discourse).

Would you like a code example for enabling deterministic execution?
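
For reference, a minimal sketch of enabling deterministic ordering (this sets Ray Data's global execution option; set it before building and running the processor):

import ray

# Make Ray Data preserve input order across pipeline stages.
ctx = ray.data.DataContext.get_current()
ctx.execution_options.preserve_order = True

# Build and run the processor as usual afterwards; batch and row order should
# now be stable across repeated runs, typically at some cost to throughput.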

@RunLLM thank you! Just a hypothetical scenario which may not make much sense performance-wise, but if I wanted to have true control over batches, would that entail creating a Dataset object for each batch and running inference on that? In pseudo-code:

processor = build_llm_processor(config, preprocess=vision_preprocess)

for i in range(0, len(dataset), batch_size):
    batch = dataset[i:i + batch_size]
    ds = ray.data.from_items(batch)
    ds_processor = processor(ds)
    result = ds_processor.take_all()

Yes, if you want absolute control over batch composition and ordering, you can manually create a Ray Dataset for each batch and run the processor on it as in your pseudo-code. This approach ensures each batch is processed exactly as you define, but it bypasses Ray Data’s internal batching, pipelining, and parallelism features, and may be less efficient for large-scale workloads (Ray Data docs).

This method is valid for scenarios where strict batch boundaries or custom batch logic are required, but for most use cases, leveraging Ray Data’s built-in batching and parallelism is recommended for performance and scalability.

@RunLLM thanks! What’s the difference between ds.iter_batches() and ds.iterator().iter_batches()?