线程池类的封装

1.LINUX类中,以pthread_mutex_t,sem_t,pthread_cond_t类的线程池,代码如下

#include <exception>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
#include <iostream>
#include <wait.h>
#include <list>
#include <memory>
#include <vector>

class sem{
    public:
    sem(){
        if(sem_init(&m_sem,0,0)!=0){
            throw std::exception();
        }//最开始设置为0
    }
    ~sem(){
        sem_destroy(&m_sem);
    }
    bool wait(){
        return sem_wait(&m_sem)==0;
    }
    bool post(){
        return sem_post(&m_sem)==0;
    }
private:
sem_t m_sem;
};

class lock{
    public:
    lock(){//创建并且初始化
        if(pthread_mutex_init(&m_mutex,NULL)!=0){
            throw std::exception();
        }
    }
    ~lock(){//销毁锁
        pthread_mutex_destroy(&m_mutex);
    }
    bool locker(){//上锁
        return pthread_mutex_lock(&m_mutex)==0;
    }
    bool unlocker(){
        return pthread_mutex_unlock(&m_mutex)==0;
    }
    private: 
    pthread_mutex_t m_mutex;
};

//封装条件变量
class cond{
 public:
 cond(){
     if(pthread_mutex_init(&m_mutex,NULL)!=0){
            throw std::exception();
        }
    if(pthread_cond_init(&m_cond,NULL)!=0){
         throw std::exception();
    }
    pthread_mutex_destroy(&m_mutex);
    pthread_cond_destroy(&m_cond);//初始化问题 马上销毁
 }
 ~ cond(){
     pthread_mutex_destroy(&m_mutex);
    pthread_cond_destroy(&m_cond);
 }

 bool wait(){
    int ret=0;
    pthread_mutex_lock(&m_mutex);
    ret=pthread_cond_wait(&m_cond,&m_mutex);
    pthread_mutex_unlock(&m_mutex);
    return ret==0;
 }
 bool signal(){
    return pthread_cond_signal(&m_cond)==0;
 }
 private:
 pthread_cond_t m_cond;
 pthread_mutex_t m_mutex;
};


//线程池
template<typename T>
threadpool<T>::threadpool(int thread_number,int max_re):m_thread_number(thread_number),m_max_request(max_re),m_stop(false),threads(nullptr)
{
if(thread_number<=0||max_re<=0){
    throw std::exception();
}
threads=new pthread_t (thread_number);
if(!threads) throw std::exception();

for(int i=0;i<thread_number;i++){
    std::cout<<"now create thread is  "<<i<<std::endl;
    if(pthread_create(threads+i,NULL,worker,this)!=0){//初始化为一个线程池,每一个都刚入work工作函数 创造失败就删了
        delete [] threads;
         throw std::exception();
    }
    if(pthread_detach(threads[i])){
        delete [] threads;
         throw std::exception();//开始分离工作
    }
}
}


template<typename T>
bool threadpool<T>::append(T* requset){
    m_queuelock.locker();//将请求加入请求列对时要进行加锁 这个注意
    if(m_work.size()>m_max_request){
        m_queuelock.unlocker();
        return false;
    }
    m_work.push_back(requset);
    m_queuelock.unlocker();
   m_queuestat.post();//原子变量++,资源数就是请求数量 进行++
   return true;
}


template<typename T>
void* threadpool<T>::worker(void* arg){
    threadpool* pool=(threadpool*)arg;
    pool->run();
    return pool;
}

template<typename T>
void threadpool<T>::run(){
    while(!m_stop){
        m_queuestat.wait();// 线程进来拿取资源 资源数-1
        m_queuelock.locker();//每当一个线程拿出一个请求时 都要上锁 
        if(m_work.empty()){
            m_queuelock.unlocker();
            continue;
        }
        T* request=m_work.front();
        m_work.pop_front();
        m_queuelock.unlocker();//拿出后解锁
        if(!request){
            continue;
        }
        request->process();//开始工作
    }
}


template<typename T>
threadpool<T>::~threadpool(){
    delete [] threads;
    m_stop=false;
}

2.利用C++11 std;:thread类和原子变量进行封装的线程池类 代码如下:

class ThreadPool  {
public:
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool&operator=(const ThreadPool&) = delete;

    static ThreadPool& instance() {
        static ThreadPool ins;//所有该类都共享这一个变量
        return ins;
    }
//线程池一定是保证其是唯一的 所以用单例模式
    using Task = std::packaged_task<void()>;
//任务队列都是存储的该类型


    ~ThreadPool() {
        stop();
    }

    template <class F, class... Args>
    auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        using RetType = decltype(f(args...));
        if (stop_.load())
            return std::future<RetType>{};

        auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        std::future<RetType> ret = task->get_future();
        {
            std::lock_guard<std::mutex> cv_mt(cv_mt_);
            tasks_.emplace([task] { (*task)(); });
        }
        cv_lock_.notify_one();
        return ret;
    }

    int idleThreadCount() {
        return thread_num_;
    }

private:
    ThreadPool(unsigned int num = 5)
        : stop_(false) {
            {
                if (num < 1)
                    thread_num_ = 1;
                else
                    thread_num_ = num;
            }
            start();//初始化线程池的数量
                   //启动线程池
    }
    void start() {
        for (int i = 0; i < thread_num_; ++i) {
            pool_.emplace_back([this]() {
                while (!this->stop_.load()) {
                    Task task;
                    {
                        std::unique_lock<std::mutex> cv_mt(cv_mt_);
                        this->cv_lock_.wait(cv_mt, [this] {
                            return this->stop_.load() || !this->tasks_.empty();
                        });
                        if (this->tasks_.empty())
                            return;

                        task = std::move(this->tasks_.front());
                        this->tasks_.pop();
                    }
                    this->thread_num_--;
                    task();
                    this->thread_num_++;
                }
            });
        }
    }
    void stop() {
        stop_.store(true);
        cv_lock_.notify_all();
        for (auto& td : pool_) {
            if (td.joinable()) {
                std::cout << "join thread " << td.get_id() << std::endl;
                td.join();
            }
        }
    }

private:
    std::mutex               cv_mt_;
    std::condition_variable  cv_lock_;
    std::atomic_bool         stop_;
    std::atomic_int          thread_num_;
    std::queue<Task>         tasks_;
    std::vector<std::thread> pool_;
};

相关推荐

  1. 线封装

    2024-07-15 13:14:03       26 阅读
  2. 线相关学习

    2024-07-15 13:14:03       36 阅读
  3. python sqlite3 线封装

    2024-07-15 13:14:03       46 阅读
  4. android 线管理工具

    2024-07-15 13:14:03       47 阅读
  5. Qt线封装FFmpeg播放器

    2024-07-15 13:14:03       42 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-15 13:14:03       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-15 13:14:03       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-15 13:14:03       57 阅读
  4. Python语言-面向对象

    2024-07-15 13:14:03       68 阅读

热门阅读

  1. pycharm打包exe

    2024-07-15 13:14:03       25 阅读
  2. ffmpeg三大命令行工具

    2024-07-15 13:14:03       20 阅读
  3. 【Python 项目】照片马赛克 - 2

    2024-07-15 13:14:03       26 阅读
  4. [k8s源码]2.CURD deployment

    2024-07-15 13:14:03       23 阅读
  5. 善的忽视、恶的纵容

    2024-07-15 13:14:03       24 阅读