Here is the code I tested:
ds = ray.data.from_items(
    [
        [
            {
                "role": "system",
                "content": [
                    {"type": "text", "text": "You are a helpful assistant."}
                ],
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "Describe this image."},
                    {
                        "type": "image_url",
                        "image_url": {"url": "./test/domestic/096416187f0b5fddf40b25f3c946fc76/096416187f0b5fddf40b25f3c946fc76_0.jpeg"},
                    },
                ],
            },
        ]
        for _ in range(1000)
    ]
)
I checked ray.data.Dataset.map_batches and ray.llm._internal.batch.stages.prepare_image_stage.PrepareImageUDF, but could not find the cause of the issue.
The job just kept logging the following error repeatedly:
(raylet) Task MapWorker(Map(_preprocess)->MapBatches(PrepareImageUDF)).submit failed. There are infinite retries remaining, so the task will be retried. Error: The actor is temporarily unavailable: RpcError: RPC Error message: Socket closed; RPC Error details: rpc_code: 14 [repeated 8x across cluster]
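As a sanity check (not part of the original run), it may be worth verifying that the referenced image file actually exists and can be decoded before submitting the job, since the PrepareImageUDF stage has to load every image the dataset points to. The snippet below is a minimal sketch that assumes the relative path resolves from the driver's working directory:

import os
from PIL import Image

image_path = "./test/domestic/096416187f0b5fddf40b25f3c946fc76/096416187f0b5fddf40b25f3c946fc76_0.jpeg"
assert os.path.exists(image_path), f"missing file: {image_path}"
with Image.open(image_path) as img:
    img.verify()  # raises if the file is truncated or not a valid image
print("image is readable:", image_path)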
Other settings:
import os
os.environ['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn'
os.environ['RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION'] = '0.5'

import ray
ray.init(
    num_cpus=16,
    num_gpus=8,
    runtime_env={"env_vars": {"RAY_DEBUG": "legacy"}},
)
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor, Processor

config = vLLMEngineProcessorConfig(
    model_source="models/GUI-Owl-7B",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
        "tensor_parallel_size": 2,
        "gpu_memory_utilization": 0.9,
    },
    runtime_env=dict(
        env_vars=dict(
            # HF_TOKEN=HF_TOKEN,  # Token not needed for public models
            VLLM_USE_V1="1",
        ),
    ),
    concurrency=4,  # data parallelism: number of vLLM engine replicas
    batch_size=64,
    apply_chat_template=False,
    has_image=True,
)
from vllm import SamplingParams

# Note: this object is not used below; the processor's preprocess passes
# the sampling parameters as a plain dict instead.
sampling_params = SamplingParams(
    n=1,
    temperature=0.1,
    max_tokens=2048,
)
processor: Processor = build_llm_processor(
    config,
    preprocess=lambda messages: dict(
        messages=messages,
        sampling_params=dict(
            temperature=0.1,
            max_tokens=2048,
            n=1,
        ),
    ),
    postprocess=lambda row: dict(
        answer=row["generated_text"],
        **row,  # This will return all the original columns in the dataset.
    ),
)
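For completeness, the processor is then applied to the dataset roughly as follows; the error above appears once execution starts. Taking a couple of rows is just an example of how I trigger execution:

# Build the inference pipeline; execution stays lazy until the dataset is consumed.
ds = processor(ds)

# Consuming rows triggers the Map(_preprocess)->MapBatches(PrepareImageUDF) stage.
for row in ds.take(2):
    print(row["answer"])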