基于 libhv 实现多路径 WebSocket 服务器:设计与实战
WebSocket作为全双工长连接协议,是实时数据推送场景的首选方案。本文将基于libhv库,详解如何设计一个支持多路径路由、连接自动管理、批量广播的高性能WebSocket服务器。
1、设计背景
在工业物联网、实时监控场景中,需要将不同类型的数据(声音分贝值、温度值、FFT数据)拆分Websocket推送路径,不同路径的客户端仅仅收到对应的数据。
2、架构设计
WsServerManager(单例)
核心职责:统筹WebSoket服务、路径路由
核心成员:m_ws_:WebSocketService; m_ws_channels_vec_:路径处理数据
核心接口:连接注册、数据推送
WsChannelHandler(路径处理)
核心职责:单路径连接增加,删除,广播
核心成员:m_channels_:弱指针Map
m_mutex_:互斥锁
核心接口:add_channels/remove_channels/broadcast
3、UML类图

4、代码详解
WsChannelHandler
添加通道:add_channel
inline void WsChannelHandler::add_channel(const WsChannelWeakPtr& channel)
{
std::lock_guard<std::mutex> locker(m_mutex_);
//弱指针 lock () 验证连接有效性,避免存储无效连接;
WebSocketChannelPtr channel_ptr = channel.lock();
if (channel_ptr) {
int channel_id = channel_ptr->id();
m_channels_[channel_id] = channel;//以连接 ID 为 Key 存储弱指针,便于后续快速查找 / 删除。
}
}
删除通道:remove_channel
inline void WsChannelHandler::remove_channel(int channel_id)
{
std::lock_guard<std::mutex> locker(m_mutex_);
if (m_channels_.erase(channel_id) > 0) {
std::cout<<"[WSS]Remove channel id {} ...,"<<channel_id<<std::endl;
}
}
数据广播:broadcast
inline void WsChannelHandler::broadcast(const void* data, int size)
{
if (data == nullptr || size <= 0) return;
std::vector<int> remove_key_vec;
std::lock_guard<std::mutex> locker(m_mutex_);
// 遍历所有连接发送数据
for (auto& pair : m_channels_) {
WebSocketChannelPtr channel_ptr = pair.second.lock();
if (channel_ptr) {
// 发送数据并检测发送结果
int ret = channel_ptr->send((const char*)data, size);
if (ret < 0) {
remove_key_vec.push_back(pair.first);
}
} else {
// 弱指针失效,标记删除
remove_key_vec.push_back(pair.first);
}
}
// 批量删除无效连接
for (int key : remove_key_vec) {
m_channels_.erase(key);
}
}
连接注册:register_ws_channel
将客户端连接按路径映射到对应的 WsChannelHandler,实现 “分路径管理”
int WsServerManager::register_ws_channel(const std::string& ws_path, const WebSocketChannelPtr& channel)
{
if (!channel) return -1;
int ret = 0;
WsChannelWeakPtr channel_weak = channel;
if ("/tempData" == ws_path) {
m_ws_channels_vec_[static_cast<size_t>(WsPathType::Temp_ADTA)].add_channel(channel_weak);
} else if ("/fftData" == ws_path) {
m_ws_channels_vec_[static_cast<size_t>(WsPathType::FFT_DATA)].add_channel(channel_weak);
}
// ... 其他路径映射
else {
ret = -1;
}
return ret;
}
事件回调:init_web_server
void WsServerManager::init_web_server()
{
// 连接建立回调:注册连接到对应路径
m_ws_.onopen = [](const WebSocketChannelPtr& channel, const HttpRequestPtr& req) {
WsServerManager::get_instance().register_ws_channel(req->path, channel);
};
// 连接断开回调:从所有路径移除该连接
m_ws_.onclose = [](const WebSocketChannelPtr& channel) {
int channel_id = channel->id();
auto& manager = WsServerManager::get_instance();
for (size_t i = 0; i < manager.m_ws_channels_vec_.size(); ++i) {
manager.m_ws_channels_vec_[i].remove_channel(channel_id);
}
};
// 消息接收回调(可扩展)
m_ws_.onmessage = [](const WebSocketChannelPtr& channel, const std::string& msg) {
// 可实现回声、指令处理等逻辑
};
}
绑定到 HTTP 服务器
libhv 的 HttpServer 内置 ws 成员,绑定 WebSocketService 后,可复用 HTTP 端口同时支持 HTTP 和 WebSocket 协议,无需额外监听端口。
void WsServerManager::register_web_server_to_http(hv::HttpServer& server)
{
server.ws = &(this->m_ws_);
}
主函数
int main() {
hv::HttpServer server;
// 初始化WebSocket管理器
WsServerManager::get_instance().init_websocket(server);
// 启动服务器
server.setPort(8080);
if (server.start() != 0) return -1;
// 循环向/fftData路径推送测试数据
char test_data[] = "hello fft clients";
while(1)
{
sleep(1);
WsServerManager::get_instance().send_ws_data(WsPathType::FFT_DATA, test_data, sizeof(test_data));
}
getchar();
server.stop();
return 0;
}
5、测试连接(Apipost)
- 打开 Apipost,新建 WebSocket 请求;
- 输入 URL:ws://localhost:8080/fftData;
- 点击 “连接”,控制台输出 “Add channel id X ...”,表示连接注册成功;
- 持续接收 “hello fft clients” 消息,每秒 1 条.
- 断开客户端连接:控制台输出 “Remove channel id X ...”,Map 中删除该连接;
- 多客户端连接:每个客户端独立分配 ID,广播时均能收到数据。
6、仓库地址
https://gitee.com/tangxiao1/industrial-ws-data-server.git

浙公网安备 33010602011771号