线程池

定位

资源管理型基础组件
高并发调度核心组件
池化技术的核心成员

核心原理

本质是生产者-消费者模型
1.其他线程:不断往任务队列添加任务(生产者)
2.工作线程:不断从队列取出任务并执行(消费者)
3.条件变量:实现线程的休眠与唤醒,避免空轮询。

声明

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <memory>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>

class ThreadPool {
public:
    typedef std::shared_ptr<ThreadPool> ptr;

    ThreadPool(int num);
    ~ThreadPool();

    ThreadPool(ThreadPool &)                        = delete;           // 禁止左值拷贝构造
    ThreadPool(ThreadPool &&)                       = delete;           // 禁止右值拷贝构造
    ThreadPool &operator=(ThreadPool &)             = delete;           // 禁止左值拷贝赋值
    ThreadPool &operator=(ThreadPool &&)            = delete;           // 禁止右值拷贝赋值

    void                    pushTask(std::function<void()> task);       // 添加任务
    void                    waitAll();                                  // 等待所有任务执行完毕

private:
    void                    entrance();                                 // 线程入口函数

    std::vector<std::thread>                m_threadList;               // 线程数组
    std::queue<std::function<void()>>       m_taskList;                 // FIFO任务队列
    std::condition_variable                 m_cvForQue;                 // 给任务队列的条件变量
    std::condition_variable                 m_cvForCount;               // 给任务计数器的条件变量
    std::mutex                              m_mtxForQue;                // 给任务队列的锁
    std::mutex                              m_mtxForCount;              // 给任务队列的锁
    uint64                                  m_taskCount;                // 正在执行的任务数
    bool                                    m_isRunning;                // 运行标识
};

#endif

实现

#include "ThreadPool.h"

ThreadPool::ThreadPool(int num) 
: m_isRunning(true), m_taskCount(0) {
    for (int i = 0; i < num; ++i) {
        m_threadList.emplace_back([this]() { this->entrance(); });
    }
}

ThreadPool::~ThreadPool() {
    m_cvForQue.notify_all();
    m_cvForCount.notify_all();
    for (auto &thread : m_threadList) {
        if (thread.joinable()) {
            thread.join();
        }
    }
}
// 添加任务
void ThreadPool::pushTask(std::function<void()> task) {
    {
        std::unique_lock<std::mutex> lock(m_mtxForQue);
        m_taskList.push(std::move(task));
        m_cvForQue.notify_one();
    }
}

// 等待所有任务执行完毕
void ThreadPool::waitAll() {
    std::unique_lock<std::mutex> lock(m_mtxForCount);
    m_cvForCount.wait(lock, [this]() { return m_taskList.empty() && m_taskCount == 0; });
}

// 线程入口函数
void ThreadPool::entrance() {
    while (true) {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(m_mtxForQue);
            m_cvForQue.wait(lock, [this]() { return m_isRunning && !m_taskList.empty(); });
            if (!m_isRunning)           { return; }
            if (m_taskList.empty())     { continue; }
            task = std::move(m_taskList.front());
            m_taskList.pop();
        }
        {
            std::unique_lock<std::mutex> lock(m_mtxForCount);
            ++m_taskCount;
        }
        task();
        {
            std::unique_lock<std::mutex> lock(m_mtxForCount);
            --m_taskCount;
            m_cvForCount.notify_all();
        }
    }
}

使用示例

#include "ThreadPool.h"

uint64_t count = 0;
std::mutex mtx;

void task() {
    std::unique_lock<std::mutex> lock(mtx);
    ++count;
}

class Example {
public:
    static void task() {
        std::unique_lock<std::mutex> lock(mtx);
        ++count;
    }
};

int main() {
    // 创建线程池
    ThreadPool::ptr threadPool = std::make_shared<ThreadPool>(10);
    // 添加任务-普通函数
    threadPool->pushTask(task);
    // 添加任务-类静态成员函数
    threadPool->pushTask(&Example::task);
    // 添加任务-lambda表达式
    threadPool->pushTask([]() {
        std::unique_lock<std::mutex>lock(mtx);
        ++count; 
    });
    // 阻塞等待所有任务执行完毕
    threadPool->waitAll();
    return 0;
}
posted @ 2026-03-22 16:24  SINGLERS  阅读(2)  评论(0)    收藏  举报