Asio21-BeastWebSocket

Asio21-BeastWebSocket

什么是WebSocket

首先来看HTTP1.0,默认短连接,仅仅客户端可以发送请求,每次发送请求都要携带完整的HTTP头部.也就是说,每次通信都需要三次握手四次挥手,效率低下.

HTTP1.1针对这个问题,默认是长连接.只需要一次连接和断开,中间可以多次连续通信.但是问题是,每次通信仍然需携带完整的HTTP头部,很是臃肿.同时实时性很差,需要客户端不断轮询,获取信息.

为了解决这个问题,WebSocket出现了.WebSocket不是HTTP1.0/1.1的扩展,而是基于TCP的同HTTP一样的独立的应用层协议.默认是持久化的长连接,连接建立之后不需要每次发送臃肿的头部,只需要几个字节的帧头,开销极小,实时性高.WebSocket是全双工,支持服务器和客户端双向通信,而非HTTP的"请求-响应模型",只允许客户端请求.

关于长连接这里,HTTP1.1也支持啊,WebSocket有什么优势呢?举个通俗的例子:HTTP的长连接,相当于两地打通了一条道路,每次通信都需要卡车来回运输信息,而且是"回合制"的一问一答.而WebSocket呢,就相当于在这条打通的道路上,建立了一条双向的传送带,不仅能双向传输,还能随时随地地传输,从HTTP的一问一答升级成了自由对话.

如何使用呢?WebSocket虽然独立于HTTP协议,但是没有独立的端口和连接方式:WebSocket需要先建立HTTP/1.1连接,然后通过握手升级为WebSocket连接.

如下客户端发送HTTP请求,携带升级头部Upgrade:

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: http://example.com

服务器同意升级:

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

Beast实现的WebSocket服务器

tree
├── Connection.cpp // 每个连接实例
├── Connection.h
├── ConnectionMgr.cpp // 管理连接
├── ConnectionMgr.h
├── main.cpp // 主入口
├── WebServer.cpp // 服务器
└── WebServer.h
// Connection.h
#pragma once

#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <mutex>
#include <queue>
#include <string>

namespace net = boost::asio;
namespace beast = boost::beast;

/// One WebSocket session layered over a TCP stream.
///
/// Instances are expected to be owned by std::shared_ptr: the asynchronous
/// operations capture shared_from_this() so the object stays alive while a
/// handler is pending.
class Connection : public std::enable_shared_from_this<Connection> {
public:
    /// Creates the WebSocket stream on a strand of `ioc` and assigns a random UUID.
    /// `explicit` so an io_context is never implicitly converted into a Connection.
    explicit Connection(net::io_context& ioc);

    /// Returns a copy of this connection's UUID string.
    std::string get_uuid() const;

    /// Returns the TCP socket underneath the WebSocket stream (used by the acceptor).
    net::ip::tcp::socket& get_socket();

    /// Starts the asynchronous WebSocket accept (handshake) on the already-connected socket.
    void AsyncAccept();

    /// Starts (or re-arms) the asynchronous read loop.
    void Start();

    /// Queues `msg` for sending; starts a write if no other message is queued.
    void AsyncSend(const std::string& msg);

    /// Issues an asynchronous write of `msg` and, on completion, continues with
    /// the next queued message. Normally reached through AsyncSend().
    void SendCallBack(const std::string& msg);

private:
    // Declaration order matches the constructor's initializer list
    // (_ioc, then _ws_ptr) so initialization order is what the list says.
    net::io_context& _ioc;
    std::unique_ptr<beast::websocket::stream<beast::tcp_stream>> _ws_ptr;
    std::string _uuid;
    beast::flat_buffer _buffer;          // receive buffer reused across reads
    std::queue<std::string> _send_queue; // outgoing messages, front is in flight
    std::mutex _send_mutex;              // guards _send_queue
};


// Connection.cpp
#include "Connection.h"
#include "ConnectionMgr.h"
#include <cctype>
#include <iostream>

// Builds the WebSocket stream on its own strand of `ioc` and tags the
// connection with a freshly generated random UUID.
Connection::Connection(net::io_context& ioc)
    : _ioc(ioc)
    , _ws_ptr(std::make_unique<beast::websocket::stream<beast::tcp_stream>>(net::make_strand(ioc)))
{
    boost::uuids::random_generator make_uuid;
    const boost::uuids::uuid fresh_id = make_uuid();
    _uuid = boost::uuids::to_string(fresh_id);
}

// Returns (by value) the UUID string assigned to this connection in the constructor.
std::string Connection::get_uuid() const { return _uuid; }

// Exposes the raw TCP socket at the bottom of the WebSocket stream's layer stack.
net::ip::tcp::socket& Connection::get_socket()
{
    auto& lowest_layer = beast::get_lowest_layer(*_ws_ptr);
    return lowest_layer.socket();
}

void Connection::AsyncAccept()
{
    _ws_ptr->async_accept([self = shared_from_this()](boost::system::error_code ec) {
        try {
            if (ec) {
                std::cerr << "Error in Connection::AsyncAccept: " << ec.message() << std::endl;
                return;
            }
            std::cout << "Connection accepted: " << self->get_uuid() << "\t from :" << self->get_socket().remote_endpoint().address().to_string() << std::endl;
            ConnectionMgr::get_instance().add_connection(self);
            self->Start();
        } catch (...) {
            std::cerr << "Exception in Connection::AsyncAccept" << std::endl;
        }
    });
}

// Arms one asynchronous read. When a message arrives it is logged,
// upper-cased, queued for sending back to the peer, and the read is re-armed.
// On a read error or exception the connection is removed from ConnectionMgr
// and the loop stops.
void Connection::Start()
{
    _ws_ptr->async_read(_buffer, [self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_transferred) {
        try {
            if (ec) {
                std::cerr << "Error in Connection::Start: " << ec.message() << std::endl;
                ConnectionMgr::get_instance().remove_connection(self->get_uuid());
                return;
            }
            // Reply with the same frame type (text/binary) as the message just received.
            self->_ws_ptr->text(self->_ws_ptr->got_text());
            std::string msg = beast::buffers_to_string(self->_buffer.data());
            std::cout << "Received message: " << msg << std::endl;
            self->_buffer.consume(bytes_transferred);
            // std::toupper requires a value representable as unsigned char;
            // passing a negative plain char (any non-ASCII byte) is undefined behaviour.
            for (char& ch : msg) {
                ch = static_cast<char>(std::toupper(static_cast<unsigned char>(ch)));
            }
            // Asynchronous send. AsyncSend takes a const reference and copies
            // into its queue, so std::move here would not move anything.
            self->AsyncSend(msg);
            // Keep listening for the next message.
            self->Start();

        } catch (...) {
            std::cerr << "Exception in Connection::Start" << std::endl;
            ConnectionMgr::get_instance().remove_connection(self->get_uuid());
        }
    });
}

// Queues a copy of `msg` for sending. If the queue was empty, a write is
// started immediately; otherwise the write completion handler picks the
// message up once the earlier ones have been sent.
void Connection::AsyncSend(const std::string& msg)
{
    const std::string* queued = nullptr;
    {
        std::lock_guard<std::mutex> lock(_send_mutex);
        _send_queue.push(msg);
        if (_send_queue.size() > 1) {
            // A write is already in flight; its completion handler drains the queue.
            return;
        }
        queued = &_send_queue.front();
    }
    // Send the queued copy, not the caller's `msg`: the caller's string may be
    // destroyed before the asynchronous write completes, whereas the queued
    // element stays in place until the completion handler pops it.
    SendCallBack(*queued);
}

// Starts an asynchronous WebSocket write of `msg`. When the write completes,
// the front of the send queue is popped and, if more messages are waiting,
// the next one is written. On error or exception the connection is removed
// from ConnectionMgr.
void Connection::SendCallBack(const std::string& msg)
{
    // The buffer handed to async_write must remain valid until the handler
    // runs. Own a copy in the handler so this does not depend on how long the
    // caller keeps `msg` alive.
    auto payload = std::make_shared<const std::string>(msg);
    _ws_ptr->async_write(net::buffer(*payload), [self = shared_from_this(), payload](boost::system::error_code ec, std::size_t /*bytes_transferred*/) {
        try {
            if (ec) {
                std::cerr << "Error in Connection::SendCallBack: " << ec.message() << std::endl;
                ConnectionMgr::get_instance().remove_connection(self->get_uuid());
                return;
            }
            // Pop the message that was just sent and look up the next one,
            // all under the mutex so the queue is never inspected unlocked.
            const std::string* next = nullptr;
            {
                std::lock_guard<std::mutex> lock(self->_send_mutex);
                if (!self->_send_queue.empty()) {
                    self->_send_queue.pop();
                }
                if (!self->_send_queue.empty()) {
                    next = &self->_send_queue.front();
                }
            }
            if (next != nullptr) {
                self->SendCallBack(*next);
            }
        } catch (...) {
            std::cerr << "Exception in Connection::SendCallBack" << std::endl;
            ConnectionMgr::get_instance().remove_connection(self->get_uuid());
        }
    });
}

// ConnectionMgr.h
#pragma once

#include "Connection.h"
#include <boost/unordered_map.hpp>

// Singleton registry mapping a connection's UUID string to the shared_ptr
// that owns it. Obtain the instance through get_instance().
class ConnectionMgr {
public:
    ~ConnectionMgr();

    // Access to the single shared instance.
    static ConnectionMgr& get_instance();

    // Stores `conn` under its UUID.
    void add_connection(std::shared_ptr<Connection> conn);

    // Drops the entry stored under `id`, if any.
    void remove_connection(const std::string& id);

private:
    // Construction is restricted to get_instance(); copying and moving are disabled.
    ConnectionMgr();
    ConnectionMgr(const ConnectionMgr&) = delete;
    ConnectionMgr(ConnectionMgr&&) = delete;
    ConnectionMgr& operator=(const ConnectionMgr&) = delete;
    ConnectionMgr& operator=(ConnectionMgr&&) = delete;

private:
    boost::unordered_map<std::string, std::shared_ptr<Connection>> _connections;
};

// ConnectionMgr.cpp
#include "ConnectionMgr.h"

// Compiler-generated destructor; it destroys the connection map and with it
// the shared_ptr entries the map holds.
ConnectionMgr::~ConnectionMgr() = default;

// Compiler-generated default constructor; the connection map starts empty.
ConnectionMgr::ConnectionMgr() = default;

// Returns the single ConnectionMgr, created on first call as a function-local static.
ConnectionMgr& ConnectionMgr::get_instance()
{
    static ConnectionMgr the_manager;
    return the_manager;
}

// Registers `conn` under its UUID; an existing entry with the same UUID is replaced.
void ConnectionMgr::add_connection(std::shared_ptr<Connection> conn)
{
    const std::string key = conn->get_uuid();
    _connections[key] = conn;
}

// Removes the connection registered under `id`; does nothing if there is none.
void ConnectionMgr::remove_connection(const std::string& id)
{
    const auto entry = _connections.find(id);
    if (entry != _connections.end()) {
        _connections.erase(entry);
    }
}

// WebServer.h
#pragma once

#include "ConnectionMgr.h"


// Owns the listening acceptor and creates a Connection for every accepted
// socket. Non-copyable.
class WebServer
{
public:
    WebServer(const WebServer&) = delete;
    WebServer& operator=(const WebServer&) = delete;

    // Sets up the acceptor for `port` using `io_context`.
    WebServer(net::io_context&io_context,unsigned short port);

    // Starts one asynchronous accept; the completion handler re-arms it.
    void StartAccept();

private:
    // Declared in the same order as the constructor's initializer list
    // (_io_context, then _acceptor) so initialization order matches the list.
    net::io_context& _io_context;
    net::ip::tcp::acceptor _acceptor;
};

// WebServer.cpp
#include "WebServer.h"
#include <iostream>
#include <memory>

WebServer::WebServer(net::io_context& io_context, unsigned short port)
    : _io_context(io_context)
    , _acceptor(io_context, net::ip::tcp::endpoint(net::ip::tcp::v4(), port))
{
    std::cout << "Server listening on port " << port << std::endl;
}

void WebServer::StartAccept()
{
    auto conn_ptr = std::make_shared<Connection>(_io_context);
    _acceptor.async_accept(conn_ptr->get_socket(), [this, conn_ptr](const std::error_code& ec) {
        try {
            if (!ec) {
                conn_ptr->AsyncAccept();
            } else {
                std::cerr << "Error: " << ec.message() << std::endl;
            }
            StartAccept();

        } catch (const std::exception& e) {
            std::cerr << "Exception: " << e.what() << std::endl;
        }
    });
}

// main.cpp

#include "WebServer.h"
int main()
{
    net::io_context ioc;
    WebServer ws(ioc, 8080);
    ws.StartAccept();
    ioc.run();
    return 0;
}
posted @ 2025-12-24 23:13  大胖熊哈  阅读(5)  评论(0)    收藏  举报