线程安全队列
#ifndef SAFEQUEUE_H
#define SAFEQUEUE_H
#include<mutex>
#include<queue>
template<typename T>
class SafeQueue{
public:
bool push(T& elm)
{
std::lock_guard<std::mutex> lock(_mutex);
_queue.push(elm);
return true;
}
bool pop(T& elm)
{
std::lock_guard<std::mutex> lock(_mutex);
if(_queue.empty())
return false;
elm=_queue.front();
_queue.pop();
return true;
}
size_t size() const
{
return _queue.size();
}
bool empty() const
{
return _queue.empty();
}
private:
std::queue<T> _queue;
std::mutex _mutex;
};
#endif
线程池
任务提交函数(生产者)
//任务提交函数
template<typename F,typename... Args>
auto submit(F&& f,Args&&... args) -> std::future<decltype(f(args...))>
{
using retType=decltype(f(args...));
std::function<retType()> function=std::bind(std::forward<F>(f),std::forward<Args>(args)...);
auto task_ptr=std::make_shared<std::packaged_task<retType()>>(function);
std::future<retType> future=task_ptr->get_future();
Task wrapper_ptr=[task_ptr](){
(*task_ptr)();
};
_taskQ.push(wrapper_ptr);
_condLock.notify_one();
return future;
}
线程工作函数(消费者)
//线程工作函数
void threadWork(int id){
while(!_shutdown){
std::unique_lock<std::mutex> lock(_mutexPool);
if(_taskQ.empty()){
_condLock.wait(lock);
}
Task task;
bool is_task=_taskQ.pop(std::ref(task));
if(is_task)
task();
}
}
#include "safeQueue.h"
#include<vector>
#include<mutex>
#include<thread>
#include<future>
#include<utility>
#include<memory>
#include<functional>
#include<atomic>
#include<iostream>
class ThreadPool
{
public:
//初始化线程池大小,并设置线程池的关闭标记为false
ThreadPool(size_t size=std::thread::hardware_concurrency())
:_threads(size),_shutdown(false)
{}
//析构函数,关闭线程池
~ThreadPool(){
shutdown();
}
//初始化线程池
void init()
{
std::lock_guard<std::mutex> lock(_mutexPool);
for(int i=0;i<_threads.size();++i)
_threads[i]=std::thread(&ThreadPool::threadWork,this,i);
}
//任务提交函数
template<typename F,typename... Args>
auto submit(F&& f,Args&&... args) -> std::future<decltype(f(args...))>
{
using retType=decltype(f(args...));
std::function<retType()> function=std::bind(std::forward<F>(f),std::forward<Args>(args)...);
auto task_ptr=std::make_shared<std::packaged_task<retType()>>(function);
std::future<retType> future=task_ptr->get_future();
Task wrapper_ptr=[task_ptr](){
(*task_ptr)();
};
_taskQ.push(wrapper_ptr);
_condLock.notify_one();
return future;
}
//线程工作函数
void threadWork(int id){
while(!_shutdown){
std::unique_lock<std::mutex> lock(_mutexPool);
if(_taskQ.empty()){
_condLock.wait(lock);
}
Task task;
bool is_task=_taskQ.pop(std::ref(task));
if(is_task)
task();
}
}
//关闭线程池
bool shutdown()
{
_shutdown=true;
_condLock.notify_all();
for(auto& thread:_threads)
{
if(thread.joinable())
{
std::cout<<"join thread "<<std::this_thread::get_id()<<std::endl;
thread.join();
}
}
}
private:
using Task=std::function<void()>;
std::vector<std::thread> _threads;
SafeQueue<Task> _taskQ;
std::mutex _mutexPool;
std::condition_variable _condLock;
std::atomic<bool> _shutdown;
};
测试代码
#include"threadPool.h"
#include <iostream>
#include <random>
std::random_device rd; //真实随机数产生器
std::mt19937 mt(rd()); //生成计算随机数mt;
std::uniform_int_distribution<int> dist(-1000, 1000);//生成-1000到1000之间的离散均匀分部数
auto rnd = std::bind(dist, mt);
//设置线程睡眠时间
void simulate_hard_computation() {
std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
}
// 添加两个数字的简单函数并打印结果
void multiply(const int a, const int b) {
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
}
//添加并输出结果
void multiply_output(int & out, const int a, const int b) {
simulate_hard_computation();
out = a * b;
std::cout << a << " * " << b << " = " << out << std::endl;
}
// 结果返回
int multiply_return(const int a, const int b) {
simulate_hard_computation();
const int res = a * b;
std::cout << a << " * " << b << " = " << res << std::endl;
return res;
}
void example() {
// 创建3个线程的线程池
ThreadPool pool(3);
// 初始化线程池
pool.init();
// 提交乘法操作,总共30个
for (int i = 1; i < 3; ++i) {
for (int j = 1; j < 10; ++j) {
pool.submit(multiply, i, j);
}
}
// 使用ref传递的输出参数提交函数
int output_ref;
auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
// 等待乘法输出完成
future1.get();
std::cout << "Last operation result is equals to " << output_ref << std::endl;
// 使用return参数提交函数
auto future2 = pool.submit(multiply_return, 5, 3);
// 等待乘法输出完成
int res = future2.get();
std::cout << "Last operation result is equals to " << res << std::endl;
//关闭线程池
// pool.shutdown();
}
int main()
{
example();
return 0;
}
参考
C++ 线程池 - 敬方的个人博客 | JF Blog (wangpengcheng.github.io)
基于C++11的线程池(threadpool),简洁且可以带任意多的参数 - _Ong - 博客园 (cnblogs.com)