vllm 多机多卡部署大模型 (Ray + vLLM )

Ray 和 vLLM 介绍

如果需要把一个大模型用多卡 / 多机跑起来,追求推理性能:直接用 vLLM 自身的 MPI 多机部署。

如果需要管理多个模型服务、动态扩缩容、统一调度多机资源、构建复杂分布式推理系统:需要用 Ray 封装 vLLM,借助 Ray 的分布式能力提升系统的灵活性和可扩展性。

Ray和vLLM :两者不是竞争关系,而是互补关系,解决大模型部署中的不同问题。

工具核心定位核心能力解决的问题
vLLM 高性能大模型推理引擎 1. 基于 PagedAttention 实现超高吞吐量
 
2. 支持张量并行、流水线并行
 
3. 提供简单的 API 服务封装
解决「单模型 / 多卡推理的性能瓶颈」,让单模型在显卡上跑得更快、更省显存
Ray 分布式计算框架(分布式应用平台) 1. 跨机资源管理(CPU/GPU/ 内存)
 
2. 分布式任务调度与执行
 
3. 容错、弹性扩缩容
 
4. 支持构建分布式服务
解决「多模型、多服务、跨机资源调度」的问题,是分布式应用的「基座」

 

Ray + vLLM 协同

vLLM负责模型的推理核心,而Ray则负责分布式环境的协调和资源管理。

Ray框架则负责构建和管理分布式集群。在部署vLLM分布式推理应用时,我们首先需要使用Ray启动一个分布式集群。Ray会自动处理节点间的通信、任务调度和资源分配。

优势

  1. 更灵活的资源管理:Ray 可以精细调度每台机器的 GPU/CPU 资源,避免 vLLM 直接部署时的资源浪费,支持按需分配显卡给不同模型。
  2. 弹性扩缩容:可以根据请求量动态增加 / 减少 vLLM 推理实例,应对流量波动。
  3. 多模型管理:轻松部署和管理多个不同的 vLLM 模型服务,统一入口调度。
  4. 更好的容错性:Ray 会监控工作节点状态,节点故障时自动重启任务,比 vLLM 原生 MPI 部署的容错能力更强。
  5. 简化分布式部署:无需手动配置 MPI 免密、hostfile,Ray 自带跨机通信和节点管理,降低多机部署门槛。
 

#################################################
需预先安装 torch、torchvision、torchaudio、transformers
再安装 ray、vllm
#################################################
# 创建conda 环境
conda create -n ray-vllm python=3.10
conda activate ray-vllm

# 安装torch
# CUDA 12.1
pip install torch==2.1.2 torchvision==0.16.2 torchaudio==2.1.2 --index-url https://download.pytorch.org/whl/cu121

# 安装 transformer 和其他依赖
pip install transformers==4.57.1
pip install accelerate
pip install fsspec
pip install tensorboard

#################################################

# 安装ray
# 安装最新版 Ray(包含所有组件)
pip install "ray[default]"  # 基础安装
# 或者安装完整版
pip install "ray[default,air,tune,serve]"

# 如果需要特定版本
pip install "ray[default]==2.8.0"

#################################################

# 安装vllm
# 方法1:从源码安装(推荐,兼容性更好)
git clone https://github.com/vllm-project/vllm.git
cd vllm
pip install -e .  # 可编辑安装

# 方法2:直接安装(可能会有依赖冲突)
pip install vllm

# 安装特定版本(推荐)
pip install vllm==0.11.0

#################################################

# 验证 Ray 安装
# 启动 Ray 集群(单节点)
ray start --head

# 或者通过 Python 验证
python -c "import ray; ray.init(); print('Ray initialized successfully')"

# 停止 Ray
ray stop

#################################################
# 验证 vllm 安装
# 创建测试文件 test_vllm.py
from vllm import LLM, SamplingParams

# 简单的推理测试
sampling_params = SamplingParams(temperature=0.8, top_p=0.95, max_tokens=100)
llm = LLM(model="facebook/opt-125m")  # 使用小模型测试

outputs = llm.generate(["Hello, my name is"], sampling_params)
print(outputs[0].outputs[0].text)

 

核心工作逻辑(Ray 集群的调度机制)

 Ray 集群的核心是「主节点统一调度,工作节点被动执行」,具体到 vLLM 部署:

先完成 Ray 集群的搭建:

  • 第一步:在一台机器上启动 Ray 主节点(Head Node)(命令 ray start --head)。
  • 第二步:将其他所有需要参与分布式推理的机器,作为 Ray 工作节点(Worker Node) 加入该主节点(命令 ray start --address=<主节点IP:6379>)。

此时所有节点会组成一个统一的 Ray 集群,主节点掌握所有节点的资源信息(GPU、CPU、显存等)。

  • 仅在 Ray 主节点 上执行 vLLM 启动命令(即你的 sh vllm-start.sh):
    1. vLLM 会通过 --distributed-executor-backend=ray 参数,连接到本地的 Ray 主节点。
    2. vLLM 会向 Ray 主节点提交「分布式推理任务」,并声明所需资源(如 --tensor-parallel-size=2 所需的 2 块 GPU)。
    3. Ray 主节点会自动调度任务,将模型的张量并行分片、推理计算等工作,分配到集群内可用的工作节点(或主节点自身)的 GPU 上。
    4. 所有工作节点会被动接收 Ray 主节点的调度指令,自动加载所需的 vLLM 相关组件和模型分片,无需你手动在工作节点上操作。

 

Ray 2.32.0 集群运行需要多个端口协同,仅开放 6379 远远不够,工作节点短暂在线后离线,大概率是部分关键端口未映射 / 未开放,导致主节点与工作节点的保活连接中断。

Ray 核心必备端口(必须开放 / 映射)
端口组件 / 用途是否必须备注
6379 GCS Server(集群元数据存储) 主节点核心端口,所有节点必须能访问
8265 Ray Dashboard(集群监控) 否(但便于排查) 可视化集群状态,不影响节点保活
30000-32000 节点间 P2P 通信、工作进程通信、心跳保活 随机端口段,Ray 默认使用该区间进行节点间直接通信
10001 Ray Client(客户端连接) 仅需远程客户端连接时开放

 

ray部署,ray集群head节点和全部worker节点

在各个节点上运行

## lf001主节点
export VLLM_HOST_IP=lf001
ray start --head \
  --node-ip-address=lf001 \
  --port=6379 \
  --include-dashboard=True \
  --dashboard-host=0.0.0.0 \
  --dashboard-port=8265 \
  --num-gpus=1 \
  --resources='{"node:lf001": 1}'

##lf002 节点
export VLLM_HOST_IP=lf002
ray start --address="lf001:6379" \
  --node-ip-address=lf002 \
  --num-gpus=1 \
  --resources='{"node:lf002": 1}'

 # 查看状态
 ray status

## 查看节点状态
ray list nodes

 ray 集群启动结果:

image

image

1. Cluster(集群模块):集群资源与节点的 “全景地图”

核心作用:查看 Ray 集群的节点状态、资源分配、集群配置,是了解集群整体健康状况的第一入口。
  • 关键信息展示:
    1. 节点列表:所有 Active(活跃)、Pending(待启动)、Failed(失败)的节点,包含节点 IP、主机名、角色(Head/Worker)。
    2. 资源详情:每个节点的 CPU、GPU、内存、对象存储内存(Object Store)的「总资源」「已使用资源」「剩余资源」。
    3. 集群配置:Ray 版本、启动参数、资源调度策略等。
  • 实际使用场景:
    • 快速验证 “Worker 节点是否成功加入集群”(对应你之前的 2 节点集群,可在此确认两个节点是否都处于 Active 状态)。
    • 排查 “资源不足” 问题:比如 vLLM 启动失败,可查看 GPU / 内存是否已被其他任务占用。
    • 确认节点角色:区分哪个是 Head 节点,哪个是 Worker 节点,方便后续定位问题节点。

2. Jobs(任务模块):提交 / 管理 / 监控 Ray 任务(Jobs)的入口

核心作用:负责提交 Ray 任务、查看任务运行状态、获取任务执行结果、终止异常任务,这里的 “Job” 是 Ray 的顶层任务单位(一个独立的计算任务 / 脚本)。
  • 关键信息展示:
    1. 任务列表:所有提交的任务,包含任务 ID、任务名称、提交时间、运行状态(Pending/Running/Succeeded/Failed/Cancelled)。
    2. 任务详情:任务的提交命令、环境变量、日志输出、执行时长、占用资源。
    3. 操作功能:提交新任务(上传脚本 / 输入命令)、终止运行中的任务、重新运行已完成的任务。
  • 实际使用场景:
    • 替代终端提交 Ray 任务,可视化操作更便捷(比如提交 vLLM 相关的分布式任务)。
    • 排查 “任务启动失败”:查看任务的状态和日志,确认是命令错误还是资源不足。
    • 终止异常挂起的任务,释放集群资源。

3. Actors(监控模块):监控 Ray Actor 的运行状态

核心作用:查看和监控 Ray 集群中所有Actor 实例的状态,Actor 是 Ray 中 “有状态的分布式对象”(长期运行、可保存状态、支持远程调用),vLLM 通过 Ray 启动时会创建大量 Actor(比如RayWorkerWrapper就是 Actor)。
  • 关键信息展示:
    1. Actor 列表:所有已创建的 Actor,包含 Actor ID、Actor 类名(比如EngineCoreRayWorkerWrapper)、所属节点、运行状态、创建时间、占用资源。
    2. Actor 详情:Actor 的远程调用记录、依赖关系、异常信息。
  • 实际使用场景:
    • 排查 vLLM 启动失败的核心入口:查看RayWorkerWrapper等 Actor 是否正常创建,是否处于 Failed 状态,定位哪个节点的 Actor 异常。
    • 确认 Actor 是否均匀分布在各个 Worker 节点(比如 vLLM 的张量并行是否对应 2 个节点的 Actor)。
    • 发现内存泄漏:某个 Actor 长期占用资源且不释放,可在此定位并排查。

4. Metrics(指标模块):集群与任务的 “性能仪表盘”

核心作用:以时序图表的形式展示 Ray 集群、节点、任务、Actor 的核心性能指标,用于监控性能、排查瓶颈、优化资源配置。
  • 关键指标展示(分为多个分类图表):
    1. 集群级指标:总 CPU/GPU/ 内存使用率、任务提交速率、任务完成速率。
    2. 节点级指标:单个节点的 CPU/GPU/ 内存 / 网络 IO / 磁盘 IO 使用率。
    3. 任务 / Actor 指标:任务执行时长、Actor 远程调用延迟、对象存储(Object Store)的读写速率。
    4. 自定义指标:若业务(如 vLLM)埋入了自定义监控指标,也会在此展示。
  • 实际使用场景:
    • 排查 vLLM 性能瓶颈:比如 GPU 使用率过低(可能是张量并行配置不合理)、网络 IO 过高(跨节点通信瓶颈)。
    • 监控集群资源水位:避免资源使用率过高导致任务卡顿或 OOM。
    • 长期趋势分析:观察集群在高负载下的性能表现,优化资源配置(如增加 GPU / 内存)。

5. Logs(日志模块):集群与任务的 “集中日志查询中心”

核心作用:集中收集、查询、过滤 Ray 集群中所有节点、任务、Actor 的日志,无需逐个节点登录查看日志,是分布式环境下问题排查的核心工具。
  • 关键功能展示:
    1. 日志分类:按「节点」「任务」「Actor」「Ray 组件」(如 Head 节点、GCS)分类查询日志。
    2. 日志过滤:按时间范围、日志级别(DEBUG/INFO/WARN/ERROR)、关键词(如GlooConnection refused)过滤日志。
    3. 日志实时刷新:支持实时查看滚动输出的日志(类似终端的tail -f)。
  • 实际使用场景:
    • 排查之前的 Gloo 通信错误:无需逐个节点登录,直接在此按关键词GlooConnection refused过滤,查看所有节点的相关日志,定位具体哪个节点的通信失败。
    • 查看 vLLM 启动过程的完整日志:按任务 ID 或 Actor ID 筛选,获取详细的启动流程和异常堆栈。
    • 快速定位节点级异常:比如某个 Worker 节点日志中出现大量 OOM 报错,确认该节点资源不足。

6. Server(服务模块):Ray 集群自身服务的状态监控

核心作用:监控 Ray 集群自身核心服务(如 GCS、Dashboard Server)的运行状态,确保 Ray 集群的 “基础设施” 正常。
  • 关键信息展示:
    1. Ray 核心服务列表:GCS(Global Control Store,Ray 的全局控制中心)、Dashboard Server、Autoscaler(自动扩缩容服务)等。
    2. 服务状态:每个服务的运行状态(Running/Failed)、所属节点、进程 ID(PID)、启动时间。
  • 实际使用场景:
    • 排查 Ray 集群自身故障:比如任务无法提交,可能是 GCS 服务异常,可在此确认 GCS 是否处于 Running 状态。
    • 确认 Dashboard Server 是否正常运行:若无法访问 Ray 页面,可在此查看 Dashboard Server 的状态。
    • 排查自动扩缩容失败:若集群无法自动添加 Worker 节点,可查看 Autoscaler 服务的状态和日志。

 注:ray work process需要配置主机名映射:

image

image

 vllm启动脚本:

#!/bin/bash
echo $GLOO_SOCKET_IFNAME
export NCCL_IB_DISABLE=1
export NCCL_SOCKET_IFNAME=eth0
export GLOO_SOCKET_IFNAME=eth0
export NCCL_DEBUG=DEBUG
export VLLM_LOGGING_LEVEL=DEBUG

export VLLM_DISTRIBUTED_BACKEND=nccl
export TORCH_DISTRIBUTED_BACKEND=nccl
export RAY_ADDRESS=10.60.182.19:6379

export VLLM_USE_RAY_SPMD_WORKER=0
export TORCH_DISTRIBUTED_INIT_TIMEOUT=600
export GLOO_TIMEOUT=600

python -m vllm.entrypoints.openai.api_server \
  --model /opt/models/ \
  --tensor-parallel-size 2 \
  --pipeline-parallel-size 1 \
  --trust-remote-code \
  --gpu-memory-utilization 0.75 \
  --max-num-seqs 64 \
  --max-model-len 4096 \
  --host 0.0.0.0 \
  --port 8000 \
  --distributed-executor-backend=ray

 

ray适配各种深度学习算法:

  • 并行优势体现:上述示例中因数据量 / 任务量较小,并行优势可能不明显,当任务量(如 1000 个独立任务)或数据量(如 100 万条数据)增大时,并行耗时会远低于串行耗时。
  • 多节点环境适配:仅需将 ray.init() 改为 ray.init(address="Ray Head节点IP:6379"),其余代码完全不变,Ray 会自动将任务调度到集群中的所有节点。
  • 资源控制:若需限制每个任务 / Actor 的资源,可在 @ray.remote 中指定,如 @ray.remote(num_cpus=2, num_gpus=0.5)(分配 2 个 CPU 核心和 0.5 张 GPU)。
  • Ray Task(装饰函数)适合无状态、独立可并行的算法任务,是最基础且常用的并行方式。
  • Ray Actor(装饰类)适合有状态、长期运行的算法任务,支持维护独立状态,适合需要累积结果的场景。

核心 API:

  • @ray.remote:装饰函数,将其转为可并行执行的远程任务。
  • 函数名.remote():提交远程任务,立即返回任务 ID(非实际结果),不阻塞主线程。
  • ray.get():根据任务 ID 获取实际结果,阻塞主线程直到所有任务完成。
  • 若要分批获取结果,可使用 ray.wait()(非阻塞,适合大规模任务)。
import ray
import time

# 1. 初始化 Ray(单机环境,自动启动本地 Ray 集群;多节点环境指定 address 即可)
ray.init(ignore_reinit_error=True)  # ignore_reinit_error 避免重复初始化报错

# 2. 定义并行任务(用 @ray.remote 装饰,标记为 Ray 远程任务)
# 模拟一个耗时算法(添加 1 秒延时,体现并行优势)
@ray.remote
def calculate_square(num: int) -> int:
    """计算数字的平方,模拟耗时算法任务"""
    time.sleep(1)  # 模拟算法执行耗时(如数据处理、模型推理)
    return num * num

# 3. 串行执行(作为对比,查看耗时)
print("=== 串行执行 ===")
start_time = time.time()
serial_results = [calculate_square(num) for num in range(10)]  # 直接调用函数(串行)
serial_cost = time.time() - start_time
print(f"串行结果:{serial_results}")
print(f"串行耗时:{serial_cost:.2f} 秒\n")

# 4. 并行执行(使用 Ray Task 分布式调度)
print("=== 并行执行 ===")
start_time = time.time()
# 4.1 提交所有远程任务(返回任务 ID,不阻塞)
remote_tasks = [calculate_square.remote(num) for num in range(10)]
# 4.2 获取所有任务结果(阻塞,直到所有任务完成)
parallel_results = ray.get(remote_tasks)
parallel_cost = time.time() - start_time
print(f"并行结果:{parallel_results}")
print(f"并行耗时:{parallel_cost:.2f} 秒")

# 5. 关闭 Ray 集群(可选,单机环境运行完毕后可关闭)
ray.shutdown()
import ray
import time

# 1. 初始化 Ray
ray.init(ignore_reinit_error=True)

# 2. 定义有状态 Actor 类(用 @ray.remote 装饰,标记为 Ray Actor)
@ray.remote
class CounterActor:
    """有状态计数器,模拟累积算法结果的有状态任务"""
    def __init__(self, name: str):
        # 初始化状态(如模型参数、累积指标、历史记录等)
        self.name = name
        self.count = 0  # 核心状态:计数器值
    
    def increment(self, step: int = 1) -> int:
        """增加计数器值,模拟算法更新状态"""
        time.sleep(0.5)  # 模拟状态更新耗时(如模型参数更新)
        self.count += step
        return self.count
    
    def get_count(self) -> tuple:
        """获取当前计数器状态,模拟获取算法累积结果"""
        return (self.name, self.count)

# 3. 创建 3 个 Actor 实例(并行运行,各自维护独立状态)
actor1 = CounterActor.remote("计数器1")
actor2 = CounterActor.remote("计数器2")
actor3 = CounterActor.remote("计数器3")

# 4. 并行调用 Actor 方法(更新状态,无阻塞提交任务)
print("=== 并行更新 Actor 状态 ===")
start_time = time.time()
# 每个 Actor 执行 5 次增量操作
remote_tasks = []
for _ in range(5):
    remote_tasks.append(actor1.increment.remote(2))  # 每次加 2
    remote_tasks.append(actor2.increment.remote(3))  # 每次加 3
    remote_tasks.append(actor3.increment.remote(1))  # 每次加 1

# 等待所有更新任务完成
ray.get(remote_tasks)

# 5. 获取所有 Actor 的最终状态
actor_results = ray.get([
    actor1.get_count.remote(),
    actor2.get_count.remote(),
    actor3.get_count.remote()
])

# 6. 输出结果和耗时
cost_time = time.time() - start_time
print(f"所有 Actor 最终状态:{actor_results}")
print(f"并行更新耗时:{cost_time:.2f} 秒")

# 7. 关闭 Ray 集群
ray.shutdown()
import ray
import time
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# 1. 初始化 Ray
ray.init(ignore_reinit_error=True)

# 2. 准备数据和训练模型(模拟已训练好的业务模型)
print("=== 准备数据和训练模型 ===")
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.8, random_state=42)

# 训练 RandomForest 模型
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)
print(f"模型训练完成,测试数据量:{len(X_test)} 条\n")

# 3. 定义并行预测任务(拆分批量数据,独立预测)
@ray.remote
def batch_predict(model, X_batch: np.ndarray) -> np.ndarray:
    """批量数据预测,模拟机器学习算法推理"""
    time.sleep(0.5)  # 模拟推理耗时(如复杂模型预测)
    return model.predict(X_batch)

# 4. 串行预测(对比耗时)
print("=== 串行预测 ===")
start_time = time.time()
serial_predictions = model.predict(X_test)
serial_cost = time.time() - start_time
print(f"串行预测完成,结果长度:{len(serial_predictions)}")
print(f"串行耗时:{serial_cost:.2f} 秒\n")

# 5. 并行预测(拆分数据为多个批次,并行调度)
print("=== 并行预测 ===")
start_time = time.time()
# 5.1 拆分测试数据为 8 个批次(按需调整批次数量,适配CPU核心数)
batch_size = len(X_test) // 8
X_batches = [X_test[i*batch_size : (i+1)*batch_size] for i in range(8)]
# 补充最后一个批次的剩余数据
if len(X_test) % 8 != 0:
    X_batches.append(X_test[8*batch_size:])

# 5.2 提交并行预测任务
remote_tasks = [batch_predict.remote(model, batch) for batch in X_batches]

# 5.3 获取并合并预测结果
parallel_batch_results = ray.get(remote_tasks)
parallel_predictions = np.concatenate(parallel_batch_results)

# 5.4 输出结果和耗时
parallel_cost = time.time() - start_time
print(f"并行预测完成,结果长度:{len(parallel_predictions)}")
print(f"并行耗时:{parallel_cost:.2f} 秒")
print(f"串行与并行结果是否一致:{np.array_equal(serial_predictions, parallel_predictions)}")

# 6. 关闭 Ray 集群
ray.shutdown()

 

posted @ 2026-01-30 17:56  wangssd  阅读(143)  评论(0)    收藏  举报