Asio17-MultiThreadPool

Asio17-MultiThreadPool.md

区别

Asio-16也是一种多线程模式,那么他跟这一节有什么区别呢?

简单而言,上一节的结构是,n个线程,每个线程一个io_context在运行,相当于底层多个epoll.这一节呢,也是n个线程,但是只有1个线程有io_context在run,底层相当于一个epoll.

区别在于我们上一节的多线程,每一个线程在各自的io_context取出回调,交给上层的逻辑层去处理,每个线程只管自己的一部分,没有线程安全问题.

这一节的多线程呢,每一个线程都在一个io_context等待着取出回调,然后交给上层的逻辑层.但是多个线程同时在一个任务队列取回调,必然要引发线程安全问题的.那么怎么做呢?我们使用了asio提供的boost::asio::strand来实现串行的取出回调.

结构如下:

image-20250925201539437

首先实现ThreadPool:

// .h
#include "Singleton.hpp"

#include <boost/asio.hpp>

class ThreadPool : public Singleton<ThreadPool> {
public:
    friend class Singleton<ThreadPool>;
    ~ThreadPool();
    boost::asio::io_context& GetContext();
    void Stop();

private:
    ThreadPool(int num = std::thread::hardware_concurrency());
    boost::asio::io_context _io_context;
    std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> _work_guard;
    std::vector<std::thread> _threads;
};

// .cpp
#include "ThreadPool.h"
#include <boost/asio/io_context.hpp>

void ThreadPool::Stop()
{
    _work_guard.reset();
    _io_context.stop();
    for (auto& thread : _threads) {
        thread.join();
    }
}

ThreadPool::ThreadPool(int num)
    : _work_guard(new boost::asio::executor_work_guard<boost::asio::io_context::executor_type>(_io_context.get_executor()))
{
    for (std::size_t i = 0; i < num; ++i) {
        _threads.emplace_back([this]() {
            _io_context.run();
        });
    }
}

boost::asio::io_context& ThreadPool::GetContext()
{
    return _io_context;
}

ThreadPool::~ThreadPool()
{
    Stop();
}

同时CSession添加一个变量:

strand<io_context::executor_type> _strand;

其实流程还是有点难以理解,到底怎么多线程的?

我个人理解,每个CSession都会有一个独立的变量strand,而这个stand类似一个队列,我们看到异步读写的回调函数是这样调用的:

boost::asio::async_read(_socket, boost::asio::buffer(_recv_msg_node->_data, data_len), boost::asio::bind_executor(_strand, std::bind(&CSession::HandleMsg, this, std::placeholders::_1, std::placeholders::_2, SharedSelf())));

实际上当有回调函数的时候,这个任务就会进入这个strand队列之中,而由于所有的CSession里面的strand初始化的时候都绑定了同一个io_context,而这个io_context跑在多个线程之中,那么asio就会分配线程去处理strand中的回调.

实际类似这样:

// 创建3个session
Session session1(io), session2(io), session3(io);

// 提交任务
session1.操作("A1");  // strand1队列: [A1]
session1.操作("A2");  // strand1队列: [A1] → [A2]

session2.操作("B1");  // strand2队列: [B1]  
session2.操作("B2");  // strand2队列: [B1] → [B2]

session3.操作("C1");  // strand3队列: [C1]
session3.操作("C2");  // strand3队列: [C1] → [C2]

// 4个线程执行io_context.run(),可能的执行情况:
线程1: 取出strand1的A1执行 → 完成后取出strand1的A2执行
线程2: 取出strand2的B1执行 → 完成后取出strand2的B2执行  // 与线程1并行!
线程3: 取出strand3的C1执行 → 完成后取出strand3的C2执行  // 与其他并行!
线程4: 空闲或执行其他任务

也就是说,每个线程之内是串行的完成队列里的各个任务,而线程间是并行的!

根据测试呢,发现,ThreadServices版本比ThreadPool版本要快1秒左右(前者4s,后者5-6s,客户端100个线程,每个线程500个循环,收发包).

客户端代码

#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <json/json.h>
#include <json/reader.h>
#include <json/value.h>
#include <thread>
#include <vector>

using namespace std;
using boost::asio::ip::tcp;

constexpr int MAX_LENGTH = 1024 * 2;
constexpr int HEAD_TOTAL = 4; // 2字节id + 2字节len

/*---------- 主函数 ----------*/
int main()
{
    auto start = chrono::high_resolution_clock::now();

    vector<thread> thr;
    for (int i = 0; i < 100; ++i) {
        thr.emplace_back([i] {
            try {
                boost::asio::io_context ioc;
                tcp::socket sock(ioc);
                sock.connect(tcp::endpoint { {}, 9999 });

                for (int seq = 0; seq < 500; ++seq) {
                    /* 1. 构造 JSON 正文 */
                    Json::Value root;
                    root["id"] = 1001;
                    root["msg"] = "hello world";
                    string body = root.toStyledString();

                    /* 2. 打包头部 + 正文 */
                    uint16_t msg_id = 1001;
                    uint16_t body_len = static_cast<uint16_t>(body.size());

                    unsigned char out[HEAD_TOTAL + body.size()];
                    memcpy(reinterpret_cast<char*>(out), &msg_id, 2);
                    memcpy(reinterpret_cast<char*>(out) + 2, &body_len, 2);
                    memcpy(reinterpret_cast<char*>(out) + 4, body.data(), body.size());

                    /* 3. 一次性写出 */
                    boost::asio::write(sock, boost::asio::buffer(reinterpret_cast<char*>(out), HEAD_TOTAL + body.size()));

                    /* 4. 收头部 */
                    unsigned char head[HEAD_TOTAL];
                    boost::asio::read(sock, boost::asio::buffer(head, HEAD_TOTAL));

                    uint16_t reply_id = 0, reply_len = 0;
                    memcpy(&reply_id, head, 2);
                    memcpy(&reply_len, head + 2, 2);

                    if (reply_len == 0 || reply_len > MAX_LENGTH - 1) {
                        cerr << "bad reply_len=" << reply_len << '\n';
                        break;
                    }

                    /* 5. 收正文 */
                    // vector<unsigned char> payload(reply_len);
                    char payload[MAX_LENGTH] = { 0 };
                    boost::asio::read(sock, boost::asio::buffer(payload, reply_len));

                    /* 6. 解析 JSON */
                    Json::Value reply;
                    string payload_str(payload, reply_len);
                    if (!Json::Reader {}.parse(payload_str, reply)) {
                        cerr << "json parse fail\n";
                        continue;
                    }

                    cout << "id=" << reply["id"]
                         << " msg=" << reply["msg"].asString()
                         << " thr=" << i
                         << " seq=" << seq << '\n';
                }
            } catch (exception& e) {
                cerr << "Exception: " << e.what() << '\n';
            }
        });
        this_thread::sleep_for(chrono::milliseconds(10));
    }

    for (auto& t : thr)
        t.join();

    auto dur = chrono::duration_cast<chrono::seconds>(
        chrono::high_resolution_clock::now() - start);
    cout << "Time spent: " << dur.count() << " s\n";
    return 0;
}

这里其实很不"现代",用了很多裸指针,裸数组.主要是为了兼容之前的c风格.如果cpp风格的话,可以使用vector完全替代裸数组.

posted @ 2025-12-24 23:13  大胖熊哈  阅读(5)  评论(0)    收藏  举报