1.LINUX类中,以pthread_mutex_t,sem_t,pthread_cond_t类的线程池,代码如下
#include <exception>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <iostream>
#include <wait.h>
#include <list>
#include <memory>
#include <vector>
class sem{
public:
sem(){
if(sem_init(&m_sem,0,0)!=0){
throw std::exception();
}//最开始设置为0
}
~sem(){
sem_destroy(&m_sem);
}
bool wait(){
return sem_wait(&m_sem)==0;
}
bool post(){
return sem_post(&m_sem)==0;
}
private:
sem_t m_sem;
};
class lock{
public:
lock(){//创建并且初始化
if(pthread_mutex_init(&m_mutex,NULL)!=0){
throw std::exception();
}
}
~lock(){//销毁锁
pthread_mutex_destroy(&m_mutex);
}
bool locker(){//上锁
return pthread_mutex_lock(&m_mutex)==0;
}
bool unlocker(){
return pthread_mutex_unlock(&m_mutex)==0;
}
private:
pthread_mutex_t m_mutex;
};
//封装条件变量
class cond{
public:
cond(){
if(pthread_mutex_init(&m_mutex,NULL)!=0){
throw std::exception();
}
if(pthread_cond_init(&m_cond,NULL)!=0){
throw std::exception();
}
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);//初始化问题 马上销毁
}
~ cond(){
pthread_mutex_destroy(&m_mutex);
pthread_cond_destroy(&m_cond);
}
bool wait(){
int ret=0;
pthread_mutex_lock(&m_mutex);
ret=pthread_cond_wait(&m_cond,&m_mutex);
pthread_mutex_unlock(&m_mutex);
return ret==0;
}
bool signal(){
return pthread_cond_signal(&m_cond)==0;
}
private:
pthread_cond_t m_cond;
pthread_mutex_t m_mutex;
};
//线程池
template<typename T>
threadpool<T>::threadpool(int thread_number,int max_re):m_thread_number(thread_number),m_max_request(max_re),m_stop(false),threads(nullptr)
{
if(thread_number<=0||max_re<=0){
throw std::exception();
}
threads=new pthread_t (thread_number);
if(!threads) throw std::exception();
for(int i=0;i<thread_number;i++){
std::cout<<"now create thread is "<<i<<std::endl;
if(pthread_create(threads+i,NULL,worker,this)!=0){//初始化为一个线程池,每一个都刚入work工作函数 创造失败就删了
delete [] threads;
throw std::exception();
}
if(pthread_detach(threads[i])){
delete [] threads;
throw std::exception();//开始分离工作
}
}
}
template<typename T>
bool threadpool<T>::append(T* requset){
m_queuelock.locker();//将请求加入请求列对时要进行加锁 这个注意
if(m_work.size()>m_max_request){
m_queuelock.unlocker();
return false;
}
m_work.push_back(requset);
m_queuelock.unlocker();
m_queuestat.post();//原子变量++,资源数就是请求数量 进行++
return true;
}
template<typename T>
void* threadpool<T>::worker(void* arg){
threadpool* pool=(threadpool*)arg;
pool->run();
return pool;
}
template<typename T>
void threadpool<T>::run(){
while(!m_stop){
m_queuestat.wait();// 线程进来拿取资源 资源数-1
m_queuelock.locker();//每当一个线程拿出一个请求时 都要上锁
if(m_work.empty()){
m_queuelock.unlocker();
continue;
}
T* request=m_work.front();
m_work.pop_front();
m_queuelock.unlocker();//拿出后解锁
if(!request){
continue;
}
request->process();//开始工作
}
}
template<typename T>
threadpool<T>::~threadpool(){
delete [] threads;
m_stop=false;
}
2.利用C++11 std;:thread类和原子变量进行封装的线程池类 代码如下:
class ThreadPool {
public:
ThreadPool(const ThreadPool&) = delete;
ThreadPool&operator=(const ThreadPool&) = delete;
static ThreadPool& instance() {
static ThreadPool ins;//所有该类都共享这一个变量
return ins;
}
//线程池一定是保证其是唯一的 所以用单例模式
using Task = std::packaged_task<void()>;
//任务队列都是存储的该类型
~ThreadPool() {
stop();
}
template <class F, class... Args>
auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using RetType = decltype(f(args...));
if (stop_.load())
return std::future<RetType>{};
auto task = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<RetType> ret = task->get_future();
{
std::lock_guard<std::mutex> cv_mt(cv_mt_);
tasks_.emplace([task] { (*task)(); });
}
cv_lock_.notify_one();
return ret;
}
int idleThreadCount() {
return thread_num_;
}
private:
ThreadPool(unsigned int num = 5)
: stop_(false) {
{
if (num < 1)
thread_num_ = 1;
else
thread_num_ = num;
}
start();//初始化线程池的数量
//启动线程池
}
void start() {
for (int i = 0; i < thread_num_; ++i) {
pool_.emplace_back([this]() {
while (!this->stop_.load()) {
Task task;
{
std::unique_lock<std::mutex> cv_mt(cv_mt_);
this->cv_lock_.wait(cv_mt, [this] {
return this->stop_.load() || !this->tasks_.empty();
});
if (this->tasks_.empty())
return;
task = std::move(this->tasks_.front());
this->tasks_.pop();
}
this->thread_num_--;
task();
this->thread_num_++;
}
});
}
}
void stop() {
stop_.store(true);
cv_lock_.notify_all();
for (auto& td : pool_) {
if (td.joinable()) {
std::cout << "join thread " << td.get_id() << std::endl;
td.join();
}
}
}
private:
std::mutex cv_mt_;
std::condition_variable cv_lock_;
std::atomic_bool stop_;
std::atomic_int thread_num_;
std::queue<Task> tasks_;
std::vector<std::thread> pool_;
};