vllm代码框架分析

vllm代码框架

框架整体调度流程

用户代码

│ 1. 同步/异步调用

┌──────────────────────────┐

│ LLM / AsyncLLMEngine │◀─ yield / return ───┐

│ ·tokenize&pre-process │ │

│ ·生成 EngineCoreRequest │ │

└┬─────────────────────────┘ │

│ 2. 序列化 + ZMQ PUSH │

│ (ipc:///tmp/xxx_input) │

▼ ▼ │

┌──────────────────────────┐ │

│ EngineCoreClient │ │

│ (用户进程内 RPC 代理) │ │

└┬─────────────────────────┘ │

│ 3. 广播 RPC "add_request" │

│ 给所有 rank │

▼ ▼ │

┌──────────────────────────┐ │

│ EngineCore 进程 (rank0) │ │

│ ·Router 收消息 │ │

│ ·Scheduler 连续批处理 │ │

└┬─────────────────────────┘ │

│ 4. 每步调度产生 SchedulerOutput │

▼ ▼ │

┌──────────────────────────┐ │

│ EngineCore rpc_server │ │

│ ·collective_rpc │ │

└┬─────────────────────────┘ │

│ 5. 再次 ZMQ DEALER 广播 │

│ "execute_model" │

│ 参数:SchedulerOutput │

▼ ▼ ┌──────────────────────────┐ │

┌────────────────┼─▶│ Worker-0 / GPU-0 进程 │ │

│ │ ·recv RPC │ │

│ │ ·ModelRunner.execute │ │

│ │ ·ViT + Transformer │ │

│ │ ·sample + logits │ │

│ │ ·send result back │ │

│ └──────────────────────────┘ │

│ 6. 各 Worker 并行执行 │

│ TP 组内 NCCL AllReduce │

├──────────────────────────┐ │

│ EngineCore (聚合输出) │◀── ZMQ 收结果────────┘

│ ·构造 RequestOutput │

└┬─────────────────────────┘

│ 7. 通过 output_socket PUSH

▼ ▼

┌──────────────────────────┐

│ EngineCoreClient │

│ ·recv_multipart │

│ ·反序列化 │

└┬─────────────────────────┘

│ 8. 塞进 asyncio Queue / threading Queue

▼ ▼

用户代码 ◀─ yield RequestOutput ◀─ Queue.get()

初始化

参数初始化

ModelConfig:创建时从EngineArgs的self变量中获取

EngineArgs:llm_engine创建时直接从参数列表中获取传参

self.llm_engine = LLMEngine.from_engine_args(

engine_args=engine_args, usage_context=UsageContext.LLM_CLASS

)

->vllm_config = engine_args.create_engine_config(usage_context)

关键class

模型推理任务的执行实体为engine_core,LLM_Engine通过socket向多进程的engine_core下发推理请求request

engine_core接收到任务请求后,向worker分发。

self.workers = WorkerProc.wait_for_ready(unready_workers)

engine_core

vllm.v1.engine.core_client.SyncMPClient

self.resources.engine_manager=CoreEngineProcManager: 这里使用mp.Process多进程创建engine_core

->EngineCoreProc.run_engine_core: 选择EngineCoreProc类型,并开启执行run_busy_loop

->DPEngineCoreProc.run_busy_loop:循环等待llm_engine发来任务请求

Executor

MultiprocExecutor

使用zmq建立连接,并在未来使用broadcast语义将请求发送给所有worker

->self.workers=WorkerProc->WorkerWrapperBase->gpu_worker.Worker

->model_runner=vllm.v1.worker.gpu_model_runner.GPUModelRunner

Worker

->WorkerProc

self.worker=WorkerWrapperBase().init_worker()

->WorkerWrapperBase:管理着workers实体gpu_worker.Worker

->gpu_worker.Worker

init_device: 初始化并创建self.model_runner, 与model_runner一一对应

ModelRunner

GPUModelRunner,与Worker一一对应

数据结构

多模态数据

MultiModalDataItems

每个请求的所有多模态数据都由这个类管理,访问方式为mm_data_items[modality][idx], idx为同一模态多个数据准备(如qwen3vl可以喂入多张图片)

MultiModalFeatureSpec

请求req中的多模态数据

请求生成与发送

llm.generate:接收请求,并将请求挂在到request队列上,从输出队列上取结果,进行后处理并返回

->llm._add_request->llm_engine.add_request

1 input_processor.process_inputs: 将字符转换成token,将图片切割patch并转换为visual token。

2 output_processor.add_request

3 engine_core.add_request->SyncMPClient.add_request->SyncMPClient._send_input(EngineCoreRequestType.ADD, request)

->llm.llm_engine.step: 进行一轮推理,返回该轮推理的输出 ->llm_engine.engine_core: engine_core.get_output,从输出队列上取结果

1 请求预处理

完成:1)将字符转换成token,将图片切割patch并转换为visual token。2)将visual token缓存至cache,如果cache有该图片的缓存,则从缓存中读取visual token而跳过图片预处理步骤。

LLM.input_processor.process_inputs->InputProcessor.process_inputs->InputPreprocessor.preprocess

1.1 InputPreProcessor._prompt_to_llm_inputs

->......->mm_processor.apply

BaseMultiModalProcessor.apply

输入:

Prompt: str, 原始问句;

mm_data: dict, 原始mm data, 如图片存在mm_data['image']

输出:

MultiModalInputs, 成员变量如下

prompt_token_ids

mm_kwargs

mm_hashes

mm_placeholders

  1. self._cached_apply_hf_processor

字符和图片转token,并标注占位符等其他数据。

a.将提示文本与多模态数据一并送入 HF Processor,一次性得到 token ID 序列和预处理后的图片张量。

->Qwen3VLMultiModalProcessor.apply->Qwen3VLMultiModalProcessor._cached_apply_hf_processor

->Qwen3VLMultiModalProcessor._apply_hf_processor

Qwen3VLMultiModalProcessor._apply_hf_processor_main

_apply_hf_processor_text_only:将prompt(包含特殊符号<|im_start|>等)转换成prompt_id。

_apply_hf_processor_mm_only->_apply_hf_processor_text_mm->_call_hf_processor[=Qwen3VLProcessor]: 将图片切分成固定尺寸(模型要求)的patch,并返回pixel_values: size=[patch_h_num*patch_w_num, hid_dim]、image_grid_thw: [[ 1, patch_h_num, patch_w_num]]。

b. 在 token ID 序列中定位并替换“占位 token”,占位 token 的数量 = 多模态编码器输出特征的条数。例如占位符长度为972,那么原prompt中会插入972个占位token。

c. 从处理后的 token ID 序列里抽取出这些占位 token 的信息。信息记录在mm_position变量中。

  1. self._maybe_apply_prompt_updates

将图片数据插入prompt_ids,并记录占位符信息。

1.2 其他

InputProcessor和InputPreprocessor剩余部分未对数据本身进行转换,只是定义了新的class去包裹,最终在InputProcessor.process_inputs中放在EngineCoreRequest中传递给receive侧

2 请求发送

engine_core.add_request->SyncMPClient.add_request->SyncMPClient._send_input(EngineCoreRequestType.ADD, request)

SyncMPClient.resources管理了engine_manager(CoreEngineProcManager),以及llm_engine与core_engine通信的套接字socket。

SyncMPClient.resources.engine_manager.processes后台启动了多个engine_core,并记录了其句柄。

3 请求发送数据结构

->MultiModalProcessingInfo + prompt_ids,

kwargs

hashes

prompt_updates

->MultiModalInputs:

prompt_token_ids: 已经插入了多模态占位符的tokens, list长度为[972+prompt_len]。

mm_kwargs: Dict结构的mm data,image的data为一个list,list中每一个元素为一张图片的信息,包含{pixel_value:MultiModalFieldElem存储image token的tensor, tensor形状为[3888, 1536]; image_grid_thw: MultiModalFieldElem指示image token第一维度形状的长宽,tensor=[1, 72, 54]}

mm_hashes:

mm_placeholders: {'image': [PlaceholderRange(offset=15, length=972, is_embed=None)]}

MultiModalInputs两部分会在EngineCoreRequest(类型MultiModalFeatureSpec)中分别用data和identifier传入EngineCore计算。

->EngineCoreRequest

request_id

prompt_token_ids

mm_features: list[MultiModalFeatureSpec], 其中多模态数据仍然以MultiModalFeatureSpec数据结构传输(该结构为mm data+mm hash)构成。

sampling_params

请求接收与执行

1 请求接收

llm_engine利用socket将request传输给EngineCore

SyncMPClient._send_input ==> EngineCoreProc.process_input_sockets

EngineCoreProc.process_input_sockets:这里处理解析socket传来的数据,还会对ADD类型的数据,即模型推理数据进行缓存的读取。个人觉得把缓存数据读取放在这里不太好。

1)接收数据并分析请求类型

type_frame, *data_frames = input_socket.recv_multipart(copy=False)

request_type = EngineCoreRequestType(bytes(type_frame.buffer))

2)如果请求类型为ADD,对请求做preprocess处理

request = add_request_decoder.decode(data_frames)

request = self.preprocess_add_request(request)

  1. 将请求放入input_queue, 等待EngineCore.run_busy_loop从队列中读取数据

self.input_queue.put_nowait((request_type, request))

2 请求调度

数据结构:scheduler:Request.mm_features=list[MultiModalFeatureSpec]->MultiModalFeatureSpec.data/identifier

调度链:

DPEngineCoreProc._handle_client_request

->DPEngineCoreProc.run_busy_loop

EngineCoreProc会等待请求队列的请求,并根据请求类型处理(_handle_client_request),其中模型推理请求类型为‘ADD’。

EngineCoreProc.run_busy_loop轮询请求,并每轮执行:

1)_process_input_queue->_handle_client_request->add_request

EngineCoreProc.add_request->Scheduler.add_request

2)_process_engine_step->step_fn

self.step_fn()->step

self.output_queue.put_nowait(output)

->DPEngineCoreProc.step-->EngineCore.step

self.scheduler.schedule

self.model_executor.execute_model

Executor使用rpc调度的方式调用workers进行gpu的推理计算,workers由WorkerWrapperBase管理

self.model_executor.execute_model->Executor.execute(MultiprocExecutor)-> MultiprocExecutor.collective_rpc('execute_model')->WorkerWrapperBase.execute_model

3 Worker执行

-> ModelRunner -> GPUModelRunner.execute_model

GPUModelRunner的初始化中,提前申请了很多GPU的tensor,为GPU graph(compile作准备)。

GPUModelRunner.execute_model

  1. self._prepare_inputs

  2. self._preprocess

输入:

scheduler_output:

prompt_token_ids:[1001]

mm_features:

pixel_values:torch.Size([3888, 1536])

image_grid_thw:=tensor([[1, 72, 54]])

输出:

inputs_embeds: [1002, 2048]

positions: [3, 1002]

4 模型推理

GPUModelRunner: model_output = self._model_forward

1)llm模型推理

输入:input_ids,positions;

输出:model_output, tensor[batch, hidden_dim]

2)vlm模型推理

输入: inputs_embeds[token_nums, hidden_dim(=2048)], positions[3, token_nums]

输出:model_output, tensor[batch, hidden_dim(=2048)]

5 接收执行数据结构

->EngineCoreRequest

(从cache中取出缓存数据构成完整的request)

->SchedulerOutput

scheduled_new_reqs: NewRequestData{prompt_token_ids, mm_features, }

mm 缓存复用

InputProcessor

设计理念:针对每个模型和配置参数,生成推理请求各模态的每个数据item的hash和cache;同一模态的同一uuids代表着完全相同的模态数据,如图片,给同一uuids再次传入新的图片data将被忽略。vllm会在req process阶段计算hash和需要增加(missing)的模态数据,并传给worker,worker侧会管理hash与对应的模态数据。

一、创建

InputProcessor创建cache:

self.mm_processor_cache = processor_cache_from_config(vllm_config, mm_registry)

->MultiModalProcessorSenderCache(model_config): self._cache = MultiModalCache.get_lru_cache

->MultiModalCache.get_lru_cache->LRUCache

二、处理流程

->InputProcessor.process_inputs->InputPreProcessor.preprocess

mm_cache由InputProcessor管理, 数据结构InputProcessor.mm_processor_cache,数据结构MultiModalProcessorSenderCache

->InputPreProcessor._preprocess->InputPreProcessor._process_decoder_only_prompt->InputPreProcessor._prompt_to_llm_inputs->InputPreProcessor._process_text->InputPreProcessor._process_multimodal

->Qwen3VLMultiModalProcessor.apply->BaseMultiModalProcessor.apply

BaseMultiModalProcessor.cache = MultiModalProcessorSenderCache

1. BaseMultiModalProcessor._cached_apply_hf_processor:处理mm_items缓存,如果未进行缓存,则新增hash和缓存,如果有缓存,则检查是否需要更新缓存;最后返回req推理需要的prompt_ids, mm_info

1)_hash_mm_items: 计算本次请求全部mm items的hash值

使用model_id, modality, item, hf_processor_mm_kwargs, tokenization_kwargs为每个模态的每个item生成mm_hashes值,并用字典保存。其中后四项同一模态(如qwen3vl是it->t)维持不变,注意此处计算hash值只与item的类型挂钩,与item.data的实际值无关。hf_processor_mm_kwargs, tokenization_kwargs打印出来为空,暂时未用。

2)_get_cache_missing_items: 根据hash值验证哪些items已经被缓存,并返回未被缓存的items

返回值:

mm_is_cached:标注各模态是否存在cache数据;

mm_missing_data_items:还没有被缓存的cache数据;

3)_apply_hf_processor_main:对未进行缓存的data items进行preprocess,转换成模型推理需要的形式

输入:

prompt:str,输入的句子。

mm_missing_data_items: MultiModalDataItems,所有未缓存的多模态数据

输出:

prompt_id: prompt转换处理的token,还未进行mm插值。

mm_missing_processed_data:Dict[str, List[Dict[str,tensor]]], 转换后(以及转换成多模态模型推理需要格式)的mm数据。

is_update_applied:一直是False,暂时没用。

4)->mm_missing_kwargs:每个模态的数据是一个list,list的每个元素为推理需要的tensor。

  1. _merge_mm_kwargs

2. BaseMultiModalProcessor._maybe_apply_prompt_updates

prompt_ids: List[int[, 填充了占位符的tokens。

mm_placeholders: 指示占位符开始的位置,以及占位符本身。

EngineCoreProc

cache由EngineCoreProc管理: EngineCore.mm_receiver_cache

EngineCoreProc.process_input_sockets->EngineCore.preprocess_add_request

->EngineCore.mm_receiver_cache.get_and_update_features->BaseMultiModalReceiverCache.get_and_update_features

->BaseMultiModalReceiverCache.get_and_update_item

feature.data = self.get_and_update_item(feature.data, feature.identifier)

self._cache[mm_hash] = mm_item

Note: WorkerWrapperBase处也写了mm_cache的代码和逻辑,但是v0.13.0更新和读取cache都在EngineCoreProc。

online

启动

vllm/entrypoints/cli/main.py->vllm/entrypoints/cli/serve.py:

->ServeSubcommand.cmd->uvloop.run(run_server(args))->vllm/entrypoints/openai/api_server.py:run_server

->run_server_worker

->build_async_engine_client->build_async_engine_client_from_engine_args

->async_llm = AsyncLLM.from_vllm_config->AsyncLLM.__init__

1.InputProcessor/OutputProcessor与LLM相同

2.engine_core:后台拉起的EngineCoreProc和LLM复用MPClient代码

EngineCoreClient.make_async_mp_client->AsyncMPClient

请求传递与执行

https请求在哪里转换成async_engine需要的req的,req的内容是什么样子的?

1. 从路由接收请求

->api_server.py:: create_chat_completion, 接受的ChatCompletionRequest,即承载着原始http请求字段;

2. 将OPENAI格式的request转换成LLMEngine格式的request。

->OpenAIServingChat.create_chat_completion

  1. self._preprocess_chatrequest

输出:conversation, engine_prompts

conversation:[{'role': 'user', 'content': [{'type': 'text', 'text': '图片里面有什么?'}, {'type': 'image'}]}]

engine_prompts:[{'prompt_token_ids': [151644, 872, 198, 45930, 100420, 104139, 30, 151652, 151655, 151653, 151645, 198, 151644, 77091, 198], 'prompt': '<|im_start|>user\n图片里面有什么?<|vision_start|><|image_pad|><|vision_end|><|im_end|>\n<|im_start|>assistant\n', 'multi_modal_data': {'image': [MediaWithBytes(media=<PIL.JpegImagePlugin.JpegImageFile image mode=RGB size=1920x2530 at 0x7F0828425430>, original_bytes=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x08\x06\x06\x07\x06\x05\x08\x07\...)]}, 'multi_modal_uuids': {'image': [None]}}]

2)self._process_inputs

输出:engine_request:EngineCoreRequest

3)self.engine_client.generate

这里调用到AsyncLLM.generate

输出:AsyncGenerator[RequestOutput, None]

调度图总结

HTTP POST /v1/chat/completions

create_chat_completion()

├─> chat_completion_parser() ──> prompt: str

├─> request_id = f"cmpl-xxx"

├─> sampling_params = request.to_sampling_params()

engine.generate(prompt, sampling_params, request_id)

AsyncLLMEngine.generate()

├─> prompt_token_ids = tokenizer.encode(prompt)

├─> seq = Sequence(...)

├─> seq_group = SequenceGroup(seqs=[seq], ...)

scheduler.add_seq_group(seq_group) # ← 进入调度队列

vllm代码框架

posted on 2026-01-21 15:49  uestc001  阅读(23)  评论(0)    收藏  举报