vllm 多机多卡部署大模型 (Ray + vLLM )
Ray 和 vLLM 介绍
如果需要把一个大模型用多卡 / 多机跑起来,追求推理性能:直接用 vLLM 自身的 MPI 多机部署。
如果需要管理多个模型服务、动态扩缩容、统一调度多机资源、构建复杂分布式推理系统:需要用 Ray 封装 vLLM,借助 Ray 的分布式能力提升系统的灵活性和可扩展性。
Ray和vLLM :两者不是竞争关系,而是互补关系,解决大模型部署中的不同问题。
| 工具 | 核心定位 | 核心能力 | 解决的问题 |
|---|---|---|---|
| vLLM | 高性能大模型推理引擎 | 1. 基于 PagedAttention 实现超高吞吐量
|
解决「单模型 / 多卡推理的性能瓶颈」,让单模型在显卡上跑得更快、更省显存 |
| Ray | 分布式计算框架(分布式应用平台) | 1. 跨机资源管理(CPU/GPU/ 内存)
|
解决「多模型、多服务、跨机资源调度」的问题,是分布式应用的「基座」 |
Ray + vLLM 协同
vLLM负责模型的推理核心,而Ray则负责分布式环境的协调和资源管理。
Ray框架则负责构建和管理分布式集群。在部署vLLM分布式推理应用时,我们首先需要使用Ray启动一个分布式集群。Ray会自动处理节点间的通信、任务调度和资源分配。
优势
- 更灵活的资源管理:Ray 可以精细调度每台机器的 GPU/CPU 资源,避免 vLLM 直接部署时的资源浪费,支持按需分配显卡给不同模型。
- 弹性扩缩容:可以根据请求量动态增加 / 减少 vLLM 推理实例,应对流量波动。
- 多模型管理:轻松部署和管理多个不同的 vLLM 模型服务,统一入口调度。
- 更好的容错性:Ray 会监控工作节点状态,节点故障时自动重启任务,比 vLLM 原生 MPI 部署的容错能力更强。
- 简化分布式部署:无需手动配置 MPI 免密、hostfile,Ray 自带跨机通信和节点管理,降低多机部署门槛。
#################################################
需预先安装 torch、torchvision、torchaudio、transformers
再安装 ray、vllm
#################################################
# 创建conda 环境 conda create -n ray-vllm python=3.10 conda activate ray-vllm # 安装torch # CUDA 12.1 pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu121 # 安装 transformer 和其他依赖 pip install transformers==4.57.1 pip install accelerate pip install fsspec pip install tensorboard ################################################# # 安装ray # 安装最新版 Ray(包含所有组件) pip install "ray[default]" # 基础安装 # 或者安装完整版 pip install "ray[default,air,tune,serve]" # 如果需要特定版本 pip install "ray[default]==2.8.0" ################################################# # 安装vllm # 方法1:从源码安装(推荐,兼容性更好) git clone https://github.com/vllm-project/vllm.git cd vllm pip install -e . # 可编辑安装 # 方法2:直接安装(可能会有依赖冲突) pip install vllm # 安装特定版本(推荐) pip install vllm==0.11.0 ################################################# # 验证 Ray 安装 # 启动 Ray 集群(单节点) ray start --head # 或者通过 Python 验证 python -c "import ray; ray.init(); print('Ray initialized successfully')" # 停止 Ray ray stop ################################################# # 验证 vllm 安装 # 创建测试文件 test_vllm.py from vllm import LLM, SamplingParams # 简单的推理测试 sampling_params = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=100) llm = LLM(model="facebook/opt-125m") # 使用小模型测试 outputs = llm.generate(["Hello, my name is"], sampling_params) print(outputs[0].outputs[0].text)
核心工作逻辑(Ray 集群的调度机制)
Ray 集群的核心是「主节点统一调度,工作节点被动执行」,具体到 vLLM 部署:
先完成 Ray 集群的搭建:
- 第一步:在一台机器上启动 Ray 主节点(Head Node)(命令
ray start --head)。 - 第二步:将其他所有需要参与分布式推理的机器,作为 Ray 工作节点(Worker Node) 加入该主节点(命令
ray start --address=<主节点IP:6379>)。
此时所有节点会组成一个统一的 Ray 集群,主节点掌握所有节点的资源信息(GPU、CPU、显存等)。
- 仅在 Ray 主节点 上执行 vLLM 启动命令(即你的
sh vllm-start.sh):
- vLLM 会通过
--distributed-executor-backend=ray参数,连接到本地的 Ray 主节点。 - vLLM 会向 Ray 主节点提交「分布式推理任务」,并声明所需资源(如
--tensor-parallel-size=2所需的 2 块 GPU)。 - Ray 主节点会自动调度任务,将模型的张量并行分片、推理计算等工作,分配到集群内可用的工作节点(或主节点自身)的 GPU 上。
- 所有工作节点会被动接收 Ray 主节点的调度指令,自动加载所需的 vLLM 相关组件和模型分片,无需你手动在工作节点上操作。
- vLLM 会通过
Ray 2.32.0 集群运行需要多个端口协同,仅开放 6379 远远不够,工作节点短暂在线后离线,大概率是部分关键端口未映射 / 未开放,导致主节点与工作节点的保活连接中断。
Ray 核心必备端口(必须开放 / 映射)
| 端口 | 组件 / 用途 | 是否必须 | 备注 |
|---|---|---|---|
| 6379 | GCS Server(集群元数据存储) | 是 | 主节点核心端口,所有节点必须能访问 |
| 8265 | Ray Dashboard(集群监控) | 否(但便于排查) | 可视化集群状态,不影响节点保活 |
| 30000-32000 | 节点间 P2P 通信、工作进程通信、心跳保活 | 是 | 随机端口段,Ray 默认使用该区间进行节点间直接通信 |
| 10001 | Ray Client(客户端连接) | 否 | 仅需远程客户端连接时开放 |
ray部署,ray集群head节点和全部worker节点
在各个节点上运行
## lf001主节点
export VLLM_HOST_IP=lf001
ray start --head \
--node-ip-address=lf001 \
--port=6379 \
--include-dashboard=True \
--dashboard-host=0.0.0.0 \
--dashboard-port=8265 \
--num-gpus=1 \
--resources='{"node:lf001": 1}'
##lf002 节点
export VLLM_HOST_IP=lf002
ray start --address="lf001:6379" \
--node-ip-address=lf002 \
--num-gpus=1 \
--resources='{"node:lf002": 1}'
# 查看状态
ray status
## 查看节点状态
ray list nodes
ray 集群启动结果:


1. Cluster(集群模块):集群资源与节点的 “全景地图”
核心作用:查看 Ray 集群的节点状态、资源分配、集群配置,是了解集群整体健康状况的第一入口。
- 关键信息展示:
- 节点列表:所有 Active(活跃)、Pending(待启动)、Failed(失败)的节点,包含节点 IP、主机名、角色(Head/Worker)。
- 资源详情:每个节点的 CPU、GPU、内存、对象存储内存(Object Store)的「总资源」「已使用资源」「剩余资源」。
- 集群配置:Ray 版本、启动参数、资源调度策略等。
- 实际使用场景:
- 快速验证 “Worker 节点是否成功加入集群”(对应你之前的 2 节点集群,可在此确认两个节点是否都处于 Active 状态)。
- 排查 “资源不足” 问题:比如 vLLM 启动失败,可查看 GPU / 内存是否已被其他任务占用。
- 确认节点角色:区分哪个是 Head 节点,哪个是 Worker 节点,方便后续定位问题节点。
2. Jobs(任务模块):提交 / 管理 / 监控 Ray 任务(Jobs)的入口
核心作用:负责提交 Ray 任务、查看任务运行状态、获取任务执行结果、终止异常任务,这里的 “Job” 是 Ray 的顶层任务单位(一个独立的计算任务 / 脚本)。
- 关键信息展示:
- 任务列表:所有提交的任务,包含任务 ID、任务名称、提交时间、运行状态(Pending/Running/Succeeded/Failed/Cancelled)。
- 任务详情:任务的提交命令、环境变量、日志输出、执行时长、占用资源。
- 操作功能:提交新任务(上传脚本 / 输入命令)、终止运行中的任务、重新运行已完成的任务。
- 实际使用场景:
- 替代终端提交 Ray 任务,可视化操作更便捷(比如提交 vLLM 相关的分布式任务)。
- 排查 “任务启动失败”:查看任务的状态和日志,确认是命令错误还是资源不足。
- 终止异常挂起的任务,释放集群资源。
3. Actors(监控模块):监控 Ray Actor 的运行状态
核心作用:查看和监控 Ray 集群中所有Actor 实例的状态,Actor 是 Ray 中 “有状态的分布式对象”(长期运行、可保存状态、支持远程调用),vLLM 通过 Ray 启动时会创建大量 Actor(比如
RayWorkerWrapper就是 Actor)。- 关键信息展示:
- Actor 列表:所有已创建的 Actor,包含 Actor ID、Actor 类名(比如
EngineCore、RayWorkerWrapper)、所属节点、运行状态、创建时间、占用资源。 - Actor 详情:Actor 的远程调用记录、依赖关系、异常信息。
- Actor 列表:所有已创建的 Actor,包含 Actor ID、Actor 类名(比如
- 实际使用场景:
- 排查 vLLM 启动失败的核心入口:查看
RayWorkerWrapper等 Actor 是否正常创建,是否处于 Failed 状态,定位哪个节点的 Actor 异常。 - 确认 Actor 是否均匀分布在各个 Worker 节点(比如 vLLM 的张量并行是否对应 2 个节点的 Actor)。
- 发现内存泄漏:某个 Actor 长期占用资源且不释放,可在此定位并排查。
- 排查 vLLM 启动失败的核心入口:查看
4. Metrics(指标模块):集群与任务的 “性能仪表盘”
核心作用:以时序图表的形式展示 Ray 集群、节点、任务、Actor 的核心性能指标,用于监控性能、排查瓶颈、优化资源配置。
- 关键指标展示(分为多个分类图表):
- 集群级指标:总 CPU/GPU/ 内存使用率、任务提交速率、任务完成速率。
- 节点级指标:单个节点的 CPU/GPU/ 内存 / 网络 IO / 磁盘 IO 使用率。
- 任务 / Actor 指标:任务执行时长、Actor 远程调用延迟、对象存储(Object Store)的读写速率。
- 自定义指标:若业务(如 vLLM)埋入了自定义监控指标,也会在此展示。
- 实际使用场景:
- 排查 vLLM 性能瓶颈:比如 GPU 使用率过低(可能是张量并行配置不合理)、网络 IO 过高(跨节点通信瓶颈)。
- 监控集群资源水位:避免资源使用率过高导致任务卡顿或 OOM。
- 长期趋势分析:观察集群在高负载下的性能表现,优化资源配置(如增加 GPU / 内存)。
5. Logs(日志模块):集群与任务的 “集中日志查询中心”
核心作用:集中收集、查询、过滤 Ray 集群中所有节点、任务、Actor 的日志,无需逐个节点登录查看日志,是分布式环境下问题排查的核心工具。
- 关键功能展示:
- 日志分类:按「节点」「任务」「Actor」「Ray 组件」(如 Head 节点、GCS)分类查询日志。
- 日志过滤:按时间范围、日志级别(DEBUG/INFO/WARN/ERROR)、关键词(如
Gloo、Connection refused)过滤日志。 - 日志实时刷新:支持实时查看滚动输出的日志(类似终端的
tail -f)。
- 实际使用场景:
- 排查之前的 Gloo 通信错误:无需逐个节点登录,直接在此按关键词
Gloo、Connection refused过滤,查看所有节点的相关日志,定位具体哪个节点的通信失败。 - 查看 vLLM 启动过程的完整日志:按任务 ID 或 Actor ID 筛选,获取详细的启动流程和异常堆栈。
- 快速定位节点级异常:比如某个 Worker 节点日志中出现大量 OOM 报错,确认该节点资源不足。
- 排查之前的 Gloo 通信错误:无需逐个节点登录,直接在此按关键词
6. Server(服务模块):Ray 集群自身服务的状态监控
核心作用:监控 Ray 集群自身核心服务(如 GCS、Dashboard Server)的运行状态,确保 Ray 集群的 “基础设施” 正常。
- 关键信息展示:
- Ray 核心服务列表:GCS(Global Control Store,Ray 的全局控制中心)、Dashboard Server、Autoscaler(自动扩缩容服务)等。
- 服务状态:每个服务的运行状态(Running/Failed)、所属节点、进程 ID(PID)、启动时间。
- 实际使用场景:
- 排查 Ray 集群自身故障:比如任务无法提交,可能是 GCS 服务异常,可在此确认 GCS 是否处于 Running 状态。
- 确认 Dashboard Server 是否正常运行:若无法访问 Ray 页面,可在此查看 Dashboard Server 的状态。
- 排查自动扩缩容失败:若集群无法自动添加 Worker 节点,可查看 Autoscaler 服务的状态和日志。
注:ray work process需要配置主机名映射:


vllm启动脚本:
#!/bin/bash echo $GLOO_SOCKET_IFNAME export NCCL_IB_DISABLE=1 export NCCL_SOCKET_IFNAME=eth0 export GLOO_SOCKET_IFNAME=eth0 export NCCL_DEBUG=DEBUG export VLLM_LOGGING_LEVEL=DEBUG export VLLM_DISTRIBUTED_BACKEND=nccl export TORCH_DISTRIBUTED_BACKEND=nccl export RAY_ADDRESS=10.60.182.19:6379 export VLLM_USE_RAY_SPMD_WORKER=0 export TORCH_DISTRIBUTED_INIT_TIMEOUT=600 export GLOO_TIMEOUT=600 python -m vllm.entrypoints.openai.api_server \ --model /opt/models/ \ --tensor-parallel-size 2 \ --pipeline-parallel-size 1 \ --trust-remote-code \ --gpu-memory-utilization 0.75 \ --max-num-seqs 64 \ --max-model-len 4096 \ --host 0.0.0.0 \ --port 8000 \ --distributed-executor-backend=ray
ray适配各种深度学习算法:
- 并行优势体现:上述示例中因数据量 / 任务量较小,并行优势可能不明显,当任务量(如 1000 个独立任务)或数据量(如 100 万条数据)增大时,并行耗时会远低于串行耗时。
- 多节点环境适配:仅需将
ray.init()改为ray.init(address="Ray Head节点IP:6379"),其余代码完全不变,Ray 会自动将任务调度到集群中的所有节点。 - 资源控制:若需限制每个任务 / Actor 的资源,可在
@ray.remote中指定,如@ray.remote(num_cpus=2, num_gpus=0.5)(分配 2 个 CPU 核心和 0.5 张 GPU)。
- Ray Task(装饰函数)适合无状态、独立可并行的算法任务,是最基础且常用的并行方式。
- Ray Actor(装饰类)适合有状态、长期运行的算法任务,支持维护独立状态,适合需要累积结果的场景。
核心 API:
@ray.remote:装饰函数,将其转为可并行执行的远程任务。函数名.remote():提交远程任务,立即返回任务 ID(非实际结果),不阻塞主线程。ray.get():根据任务 ID 获取实际结果,阻塞主线程直到所有任务完成。- 若要分批获取结果,可使用
ray.wait()(非阻塞,适合大规模任务)。
import ray import time # 1. 初始化 Ray(单机环境,自动启动本地 Ray 集群;多节点环境指定 address 即可) ray.init(ignore_reinit_error=True) # ignore_reinit_error 避免重复初始化报错 # 2. 定义并行任务(用 @ray.remote 装饰,标记为 Ray 远程任务) # 模拟一个耗时算法(添加 1 秒延时,体现并行优势) @ray.remote def calculate_square(num: int) -> int: """计算数字的平方,模拟耗时算法任务""" time.sleep(1) # 模拟算法执行耗时(如数据处理、模型推理) return num * num # 3. 串行执行(作为对比,查看耗时) print("=== 串行执行 ===") start_time = time.time() serial_results = [calculate_square(num) for num in range(10)] # 直接调用函数(串行) serial_cost = time.time() - start_time print(f"串行结果:{serial_results}") print(f"串行耗时:{serial_cost:.2f} 秒\n") # 4. 并行执行(使用 Ray Task 分布式调度) print("=== 并行执行 ===") start_time = time.time() # 4.1 提交所有远程任务(返回任务 ID,不阻塞) remote_tasks = [calculate_square.remote(num) for num in range(10)] # 4.2 获取所有任务结果(阻塞,直到所有任务完成) parallel_results = ray.get(remote_tasks) parallel_cost = time.time() - start_time print(f"并行结果:{parallel_results}") print(f"并行耗时:{parallel_cost:.2f} 秒") # 5. 关闭 Ray 集群(可选,单机环境运行完毕后可关闭) ray.shutdown()
import ray import time # 1. 初始化 Ray ray.init(ignore_reinit_error=True) # 2. 定义有状态 Actor 类(用 @ray.remote 装饰,标记为 Ray Actor) @ray.remote class CounterActor: """有状态计数器,模拟累积算法结果的有状态任务""" def __init__(self, name: str): # 初始化状态(如模型参数、累积指标、历史记录等) self.name = name self.count = 0 # 核心状态:计数器值 def increment(self, step: int = 1) -> int: """增加计数器值,模拟算法更新状态""" time.sleep(0.5) # 模拟状态更新耗时(如模型参数更新) self.count += step return self.count def get_count(self) -> tuple: """获取当前计数器状态,模拟获取算法累积结果""" return (self.name, self.count) # 3. 创建 3 个 Actor 实例(并行运行,各自维护独立状态) actor1 = CounterActor.remote("计数器1") actor2 = CounterActor.remote("计数器2") actor3 = CounterActor.remote("计数器3") # 4. 并行调用 Actor 方法(更新状态,无阻塞提交任务) print("=== 并行更新 Actor 状态 ===") start_time = time.time() # 每个 Actor 执行 5 次增量操作 remote_tasks = [] for _ in range(5): remote_tasks.append(actor1.increment.remote(2)) # 每次加 2 remote_tasks.append(actor2.increment.remote(3)) # 每次加 3 remote_tasks.append(actor3.increment.remote(1)) # 每次加 1 # 等待所有更新任务完成 ray.get(remote_tasks) # 5. 获取所有 Actor 的最终状态 actor_results = ray.get([ actor1.get_count.remote(), actor2.get_count.remote(), actor3.get_count.remote() ]) # 6. 输出结果和耗时 cost_time = time.time() - start_time print(f"所有 Actor 最终状态:{actor_results}") print(f"并行更新耗时:{cost_time:.2f} 秒") # 7. 关闭 Ray 集群 ray.shutdown()
import ray import time import numpy as np from sklearn.ensemble import RandomForestClassifier from sklearn.datasets import make_classification from sklearn.model_selection import train_test_split # 1. 初始化 Ray ray.init(ignore_reinit_error=True) # 2. 准备数据和训练模型(模拟已训练好的业务模型) print("=== 准备数据和训练模型 ===") X, y = make_classification(n_samples=1000, n_features=20, random_state=42) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.8, random_state=42) # 训练 RandomForest 模型 model = RandomForestClassifier(n_estimators=100, random_state=42) model.fit(X_train, y_train) print(f"模型训练完成,测试数据量:{len(X_test)} 条\n") # 3. 定义并行预测任务(拆分批量数据,独立预测) @ray.remote def batch_predict(model, X_batch: np.ndarray) -> np.ndarray: """批量数据预测,模拟机器学习算法推理""" time.sleep(0.5) # 模拟推理耗时(如复杂模型预测) return model.predict(X_batch) # 4. 串行预测(对比耗时) print("=== 串行预测 ===") start_time = time.time() serial_predictions = model.predict(X_test) serial_cost = time.time() - start_time print(f"串行预测完成,结果长度:{len(serial_predictions)}") print(f"串行耗时:{serial_cost:.2f} 秒\n") # 5. 并行预测(拆分数据为多个批次,并行调度) print("=== 并行预测 ===") start_time = time.time() # 5.1 拆分测试数据为 8 个批次(按需调整批次数量,适配CPU核心数) batch_size = len(X_test) // 8 X_batches = [X_test[i*batch_size : (i+1)*batch_size] for i in range(8)] # 补充最后一个批次的剩余数据 if len(X_test) % 8 != 0: X_batches.append(X_test[8*batch_size:]) # 5.2 提交并行预测任务 remote_tasks = [batch_predict.remote(model, batch) for batch in X_batches] # 5.3 获取并合并预测结果 parallel_batch_results = ray.get(remote_tasks) parallel_predictions = np.concatenate(parallel_batch_results) # 5.4 输出结果和耗时 parallel_cost = time.time() - start_time print(f"并行预测完成,结果长度:{len(parallel_predictions)}") print(f"并行耗时:{parallel_cost:.2f} 秒") print(f"串行与并行结果是否一致:{np.array_equal(serial_predictions, parallel_predictions)}") # 6. 关闭 Ray 集群 ray.shutdown()

浙公网安备 33010602011771号