Asio21-BeastWebSocket
Asio21-BeastWebSocket
什么是WebSocket
首先来看HTTP1.0,默认短连接,仅仅客户端可以发送请求,每次发送请求都要携带完整的HTTP头部.也就是说,每次通信都需要三次握手四次挥手,效率低下.
HTTP1.1针对这个问题,默认是长连接.只需要一次连接和断开,中间可以多次连续通信.但是问题是,每次通信仍然需携带完整的HTTP头部,很是臃肿.同时实时性很差,需要客户端不断轮询,获取信息.
为了解决这个问题,WebSocket出现了.WebSocket不是HTTP1.0/1.1的扩展,而是与HTTP一样基于TCP的独立应用层协议.它默认是持久化的长连接,连接建立之后不需要每次发送臃肿的头部,只需要几个字节的帧头,开销极小,实时性高.WebSocket是全双工的,支持服务器和客户端双向通信,而HTTP的"请求-响应模型"只允许客户端发起请求.
关于长连接,HTTP1.1也支持,那WebSocket有什么优势呢?举个通俗的例子:HTTP的长连接,相当于在两地之间打通了一条道路,但每次通信都需要卡车来回运输信息,而且是"回合制的一问一答".而WebSocket呢,就相当于在这条打通的道路上,建立了一条双轨道的传送带,不仅能双向传输,还能随时随地地传输,从HTTP的一问一答升级成了自由对话.
如何使用呢?WebSocket虽然独立于HTTP协议,但是没有独立的端口和连接方式.WebSocket需要先建立HTTP1.1的连接,然后再升级为WebSocket连接.
如下客户端发送HTTP请求,携带升级头部Upgrade:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: http://example.com
服务器同意升级:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Beast实现的WebSocket服务器
tree
├── Connection.cpp // 每个连接实例
├── Connection.h
├── ConnectionMgr.cpp // 管理连接
├── ConnectionMgr.h
├── main.cpp // 主入口
├── WebServer.cpp // 服务器
└── WebServer.h
// Connection.h
#pragma once
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <mutex>
#include <queue>
#include <string>
namespace net = boost::asio;
namespace beast = boost::beast;
/// One WebSocket session with a single client.
///
/// Owns the Beast WebSocket stream, the read buffer and the queue of outgoing
/// messages. Instances must be owned by a std::shared_ptr: the asynchronous
/// operations call shared_from_this() so the object stays alive until their
/// completion handlers have run.
class Connection : public std::enable_shared_from_this<Connection> {
public:
    /// Creates the WebSocket stream on a new strand of `ioc` and generates a
    /// random UUID for this connection.
    /// `explicit` so that an io_context never converts implicitly to a Connection.
    explicit Connection(net::io_context& ioc);

    /// Returns the UUID string generated in the constructor.
    std::string get_uuid() const;

    /// Returns the TCP socket at the lowest layer of the WebSocket stream
    /// (this is the socket the server's acceptor accepts into).
    net::ip::tcp::socket& get_socket();

    /// Starts the asynchronous WebSocket accept on the stream. On success the
    /// connection registers itself with ConnectionMgr and calls Start().
    void AsyncAccept();

    /// Starts an asynchronous read. The handler upper-cases the received
    /// message, sends it back through AsyncSend() and calls Start() again.
    void Start();

    /// Pushes a copy of `msg` onto the send queue and starts a write when no
    /// other message was already queued.
    void AsyncSend(const std::string& msg);

    /// Issues an asynchronous write of `msg`. When it completes, the front of
    /// the send queue is popped and the next queued message (if any) is written.
    /// net::buffer() does not copy, so `msg` must stay valid until then.
    void SendCallBack(const std::string& msg);

private:
    // NOTE: members are initialized in this declaration order.
    std::unique_ptr<beast::websocket::stream<beast::tcp_stream>> _ws_ptr; // WebSocket stream over TCP
    std::string _uuid;                   // identifier used as the key in ConnectionMgr
    net::io_context& _ioc;               // io_context passed to the constructor
    beast::flat_buffer _buffer;          // receives incoming messages in Start()
    std::queue<std::string> _send_queue; // outgoing messages; front() is the one being written
    std::mutex _send_mutex;              // locked around _send_queue updates
};
// Connection.cpp
#include "Connection.h"
#include "ConnectionMgr.h"
#include <iostream>
/// Builds the WebSocket stream on its own strand of `ioc` and tags the
/// connection with a freshly generated random UUID.
///
/// @param ioc io_context that will run this connection's handlers; a reference
///            to it is stored, so it must outlive the connection.
Connection::Connection(net::io_context& ioc)
    // Initializers are written in declaration order (_ws_ptr is declared before
    // _ioc). Members are always initialized in declaration order, so a list in
    // a different order is misleading and triggers -Wreorder.
    : _ws_ptr(std::make_unique<beast::websocket::stream<beast::tcp_stream>>(net::make_strand(ioc)))
    , _ioc(ioc)
{
    boost::uuids::random_generator gen;
    _uuid = boost::uuids::to_string(gen());
}
std::string Connection::get_uuid() const
{
return _uuid;
}
/// Returns the TCP socket that sits underneath the WebSocket stream.
net::ip::tcp::socket& Connection::get_socket()
{
    // Peel the stream down to its lowest layer, then hand out that layer's socket.
    auto& tcp_layer = beast::get_lowest_layer(*_ws_ptr);
    return tcp_layer.socket();
}
void Connection::AsyncAccept()
{
_ws_ptr->async_accept([self = shared_from_this()](boost::system::error_code ec) {
try {
if (ec) {
std::cerr << "Error in Connection::AsyncAccept: " << ec.message() << std::endl;
return;
}
std::cout << "Connection accepted: " << self->get_uuid() << "\t from :" << self->get_socket().remote_endpoint().address().to_string() << std::endl;
ConnectionMgr::get_instance().add_connection(self);
self->Start();
} catch (...) {
std::cerr << "Exception in Connection::AsyncAccept" << std::endl;
}
});
}
/// Starts one asynchronous read and, from its handler, keeps the read loop going.
///
/// Each received message is logged, converted to upper case and sent back via
/// AsyncSend(); then Start() is called again for the next message. On a read
/// error or an exception the connection is removed from ConnectionMgr.
void Connection::Start()
{
    _ws_ptr->async_read(_buffer, [self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_transferred) {
        try {
            if (ec) {
                std::cerr << "Error in Connection::Start: " << ec.message() << std::endl;
                ConnectionMgr::get_instance().remove_connection(self->get_uuid());
                return;
            }
            // Reply with the same frame type (text/binary) as the message just received.
            self->_ws_ptr->text(self->_ws_ptr->got_text());
            std::string msg = beast::buffers_to_string(self->_buffer.data());
            std::cout << "Received message: " << msg << std::endl;
            // Drop the bytes just copied out so the next read starts from an empty buffer.
            self->_buffer.consume(bytes_transferred);
            for (char& ch : msg) {
                // toupper() requires a value representable as unsigned char (or EOF);
                // passing a negative plain char (any byte >= 0x80) is undefined behaviour.
                ch = static_cast<char>(toupper(static_cast<unsigned char>(ch)));
            }
            // Send asynchronously (AsyncSend stores its own copy of the message).
            self->AsyncSend(msg);
            // Keep listening for the next message.
            self->Start();
        } catch (...) {
            std::cerr << "Exception in Connection::Start" << std::endl;
            ConnectionMgr::get_instance().remove_connection(self->get_uuid());
        }
    });
}
/// Queues `msg` for sending and starts a write if none is in progress.
///
/// A copy of `msg` is pushed onto the send queue. If another message was
/// already queued, a write chain is running and will pick this one up later,
/// so nothing more is done here.
///
/// @param msg message to send; only needs to live for the duration of this call.
void Connection::AsyncSend(const std::string& msg)
{
    const std::string* queued = nullptr;
    {
        std::lock_guard<std::mutex> lock(_send_mutex);
        _send_queue.push(msg);
        if (_send_queue.size() > 1) {
            return;
        }
        queued = &_send_queue.front();
    }
    // Write from the queued copy, NOT from the caller's `msg`: async_write only
    // references the buffer, and the caller's string may be destroyed before the
    // write completes. The queue element stays in place until the write's
    // completion handler pops it.
    SendCallBack(*queued);
}
/// Writes `msg` asynchronously and, on completion, continues with the next
/// queued message.
///
/// When the write completes successfully, the front of the send queue (the
/// message just written) is popped; if more messages are queued, the new front
/// is written next. On a write error or an exception the connection is removed
/// from ConnectionMgr.
///
/// @param msg buffer to write. net::buffer() does not copy, so the string must
///            remain valid until the completion handler has run.
void Connection::SendCallBack(const std::string& msg)
{
    // Only the shared_ptr is captured: it keeps the connection alive, and a
    // separate raw `this` capture would be redundant.
    _ws_ptr->async_write(net::buffer(msg), [self = shared_from_this()](boost::system::error_code ec, std::size_t /*bytes_transferred*/) {
        try {
            if (ec) {
                std::cerr << "Error in Connection::SendCallBack: " << ec.message() << std::endl;
                ConnectionMgr::get_instance().remove_connection(self->get_uuid());
                return;
            }
            const std::string* next = nullptr;
            {
                // Pop and inspect the queue in one critical section, so the
                // empty()/front() reads cannot race with a concurrent AsyncSend().
                std::lock_guard<std::mutex> lock(self->_send_mutex);
                self->_send_queue.pop();
                if (!self->_send_queue.empty()) {
                    next = &self->_send_queue.front();
                }
            }
            if (next != nullptr) {
                self->SendCallBack(*next);
            }
        } catch (...) {
            std::cerr << "Exception in Connection::SendCallBack" << std::endl;
            ConnectionMgr::get_instance().remove_connection(self->get_uuid());
        }
    });
}
// ConnectionMgr.h
#pragma once
#include "Connection.h"
#include <boost/unordered_map.hpp>
/// Registry of connections, keyed by connection UUID.
///
/// Accessed through get_instance(); the constructor is private and the copy
/// and move operations are deleted, so no other instance can be created.
class ConnectionMgr {
public:
    ~ConnectionMgr();

    /// Returns the single shared instance.
    static ConnectionMgr& get_instance();

    /// Stores `conn` under its UUID.
    void add_connection(std::shared_ptr<Connection> conn);

    /// Erases the entry stored under `id`.
    void remove_connection(const std::string& id);

private:
    ConnectionMgr();

    ConnectionMgr(const ConnectionMgr&) = delete;
    ConnectionMgr(ConnectionMgr&&) = delete;
    ConnectionMgr& operator=(const ConnectionMgr&) = delete;
    ConnectionMgr& operator=(ConnectionMgr&&) = delete;

    // UUID -> connection; holding the shared_ptr keeps the connection alive.
    boost::unordered_map<std::string, std::shared_ptr<Connection>> _connections;
};
// ConnectionMgr.cpp
#include "ConnectionMgr.h"
// Construction needs nothing beyond default-initializing the (empty) map.
ConnectionMgr::ConnectionMgr() = default;

// Destruction only has to destroy the map and the shared_ptrs it holds.
ConnectionMgr::~ConnectionMgr() = default;
/// Returns the single ConnectionMgr, a function-local static that is
/// constructed the first time this function is called.
ConnectionMgr& ConnectionMgr::get_instance()
{
    static ConnectionMgr registry;
    return registry;
}
/// Stores `conn` under its UUID, replacing any entry already stored under
/// that UUID.
void ConnectionMgr::add_connection(std::shared_ptr<Connection> conn)
{
    // operator[] creates the slot when the UUID is not present yet.
    auto& slot = _connections[conn->get_uuid()];
    slot = conn;
}
/// Drops the entry stored under `id`; does nothing when there is none.
void ConnectionMgr::remove_connection(const std::string& id)
{
    const auto entry = _connections.find(id);
    if (entry == _connections.end()) {
        return;
    }
    _connections.erase(entry);
}
// WebServer.h
#pragma once
#include "ConnectionMgr.h"
/// TCP listener that accepts clients and hands each one to a new Connection.
class WebServer {
public:
    /// Creates the acceptor for IPv4 on `port`, driven by `io_context`.
    WebServer(net::io_context& io_context, unsigned short port);

    // Not copyable.
    WebServer(const WebServer&) = delete;
    WebServer& operator=(const WebServer&) = delete;

    /// Starts one asynchronous accept; its handler starts the next one.
    void StartAccept();

private:
    // NOTE: members are initialized in this declaration order.
    net::ip::tcp::acceptor _acceptor; // listening socket
    net::io_context& _io_context;     // used to construct each new Connection
};
// WebServer.cpp
#include "WebServer.h"
#include <iostream>
#include <memory>
/// Creates the acceptor on an IPv4 endpoint with the given port and logs the port.
///
/// @param io_context io_context that runs the accept handlers; a reference to
///                   it is stored, so it must outlive the server.
/// @param port       TCP port for the acceptor's endpoint.
WebServer::WebServer(net::io_context& io_context, unsigned short port)
    // Initializers are written in declaration order (_acceptor is declared
    // before _io_context). Members are always initialized in declaration order,
    // so a list in a different order is misleading and triggers -Wreorder.
    : _acceptor(io_context, net::ip::tcp::endpoint(net::ip::tcp::v4(), port))
    , _io_context(io_context)
{
    std::cout << "Server listening on port " << port << std::endl;
}
void WebServer::StartAccept()
{
auto conn_ptr = std::make_shared<Connection>(_io_context);
_acceptor.async_accept(conn_ptr->get_socket(), [this, conn_ptr](const std::error_code& ec) {
try {
if (!ec) {
conn_ptr->AsyncAccept();
} else {
std::cerr << "Error: " << ec.message() << std::endl;
}
StartAccept();
} catch (const std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}
});
}
// main.cpp
#include "WebServer.h"
int main()
{
net::io_context ioc;
WebServer ws(ioc, 8080);
ws.StartAccept();
ioc.run();
return 0;
}

浙公网安备 33010602011771号