Asio16-MultiThreadServicesPool

Asio16-MultiThreadServicesPool.md

前面的设计,我们对asio的使用都是单线程模式,为了提升网络io并发处理的效率,这一次我们设计多线程模式下asio的使用方式。总体来说asio有两个多线程模型,第一个是启动多个线程,每个线程管理一个iocontext。第二种是只启动一个iocontext,被多个线程共享,后面的文章会对比两个模式的区别,这里先介绍第一种模式,多个线程,每个线程管理独立的iocontext服务。

image-20250925151546676

特点:

  • 每个io_context在不同的线程里面run(),所以同一个socket会被注册在同一个io_context里,它的回调函数也会被单独的一个线程回调,那么对于同一个socket,他的回调函数每次触发都是在同一个线程里,就不会有线程安全问题,网络io层面上的并发是线程安全的。
  • 因为我们设置的池子的大小为cpu的核心数,如果为16,那么每次GetService返回的io_context都是从下标0-15,当下标为16又模除回0,所以第一个创建的CSession,和第17个创建的CSession使用的都是用一个io_context.也就是说,这两个CSession运行在同一个线程.所以如果两个socket对应的上层逻辑处理,如果有交互或者访问共享区,会存在线程安全问题。
  • 多线程相比单线程,极大的提高了并发能力,因为单线程仅有一个io_context服务用来监听读写事件,就绪后回调函数在一个线程里串行调用, 如果一个回调函数的调用时间较长肯定会影响后续的函数调用,毕竟是穿行调用。而采用多线程方式,可以在一定程度上减少前一个逻辑调用影响下一个调用的情况,比如两个socket被部署到不同的iocontext上,但是当两个socket部署到同一个iocontext上时仍然存在调用时间影响的问题。不过我们已经通过逻辑队列的方式将网络线程和逻辑线程解耦合了,不会出现前一个调用时间影响下一个回调触发的问题。

池子的实现:

// .h
#include "Singleton.hpp"
#include <boost/asio.hpp>
#include <boost/asio/executor_work_guard.hpp> // <-- add this line

#include <atomic>
#include <memory>
#include <thread>
#include <vector>

class ServicesPool : public Singleton<ServicesPool> {
public:
    friend class Singleton<ServicesPool>;
    ~ServicesPool();
    ServicesPool(const ServicesPool&) = delete;
    ServicesPool& operator=(const ServicesPool&) = delete;

    using Services = boost::asio::io_context;
    using Work = boost::asio::executor_work_guard<Services::executor_type>;
    using WorkPtr = std::unique_ptr<Work>;

    boost::asio::io_context& GetService();
    void Stop();

private:
    ServicesPool(std::size_t pool_size = std::thread::hardware_concurrency());

private:
    std::vector<std::jthread> _threads;
    std::vector<Services> _services;
    std::vector<WorkPtr> _works;

    std::atomic<std::size_t> _next_io_context;
};


// .cpp
#include "ServicesPool.h"
#include <iostream>

ServicesPool::ServicesPool(std::size_t pool_size)
    : _services(pool_size)

    , _next_io_context(0)
{
    _works.reserve(pool_size);
    for (std::size_t i = 0; i < pool_size; ++i) {
        _works.emplace_back(std::make_unique<Work>(boost::asio::make_work_guard(_services[i])));
        // _works.emplace_back(std::make_unique<Work>(_services[i].get_executor()));
    }

    for (std::size_t i = 0; i < pool_size; ++i) {
        _threads.emplace_back([this, i]() {
            _services[i].run();
        });
    }
}

ServicesPool::~ServicesPool()
{
    std::cout << "ServicesPool::~ServicesPool()" << std::endl;
}

boost::asio::io_context& ServicesPool::GetService()
{
    std::size_t index = _next_io_context.fetch_add(1, std::memory_order_relaxed);
    return _services[index % _services.size()];
}

void ServicesPool::Stop()
{
    for (auto& work : _works) {
        work.reset();
    }
}

main.cpp:

try {
        auto pool = ServicesPool::GetInstance();
        boost::asio::io_context io_context;
        boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
        signals.async_wait([&](const boost::system::error_code& error, int signal_number) {
            io_context.stop();
        });

        CServer s(io_context, 9999);
        io_context.run();

    } catch (std::exception& e) {
        std::cerr << "Exception: " << e.what() << endl;
    }

主要的结构就是,main里面创建一个io_context,这个io_context传入CServer s中用于异步的检测连接到来,accept,同时我们注册了信号,用于优雅地退出.而在CServer触发accept的回调的时候,取出线程池中的io_context,用于对应的CSession通信.

posted @ 2025-12-24 23:13  大胖熊哈  阅读(7)  评论(0)    收藏  举报