线程池
定位
资源管理型基础组件
高并发调度核心组件
池化技术的核心成员
核心原理
本质是生产者-消费者模型。
1.其他线程:不断往任务队列添加任务(生产者)
2.工作线程:不断从队列取出任务并执行(消费者)
3.条件变量:实现线程的休眠与唤醒,避免空轮询。
声明
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <memory>
#include <functional>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>
class ThreadPool {
public:
typedef std::shared_ptr<ThreadPool> ptr;
ThreadPool(int num);
~ThreadPool();
ThreadPool(ThreadPool &) = delete; // 禁止左值拷贝构造
ThreadPool(ThreadPool &&) = delete; // 禁止右值拷贝构造
ThreadPool &operator=(ThreadPool &) = delete; // 禁止左值拷贝赋值
ThreadPool &operator=(ThreadPool &&) = delete; // 禁止右值拷贝赋值
void pushTask(std::function<void()> task); // 添加任务
void waitAll(); // 等待所有任务执行完毕
private:
void entrance(); // 线程入口函数
std::vector<std::thread> m_threadList; // 线程数组
std::queue<std::function<void()>> m_taskList; // FIFO任务队列
std::condition_variable m_cvForQue; // 给任务队列的条件变量
std::condition_variable m_cvForCount; // 给任务计数器的条件变量
std::mutex m_mtxForQue; // 给任务队列的锁
std::mutex m_mtxForCount; // 给任务队列的锁
uint64 m_taskCount; // 正在执行的任务数
bool m_isRunning; // 运行标识
};
#endif
实现
#include "ThreadPool.h"
ThreadPool::ThreadPool(int num)
: m_isRunning(true), m_taskCount(0) {
for (int i = 0; i < num; ++i) {
m_threadList.emplace_back([this]() { this->entrance(); });
}
}
ThreadPool::~ThreadPool() {
m_cvForQue.notify_all();
m_cvForCount.notify_all();
for (auto &thread : m_threadList) {
if (thread.joinable()) {
thread.join();
}
}
}
// 添加任务
void ThreadPool::pushTask(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(m_mtxForQue);
m_taskList.push(std::move(task));
m_cvForQue.notify_one();
}
}
// 等待所有任务执行完毕
void ThreadPool::waitAll() {
std::unique_lock<std::mutex> lock(m_mtxForCount);
m_cvForCount.wait(lock, [this]() { return m_taskList.empty() && m_taskCount == 0; });
}
// 线程入口函数
void ThreadPool::entrance() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(m_mtxForQue);
m_cvForQue.wait(lock, [this]() { return m_isRunning && !m_taskList.empty(); });
if (!m_isRunning) { return; }
if (m_taskList.empty()) { continue; }
task = std::move(m_taskList.front());
m_taskList.pop();
}
{
std::unique_lock<std::mutex> lock(m_mtxForCount);
++m_taskCount;
}
task();
{
std::unique_lock<std::mutex> lock(m_mtxForCount);
--m_taskCount;
m_cvForCount.notify_all();
}
}
}
使用示例
#include "ThreadPool.h"
uint64_t count = 0;
std::mutex mtx;
void task() {
std::unique_lock<std::mutex> lock(mtx);
++count;
}
class Example {
public:
static void task() {
std::unique_lock<std::mutex> lock(mtx);
++count;
}
};
int main() {
// 创建线程池
ThreadPool::ptr threadPool = std::make_shared<ThreadPool>(10);
// 添加任务-普通函数
threadPool->pushTask(task);
// 添加任务-类静态成员函数
threadPool->pushTask(&Example::task);
// 添加任务-lambda表达式
threadPool->pushTask([]() {
std::unique_lock<std::mutex>lock(mtx);
++count;
});
// 阻塞等待所有任务执行完毕
threadPool->waitAll();
return 0;
}

浙公网安备 33010602011771号