Asio17-MultiThreadPool
Asio17-MultiThreadPool.md
区别
Asio-16也是一种多线程模式,那么他跟这一节有什么区别呢?
简单而言,上一节的结构是,n个线程,每个线程一个io_context在运行,相当于底层多个epoll.这一节呢,也是n个线程,但是只有1个线程有io_context在run,底层相当于一个epoll.
区别在于我们上一节的多线程,每一个线程在各自的io_context取出回调,交给上层的逻辑层去处理,每个线程只管自己的一部分,没有线程安全问题.
这一节的多线程呢,每一个线程都在一个io_context等待着取出回调,然后交给上层的逻辑层.但是多个线程同时在一个任务队列取回调,必然要引发线程安全问题的.那么怎么做呢?我们使用了asio提供的boost::asio::strand来实现串行的取出回调.
结构如下:

首先实现ThreadPool:
// .h
#include "Singleton.hpp"
#include <boost/asio.hpp>
class ThreadPool : public Singleton<ThreadPool> {
public:
friend class Singleton<ThreadPool>;
~ThreadPool();
boost::asio::io_context& GetContext();
void Stop();
private:
ThreadPool(int num = std::thread::hardware_concurrency());
boost::asio::io_context _io_context;
std::unique_ptr<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> _work_guard;
std::vector<std::thread> _threads;
};
// .cpp
#include "ThreadPool.h"
#include <boost/asio/io_context.hpp>
void ThreadPool::Stop()
{
_work_guard.reset();
_io_context.stop();
for (auto& thread : _threads) {
thread.join();
}
}
ThreadPool::ThreadPool(int num)
: _work_guard(new boost::asio::executor_work_guard<boost::asio::io_context::executor_type>(_io_context.get_executor()))
{
for (std::size_t i = 0; i < num; ++i) {
_threads.emplace_back([this]() {
_io_context.run();
});
}
}
boost::asio::io_context& ThreadPool::GetContext()
{
return _io_context;
}
ThreadPool::~ThreadPool()
{
Stop();
}
同时CSession添加一个变量:
strand<io_context::executor_type> _strand;
其实流程还是有点难以理解,到底怎么多线程的?
我个人理解,每个CSession都会有一个独立的变量strand,而这个stand类似一个队列,我们看到异步读写的回调函数是这样调用的:
boost::asio::async_read(_socket, boost::asio::buffer(_recv_msg_node->_data, data_len), boost::asio::bind_executor(_strand, std::bind(&CSession::HandleMsg, this, std::placeholders::_1, std::placeholders::_2, SharedSelf())));
实际上当有回调函数的时候,这个任务就会进入这个strand队列之中,而由于所有的CSession里面的strand初始化的时候都绑定了同一个io_context,而这个io_context跑在多个线程之中,那么asio就会分配线程去处理strand中的回调.
实际类似这样:
// 创建3个session
Session session1(io), session2(io), session3(io);
// 提交任务
session1.操作("A1"); // strand1队列: [A1]
session1.操作("A2"); // strand1队列: [A1] → [A2]
session2.操作("B1"); // strand2队列: [B1]
session2.操作("B2"); // strand2队列: [B1] → [B2]
session3.操作("C1"); // strand3队列: [C1]
session3.操作("C2"); // strand3队列: [C1] → [C2]
// 4个线程执行io_context.run(),可能的执行情况:
线程1: 取出strand1的A1执行 → 完成后取出strand1的A2执行
线程2: 取出strand2的B1执行 → 完成后取出strand2的B2执行 // 与线程1并行!
线程3: 取出strand3的C1执行 → 完成后取出strand3的C2执行 // 与其他并行!
线程4: 空闲或执行其他任务
也就是说,每个线程之内是串行的完成队列里的各个任务,而线程间是并行的!
根据测试呢,发现,ThreadServices版本比ThreadPool版本要快1秒左右(前者4s,后者5-6s,客户端100个线程,每个线程500个循环,收发包).
客户端代码
#include <boost/asio.hpp>
#include <chrono>
#include <iostream>
#include <json/json.h>
#include <json/reader.h>
#include <json/value.h>
#include <thread>
#include <vector>
using namespace std;
using boost::asio::ip::tcp;
constexpr int MAX_LENGTH = 1024 * 2;
constexpr int HEAD_TOTAL = 4; // 2字节id + 2字节len
/*---------- 主函数 ----------*/
int main()
{
auto start = chrono::high_resolution_clock::now();
vector<thread> thr;
for (int i = 0; i < 100; ++i) {
thr.emplace_back([i] {
try {
boost::asio::io_context ioc;
tcp::socket sock(ioc);
sock.connect(tcp::endpoint { {}, 9999 });
for (int seq = 0; seq < 500; ++seq) {
/* 1. 构造 JSON 正文 */
Json::Value root;
root["id"] = 1001;
root["msg"] = "hello world";
string body = root.toStyledString();
/* 2. 打包头部 + 正文 */
uint16_t msg_id = 1001;
uint16_t body_len = static_cast<uint16_t>(body.size());
unsigned char out[HEAD_TOTAL + body.size()];
memcpy(reinterpret_cast<char*>(out), &msg_id, 2);
memcpy(reinterpret_cast<char*>(out) + 2, &body_len, 2);
memcpy(reinterpret_cast<char*>(out) + 4, body.data(), body.size());
/* 3. 一次性写出 */
boost::asio::write(sock, boost::asio::buffer(reinterpret_cast<char*>(out), HEAD_TOTAL + body.size()));
/* 4. 收头部 */
unsigned char head[HEAD_TOTAL];
boost::asio::read(sock, boost::asio::buffer(head, HEAD_TOTAL));
uint16_t reply_id = 0, reply_len = 0;
memcpy(&reply_id, head, 2);
memcpy(&reply_len, head + 2, 2);
if (reply_len == 0 || reply_len > MAX_LENGTH - 1) {
cerr << "bad reply_len=" << reply_len << '\n';
break;
}
/* 5. 收正文 */
// vector<unsigned char> payload(reply_len);
char payload[MAX_LENGTH] = { 0 };
boost::asio::read(sock, boost::asio::buffer(payload, reply_len));
/* 6. 解析 JSON */
Json::Value reply;
string payload_str(payload, reply_len);
if (!Json::Reader {}.parse(payload_str, reply)) {
cerr << "json parse fail\n";
continue;
}
cout << "id=" << reply["id"]
<< " msg=" << reply["msg"].asString()
<< " thr=" << i
<< " seq=" << seq << '\n';
}
} catch (exception& e) {
cerr << "Exception: " << e.what() << '\n';
}
});
this_thread::sleep_for(chrono::milliseconds(10));
}
for (auto& t : thr)
t.join();
auto dur = chrono::duration_cast<chrono::seconds>(
chrono::high_resolution_clock::now() - start);
cout << "Time spent: " << dur.count() << " s\n";
return 0;
}
这里其实很不"现代",用了很多裸指针,裸数组.主要是为了兼容之前的c风格.如果cpp风格的话,可以使用vector完全替代裸数组.

浙公网安备 33010602011771号