PrepareImageUDF Error "The actor is temporarily unavailable" for ray.data.llm multimodal batch inference

What I tested:

ds = ray.data.from_items(
    [
        [
            {
                "role": "system",
                "content": [
                    {"type": "text", "text": "You are a helpful assistant."}
                ],
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Describe this image."},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": "./test/domestic/096416187f0b5fddf40b25f3c946fc76/096416187f0b5fddf40b25f3c946fc76_0.jpeg"
                        },
                    },
                ],
            },
        ]
        for _ in range(1000)
    ]
)

I checked python.ray.data.Dataset.map_batches and python.ray.llm._internal.batch.stages.prepare_image_stage.PrepareImageUDF, but could not find the cause of the issue.

The system just repeatedly logged the following error:

(raylet) Task MapWorker(Map(_preprocess)->MapBatches(PrepareImageUDF)).submit failed. There are infinite retries remaining, so the task will be retried. Error: The actor is temporarily unavailable: RpcError: RPC Error message: Socket closed; RPC Error details:  rpc_code: 14 [repeated 8x across cluster]

Other settings:

import os

os.environ['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn'
os.environ['RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION'] = '0.5'

import ray
ray.init(num_cpus=16,
         num_gpus=8,
         runtime_env={"env_vars": {"RAY_DEBUG": "legacy"}})

from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor, Processor
from vllm import SamplingParams

config = vLLMEngineProcessorConfig(
    model_source="models/GUI-Owl-7B",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
        "tensor_parallel_size": 2,
        "gpu_memory_utilization": 0.9,
    },
    runtime_env=dict(
        env_vars=dict(
            # HF_TOKEN=HF_TOKEN, # Token not needed for public models
            VLLM_USE_V1="1",
        ),
    ),
    concurrency=4,  # number of data-parallel engine replicas
    batch_size=64,
    apply_chat_template=False,
    has_image=True,
)

# Note: this SamplingParams object is never used below; the preprocess
# lambda passes a plain dict of sampling parameters instead.
sampling_params = SamplingParams(n=1,
                                 temperature=0.1,
                                 max_tokens=2048)

processor: Processor = build_llm_processor(
    config,
    preprocess=lambda messages: dict(
        messages=messages,
        sampling_params=dict(
            temperature=0.1,
            max_tokens=2048,
            n=1
        )
    ),
    postprocess=lambda row: dict(
        answer=row["generated_text"],
        **row  # This will return all the original columns in the dataset.
    ),
)

The repeated "The actor is temporarily unavailable ... Socket closed" error is likely caused by a serialization or resource issue in the image-preparation stage, especially with has_image=True and a large dataset of local image URLs. The Ray Data vision pipeline expects image content as either a PIL Image object or a valid image URL; "type": "image_url" with a local file path (e.g., "./test/domestic/...") may not be supported or accessible from the Ray workers, which makes PrepareImageUDF fail and the actor restart. This matches the expected input format described in Ray’s LLM vision batch inference docs and example code.

Resource exhaustion or misconfiguration can exacerbate the error, but the primary cause here is most likely the unsupported or inaccessible image path format. The correct format uses "type": "image" with either a public URL or a PIL Image object, not "type": "image_url". See the official example for reference.
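
For illustration, a minimal sketch of the corrected user message, reusing the path from your post (loading the file with PIL is one option the docs show; a publicly reachable URL is the other):

from PIL import Image

# Use "type": "image" with a PIL Image object instead of
# "type": "image_url" pointing at a local path that the Ray
# workers may not be able to resolve.
user_message = {
    "role": "user",
    "content": [
        {"type": "text", "text": "Describe this image."},
        {
            "type": "image",
            "image": Image.open(
                "./test/domestic/096416187f0b5fddf40b25f3c946fc76/"
                "096416187f0b5fddf40b25f3c946fc76_0.jpeg"
            ),
        },
    ],
}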

Would you like a step-by-step breakdown of the pipeline stages?


It works now. Thank God.

At least I’m at the starting line now.

I suppose loading images during preprocessing could help avoid serialization errors?

Also, I should have used row['item'] to access the passed inputs, as in the sketch below.
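
A hedged sketch of what that looks like against the original from_items dataset (Ray Data stores each non-dict item under the "item" column; the other names reuse the first snippet):

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        # Each element passed to from_items lands in the "item" column.
        messages=row["item"],
        sampling_params=dict(temperature=0.1, max_tokens=2048, n=1),
    ),
    postprocess=lambda row: dict(answer=row["generated_text"]),
)

Here is the full working script: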

import os

os.environ['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn'
os.environ['RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION'] = '0.5'

import ray
ray.init(num_gpus=8,
         runtime_env={"env_vars": {"RAY_DEBUG": "legacy"}})

from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor, Processor
from vllm import SamplingParams
from PIL import Image
from io import BytesIO

vision_processor_config = vLLMEngineProcessorConfig(
    model_source="models/Qwen2.5-VL/Qwen2.5-VL-3B-Instruct",
    engine_kwargs=dict(
        tensor_parallel_size=4,
        # pipeline_parallel_size=1,
        max_model_len=8192,
        # enable_chunked_prefill=True,
        max_num_batched_tokens=2048,
    ),
    # Override Ray's runtime env to include the Hugging Face token. Ray Data uses Ray under the hood to orchestrate the inference pipeline.
    runtime_env=dict(
        env_vars=dict(
            # HF_TOKEN=HF_TOKEN, # Token not needed for public models
            VLLM_USE_V1="1",
        ),
    ),
    batch_size=16,
    # accelerator_type="L4",
    concurrency=2,
    has_image=True,
)

def vision_preprocess(row: dict) -> dict:
    """
    Preprocessing function for vision-language model inputs.

    Converts dataset rows into the format expected by the VLM:
    - System prompt for analysis instructions
    - User message with text and image content
    - Multiple choice formatting
    - Sampling parameters
    """
    # Letter choices for multiple-choice formatting; unused in this minimal test.
    choice_indices = ["A", "B", "C", "D", "E", "F", "G", "H"]

    return {
        "messages": [
            {
                "role": "system",
                "content": (
                    "Analyze the image and question carefully, using step-by-step reasoning. "
                    "First, describe any image provided in detail. Then, present your reasoning. "
                    "And finally your final answer in this format: Final Answer: <answer> "
                    "where <answer> is: The single correct letter choice A, B, C, D, E, F, etc. when options are provided. "
                    "Only include the letter. Your direct answer if no options are given, as a single phrase or number. "
                    "IMPORTANT: Remember, to end your answer with Final Answer: <answer>."
                ),
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Describe this image."},
                    {
                        "type": "image",
                        "image": Image.open('./test/domestic/006b3e83f8da89719da76290ed590a3c/006b3e83f8da89719da76290ed590a3c_0.jpeg'),
                    }
                ],
            },
        ],
        "sampling_params": {
            "temperature": 0.3,
            "max_tokens": 150,
            "detokenize": False,
        },
        # # Include original data for reference
        # "original_data": {
        #     "question": row["question"],
        #     "answer_choices": row["answer"],
        #     "image_size": row["image"].get("width", 0) if row["image"] else 0,
        # },
    }

def vision_postprocess(row: dict) -> dict:
    return {
        "resp": row["generated_text"],
    }

vision_processor = build_llm_processor(
    vision_processor_config,
    preprocess=vision_preprocess,
    postprocess=vision_postprocess,
)

ds = ray.data.from_items(
    [dict(a=1) for _ in range(1000)]
)

results = vision_processor(ds).take_all()
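
One design note: take_all() materializes all 1000 generations on the driver. For larger runs you could stream the outputs to disk instead; a sketch (the output path is hypothetical):

# Stream results to Parquet rather than collecting them on the driver.
vision_processor(ds).write_parquet("/tmp/vlm_results")  # hypothetical path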

Wait. There is still one reported problem:

(MapWorker(MapBatches(TokenizeUDF)) pid=2760523) Failed to convert column '__data' into pyarrow array due to: Error converting data to Arrow: [{'a': 1, 'messages': [{'role': 'system', 'content': 'Analyze the image and question carefully, using step-by-step reasoning. First, describe any image provided in detail. Then, present your reasoning...; falling back to serialize as pickled python objects                                                                                                                                  
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523) Traceback (most recent call last): [repeated 2x across cluster]                    
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523)   File ".../lib/python3.10/site-packages/ray/air/util/tensor_extensions/arrow.py", line 255, in _convert_to_pyarrow_native_array [repeated 2x across cluster]              
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523)     pa_type = _infer_pyarrow_type(column_values)                                   
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523)   File ".../lib/python3.10/site-packages/ray/air/util/tensor_extensions/arrow.py", line 341, in _infer_pyarrow_type                                                        
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523)     inferred_pa_dtype = pa.infer_type(column_values)                               
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523)   File "pyarrow/array.pxi", line 571, in pyarrow.lib.infer_type                    
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523)   File "pyarrow/error.pxi", line 155, in pyarrow.lib.pyarrow_internal_check_status 
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523)   File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status                   
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523) pyarrow.lib.ArrowInvalid: Could not convert <PIL.JpegImagePlugin.JpegImageFile image mode=RGB size=1220x2712 at 0x7EF6CC63D630> with type JpegImageFile: did not recognize Python value type when inferring an Arrow data type                                                                                                                             
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523) The above exception was the direct cause of the following exception:               
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523)   File ".../lib/python3.10/site-packages/ray/air/util/tensor_extensions/arrow.py", line 145, in convert_to_pyarrow_array                                                   
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523)     return _convert_to_pyarrow_native_array(column_values, column_name)            
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523)     raise ArrowConversionError(str(column_values)) from e                          
(MapWorker(MapBatches(TokenizeUDF)) pid=2760523) ray.air.util.tensor_extensions.arrow.ArrowConversionError: Error converting data to Arrow: [{'a': 1, 'messages': [{'role': 'system', 'content': 'Analyze the image and question carefully, using step-by-step reasoning. First, describe any image provided in detail. Then, present your reasoning...

Hi @Erickek, thanks for posting! Is the serialize-via-pickle fallback working?

Otherwise, have you tried the encode_example workaround for the pyarrow error here?
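
For what it's worth, the message ends with "falling back to serialize as pickled python objects", which is normally a performance warning rather than a hard failure. A minimal smoke test (a sketch reusing the names from the script above) can confirm whether the pipeline still completes:

# Run the pipeline on a handful of rows to check that the pickle
# fallback carries the PIL images through end to end.
small_ds = ray.data.from_items([dict(a=1) for _ in range(4)])
print(vision_processor(small_ds).take_all()[0])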