vllm代码框架分析
vllm代码框架
框架整体调度流程
用户代码
│
│ 1. 同步/异步调用
▼
┌──────────────────────────┐
│ LLM / AsyncLLMEngine │◀─ yield / return ───┐
│ ·tokenize&pre-process │ │
│ ·生成 EngineCoreRequest │ │
└┬─────────────────────────┘ │
│ 2. 序列化 + ZMQ PUSH │
│ (ipc:///tmp/xxx_input) │
▼ ▼ │
┌──────────────────────────┐ │
│ EngineCoreClient │ │
│ (用户进程内 RPC 代理) │ │
└┬─────────────────────────┘ │
│ 3. 广播 RPC "add_request" │
│ 给所有 rank │
▼ ▼ │
┌──────────────────────────┐ │
│ EngineCore 进程 (rank0) │ │
│ ·Router 收消息 │ │
│ ·Scheduler 连续批处理 │ │
└┬─────────────────────────┘ │
│ 4. 每步调度产生 SchedulerOutput │
▼ ▼ │
┌──────────────────────────┐ │
│ EngineCore rpc_server │ │
│ ·collective_rpc │ │
└┬─────────────────────────┘ │
│ 5. 再次 ZMQ DEALER 广播 │
│ "execute_model" │
│ 参数:SchedulerOutput │
▼ ▼ ┌──────────────────────────┐ │
┌────────────────┼─▶│ Worker-0 / GPU-0 进程 │ │
│ │ ·recv RPC │ │
│ │ ·ModelRunner.execute │ │
│ │ ·ViT + Transformer │ │
│ │ ·sample + logits │ │
│ │ ·send result back │ │
│ └──────────────────────────┘ │
│ 6. 各 Worker 并行执行 │
│ TP 组内 NCCL AllReduce │
├──────────────────────────┐ │
│ EngineCore (聚合输出) │◀── ZMQ 收结果────────┘
│ ·构造 RequestOutput │
└┬─────────────────────────┘
│ 7. 通过 output_socket PUSH
▼ ▼
┌──────────────────────────┐
│ EngineCoreClient │
│ ·recv_multipart │
│ ·反序列化 │
└┬─────────────────────────┘
│ 8. 塞进 asyncio Queue / threading Queue
▼ ▼
用户代码 ◀─ yield RequestOutput ◀─ Queue.get()
初始化
参数初始化
ModelConfig:创建时从EngineArgs的self变量中获取
EngineArgs:llm_engine创建时直接从参数列表中获取传参
self.llm_engine = LLMEngine.from_engine_args(
engine_args=engine_args, usage_context=UsageContext.LLM_CLASS
)
->vllm_config = engine_args.create_engine_config(usage_context)
关键class
模型推理任务的执行实体为engine_core,LLM_Engine通过socket向多进程的engine_core下发推理请求request
engine_core接收到任务请求后,向worker分发。
self.workers = WorkerProc.wait_for_ready(unready_workers)
engine_core
vllm.v1.engine.core_client.SyncMPClient
self.resources.engine_manager=CoreEngineProcManager: 这里使用mp.Process多进程创建engine_core
->EngineCoreProc.run_engine_core: 选择EngineCoreProc类型,并开启执行run_busy_loop
->DPEngineCoreProc.run_busy_loop:循环等待llm_engine发来任务请求
Executor
MultiprocExecutor
使用zmq建立连接,并在未来使用broadcast语义将请求发送给所有worker
->self.workers=WorkerProc->WorkerWrapperBase->gpu_worker.Worker
->model_runner=vllm.v1.worker.gpu_model_runner.GPUModelRunner
Worker
->WorkerProc
self.worker=WorkerWrapperBase().init_worker()
->WorkerWrapperBase:管理着workers实体gpu_worker.Worker
->gpu_worker.Worker
init_device: 初始化并创建self.model_runner, 与model_runner一一对应
ModelRunner
GPUModelRunner,与Worker一一对应
数据结构
多模态数据
MultiModalDataItems
每个请求的所有多模态数据都由这个类管理,访问方式为mm_data_items[modality][idx], idx为同一模态多个数据准备(如qwen3vl可以喂入多张图片)
MultiModalFeatureSpec
请求req中的多模态数据
请求生成与发送
llm.generate:接收请求,并将请求挂在到request队列上,从输出队列上取结果,进行后处理并返回
->llm._add_request->llm_engine.add_request
1 input_processor.process_inputs: 将字符转换成token,将图片切割patch并转换为visual token。
2 output_processor.add_request
3 engine_core.add_request->SyncMPClient.add_request->SyncMPClient._send_input(EngineCoreRequestType.ADD, request)
->llm.llm_engine.step: 进行一轮推理,返回该轮推理的输出 ->llm_engine.engine_core: engine_core.get_output,从输出队列上取结果
1 请求预处理
完成:1)将字符转换成token,将图片切割patch并转换为visual token。2)将visual token缓存至cache,如果cache有该图片的缓存,则从缓存中读取visual token而跳过图片预处理步骤。
LLM.input_processor.process_inputs->InputProcessor.process_inputs->InputPreprocessor.preprocess
1.1 InputPreProcessor._prompt_to_llm_inputs
->......->mm_processor.apply
BaseMultiModalProcessor.apply
输入:
Prompt: str, 原始问句;
mm_data: dict, 原始mm data, 如图片存在mm_data['image']
输出:
MultiModalInputs, 成员变量如下
prompt_token_ids
mm_kwargs
mm_hashes
mm_placeholders
- self._cached_apply_hf_processor
字符和图片转token,并标注占位符等其他数据。
a.将提示文本与多模态数据一并送入 HF Processor,一次性得到 token ID 序列和预处理后的图片张量。
->Qwen3VLMultiModalProcessor.apply->Qwen3VLMultiModalProcessor._cached_apply_hf_processor
->Qwen3VLMultiModalProcessor._apply_hf_processor
Qwen3VLMultiModalProcessor._apply_hf_processor_main
_apply_hf_processor_text_only:将prompt(包含特殊符号<|im_start|>等)转换成prompt_id。
_apply_hf_processor_mm_only->_apply_hf_processor_text_mm->_call_hf_processor[=Qwen3VLProcessor]: 将图片切分成固定尺寸(模型要求)的patch,并返回pixel_values: size=[patch_h_num*patch_w_num, hid_dim]、image_grid_thw: [[ 1, patch_h_num, patch_w_num]]。
b. 在 token ID 序列中定位并替换“占位 token”,占位 token 的数量 = 多模态编码器输出特征的条数。例如占位符长度为972,那么原prompt中会插入972个占位token。
c. 从处理后的 token ID 序列里抽取出这些占位 token 的信息。信息记录在mm_position变量中。
- self._maybe_apply_prompt_updates
将图片数据插入prompt_ids,并记录占位符信息。
1.2 其他
InputProcessor和InputPreprocessor剩余部分未对数据本身进行转换,只是定义了新的class去包裹,最终在InputProcessor.process_inputs中放在EngineCoreRequest中传递给receive侧。
2 请求发送
engine_core.add_request->SyncMPClient.add_request->SyncMPClient._send_input(EngineCoreRequestType.ADD, request)
SyncMPClient.resources管理了engine_manager(CoreEngineProcManager),以及llm_engine与core_engine通信的套接字socket。
SyncMPClient.resources.engine_manager.processes后台启动了多个engine_core,并记录了其句柄。
3 请求发送数据结构
->MultiModalProcessingInfo + prompt_ids,
kwargs
hashes
prompt_updates
->MultiModalInputs:
prompt_token_ids: 已经插入了多模态占位符的tokens, list长度为[972+prompt_len]。
mm_kwargs: Dict结构的mm data,image的data为一个list,list中每一个元素为一张图片的信息,包含{pixel_value:MultiModalFieldElem存储image token的tensor, tensor形状为[3888, 1536]; image_grid_thw: MultiModalFieldElem指示image token第一维度形状的长宽,tensor=[1, 72, 54]}
mm_hashes:
mm_placeholders: {'image': [PlaceholderRange(offset=15, length=972, is_embed=None)]}
MultiModalInputs两部分会在EngineCoreRequest(类型MultiModalFeatureSpec)中分别用data和identifier传入EngineCore计算。
->EngineCoreRequest
request_id
prompt_token_ids
mm_features: list[MultiModalFeatureSpec], 其中多模态数据仍然以MultiModalFeatureSpec数据结构传输(该结构为mm data+mm hash)构成。
sampling_params
请求接收与执行
1 请求接收
llm_engine利用socket将request传输给EngineCore
SyncMPClient._send_input ==> EngineCoreProc.process_input_sockets
EngineCoreProc.process_input_sockets:这里处理解析socket传来的数据,还会对ADD类型的数据,即模型推理数据进行缓存的读取。个人觉得把缓存数据读取放在这里不太好。
1)接收数据并分析请求类型
type_frame, *data_frames = input_socket.recv_multipart(copy=False)
request_type = EngineCoreRequestType(bytes(type_frame.buffer))
2)如果请求类型为ADD,对请求做preprocess处理
request = add_request_decoder.decode(data_frames)
request = self.preprocess_add_request(request)
- 将请求放入input_queue, 等待EngineCore.run_busy_loop从队列中读取数据
self.input_queue.put_nowait((request_type, request))
2 请求调度
数据结构:scheduler:Request.mm_features=list[MultiModalFeatureSpec]->MultiModalFeatureSpec.data/identifier
调度链:
DPEngineCoreProc._handle_client_request
->DPEngineCoreProc.run_busy_loop
EngineCoreProc会等待请求队列的请求,并根据请求类型处理(_handle_client_request),其中模型推理请求类型为‘ADD’。
EngineCoreProc.run_busy_loop轮询请求,并每轮执行:
1)_process_input_queue->_handle_client_request->add_request
EngineCoreProc.add_request->Scheduler.add_request
2)_process_engine_step->step_fn
self.step_fn()->step
self.output_queue.put_nowait(output)
->DPEngineCoreProc.step-->EngineCore.step
self.scheduler.schedule
self.model_executor.execute_model
Executor使用rpc调度的方式调用workers进行gpu的推理计算,workers由WorkerWrapperBase管理
self.model_executor.execute_model->Executor.execute(MultiprocExecutor)-> MultiprocExecutor.collective_rpc('execute_model')->WorkerWrapperBase.execute_model
3 Worker执行
-> ModelRunner -> GPUModelRunner.execute_model
GPUModelRunner的初始化中,提前申请了很多GPU的tensor,为GPU graph(compile作准备)。
GPUModelRunner.execute_model
-
self._prepare_inputs
-
self._preprocess
输入:
scheduler_output:
prompt_token_ids:[1001]
mm_features:
pixel_values:torch.Size([3888, 1536])
image_grid_thw:=tensor([[1, 72, 54]])
输出:
inputs_embeds: [1002, 2048]
positions: [3, 1002]
4 模型推理
GPUModelRunner: model_output = self._model_forward
1)llm模型推理
输入:input_ids,positions;
输出:model_output, tensor[batch, hidden_dim]
2)vlm模型推理
输入: inputs_embeds[token_nums, hidden_dim(=2048)], positions[3, token_nums]
输出:model_output, tensor[batch, hidden_dim(=2048)]
5 接收执行数据结构
->EngineCoreRequest
(从cache中取出缓存数据构成完整的request)
->SchedulerOutput
scheduled_new_reqs: NewRequestData{prompt_token_ids, mm_features, }
mm 缓存复用
InputProcessor
设计理念:针对每个模型和配置参数,生成推理请求各模态的每个数据item的hash和cache;同一模态的同一uuids代表着完全相同的模态数据,如图片,给同一uuids再次传入新的图片data将被忽略。vllm会在req process阶段计算hash和需要增加(missing)的模态数据,并传给worker,worker侧会管理hash与对应的模态数据。
一、创建
InputProcessor创建cache:
self.mm_processor_cache = processor_cache_from_config(vllm_config, mm_registry)
->MultiModalProcessorSenderCache(model_config): self._cache = MultiModalCache.get_lru_cache
->MultiModalCache.get_lru_cache->LRUCache
二、处理流程
->InputProcessor.process_inputs->InputPreProcessor.preprocess
mm_cache由InputProcessor管理, 数据结构InputProcessor.mm_processor_cache,数据结构MultiModalProcessorSenderCache
->InputPreProcessor._preprocess->InputPreProcessor._process_decoder_only_prompt->InputPreProcessor._prompt_to_llm_inputs->InputPreProcessor._process_text->InputPreProcessor._process_multimodal
->Qwen3VLMultiModalProcessor.apply->BaseMultiModalProcessor.apply
BaseMultiModalProcessor.cache = MultiModalProcessorSenderCache
1. BaseMultiModalProcessor._cached_apply_hf_processor:处理mm_items缓存,如果未进行缓存,则新增hash和缓存,如果有缓存,则检查是否需要更新缓存;最后返回req推理需要的prompt_ids, mm_info
1)_hash_mm_items: 计算本次请求全部mm items的hash值
使用model_id, modality, item, hf_processor_mm_kwargs, tokenization_kwargs为每个模态的每个item生成mm_hashes值,并用字典保存。其中后四项同一模态(如qwen3vl是it->t)维持不变,注意此处计算hash值只与item的类型挂钩,与item.data的实际值无关。hf_processor_mm_kwargs, tokenization_kwargs打印出来为空,暂时未用。
2)_get_cache_missing_items: 根据hash值验证哪些items已经被缓存,并返回未被缓存的items
返回值:
mm_is_cached:标注各模态是否存在cache数据;
mm_missing_data_items:还没有被缓存的cache数据;
3)_apply_hf_processor_main:对未进行缓存的data items进行preprocess,转换成模型推理需要的形式
输入:
prompt:str,输入的句子。
mm_missing_data_items: MultiModalDataItems,所有未缓存的多模态数据
输出:
prompt_id: prompt转换处理的token,还未进行mm插值。
mm_missing_processed_data:Dict[str, List[Dict[str,tensor]]], 转换后(以及转换成多模态模型推理需要格式)的mm数据。
is_update_applied:一直是False,暂时没用。
4)->mm_missing_kwargs:每个模态的数据是一个list,list的每个元素为推理需要的tensor。
- _merge_mm_kwargs
2. BaseMultiModalProcessor._maybe_apply_prompt_updates
prompt_ids: List[int[, 填充了占位符的tokens。
mm_placeholders: 指示占位符开始的位置,以及占位符本身。
EngineCoreProc
cache由EngineCoreProc管理: EngineCore.mm_receiver_cache
EngineCoreProc.process_input_sockets->EngineCore.preprocess_add_request
->EngineCore.mm_receiver_cache.get_and_update_features->BaseMultiModalReceiverCache.get_and_update_features
->BaseMultiModalReceiverCache.get_and_update_item
feature.data = self.get_and_update_item(feature.data, feature.identifier)
self._cache[mm_hash] = mm_item
Note: WorkerWrapperBase处也写了mm_cache的代码和逻辑,但是v0.13.0更新和读取cache都在EngineCoreProc。
online
启动
vllm/entrypoints/cli/main.py->vllm/entrypoints/cli/serve.py:
->ServeSubcommand.cmd->uvloop.run(run_server(args))->vllm/entrypoints/openai/api_server.py:run_server
->run_server_worker
->build_async_engine_client->build_async_engine_client_from_engine_args
->async_llm = AsyncLLM.from_vllm_config->AsyncLLM.__init__
1.InputProcessor/OutputProcessor与LLM相同
2.engine_core:后台拉起的EngineCoreProc和LLM复用MPClient代码
EngineCoreClient.make_async_mp_client->AsyncMPClient
请求传递与执行
https请求在哪里转换成async_engine需要的req的,req的内容是什么样子的?
1. 从路由接收请求
->api_server.py:: create_chat_completion, 接受的ChatCompletionRequest,即承载着原始http请求字段;
2. 将OPENAI格式的request转换成LLMEngine格式的request。
->OpenAIServingChat.create_chat_completion
- self._preprocess_chatrequest
输出:conversation, engine_prompts
conversation:[{'role': 'user', 'content': [{'type': 'text', 'text': '图片里面有什么?'}, {'type': 'image'}]}]
engine_prompts:[{'prompt_token_ids': [151644, 872, 198, 45930, 100420, 104139, 30, 151652, 151655, 151653, 151645, 198, 151644, 77091, 198], 'prompt': '<|im_start|>user\n图片里面有什么?<|vision_start|><|image_pad|><|vision_end|><|im_end|>\n<|im_start|>assistant\n', 'multi_modal_data': {'image': [MediaWithBytes(media=<PIL.JpegImagePlugin.JpegImageFile image mode=RGB size=1920x2530 at 0x7F0828425430>, original_bytes=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x08\x06\x06\x07\x06\x05\x08\x07\...)]}, 'multi_modal_uuids': {'image': [None]}}]
2)self._process_inputs
输出:engine_request:EngineCoreRequest
3)self.engine_client.generate
这里调用到AsyncLLM.generate
输出:AsyncGenerator[RequestOutput, None]
调度图总结
HTTP POST /v1/chat/completions
│
▼
create_chat_completion()
│
├─> chat_completion_parser() ──> prompt: str
├─> request_id = f"cmpl-xxx"
├─> sampling_params = request.to_sampling_params()
▼
engine.generate(prompt, sampling_params, request_id)
│
▼
AsyncLLMEngine.generate()
│
├─> prompt_token_ids = tokenizer.encode(prompt)
├─> seq = Sequence(...)
├─> seq_group = SequenceGroup(seqs=[seq], ...)
▼
scheduler.add_seq_group(seq_group) # ← 进入调度队列
浙公网安备 33010602011771号