Linux:通过线程互斥同步实现基于BlockingQueue的生产消费者模型

一、总体调度:主函数Main.cc

#include "BlockQueue.hpp"
#include "Thread.hpp"
#include <string>
#include <vector>
#include <functional>
#include <unistd.h>
#include <ctime>

using namespace ThreadModule;
int a=10;
using Task = std::function<void()>;
using blockqueue_t=BlockQueue<Task>;

void PrintHello()
{
    std::cout<<"hello world"<<std::endl;
}
void Consumer(blockqueue_t &bq)
{
    while(true)
    {
        //1.从blockqueue取下来任务
        Task t;
        bq.Pop(&t);//pop完成后t中会保存任务
        //2.处理这个任务
        t();//消费者私有
    }
}
void Productor(blockqueue_t &bq)
{
    srand(time(nullptr)^pthread_self());
    int cnt=10;
    while(true)
    {
        sleep(1);
        Task t=PrintHello;
        bq.Enqueue(t);
    }
}
//productor和consumer共用此函数
void StartComm(std::vector<Thread<blockqueue_t>> *threads,int num,blockqueue_t &bq,func_t<blockqueue_t> func)
{
    for(int i=0;i<num;i++)
    {
        std::string name="thread-"+std::to_string(i+1);
        threads->emplace_back(func,bq,name);
        threads->back().Start();
    }
}

//启动consumer
void StartConsumer(std::vector<Thread<blockqueue_t>> *thread,int num,blockqueue_t &bq)
{
    StartComm(thread,num,bq,Consumer);
}

//启动productor
void StartProductor(std::vector<Thread<blockqueue_t>> *thread,int num,blockqueue_t& bq)
{
    StartComm(thread,num,bq,Productor);
}
void WaitAllThread(std::vector<Thread<blockqueue_t>> &threads)
{
    for (auto &thread : threads)
    {
        thread.Join();
    }
}
int main()
{
    blockqueue_t *bq=new blockqueue_t(5);
    std::vector<Thread<blockqueue_t>> threads;

    StartConsumer(&threads,3,*bq);
    StartProductor(&threads,1,*bq);
    WaitAllThread(threads);

    return 0;
}

二、线程功能的封装:Thread.hpp

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    template<typename T>
    using func_t=std::function<void(T&)>;

    template<typename T>
    class Thread
    {
    public:
    void Excute()
    {
        _func(_data);
    }
    public:
    Thread(func_t<T> func,T&data,const std::string& name="none-name")
    :_func(func),_data(data),_threadname(name),_stop(true)
    {}
    static void* threadroutine(void* args)//static成员函数没有this
    {
        Thread<T> *self = static_cast<Thread<T> *>(args);
        self->Excute();
        return nullptr;
    }
    bool Start()
    {
        int n=pthread_create(&_tid,nullptr,threadroutine,this);//把this传给threadroutine让其完成调用
        if(!n)
        {
            _stop=false;
            return true;
        }
        else
        {
            return false;
        }
    }
    void Detach()
    {
        if(!_stop)
        {
            pthread_detach(_tid);
        }
    }
    void Join()
    {
        if(!_stop)
        {
            pthread_join(_tid,nullptr);
        }
    }
    std::string name()
    {
        return _threadname;
    }
    void Stop()
    {
        _stop = true;
    }
    ~Thread()
    {}
    private:
    pthread_t _tid;
    std::string _threadname;
    T& _data;//要传入所执行函数的参数
    func_t<T> _func;//线程要执行的函数
    bool _stop;
    };
}
#endif

三、临界区阻塞队列:BlockQueue

#ifndef __BLOCK_QUEUE_HPP__
#define __BLOCK_QUEUE_HPP__

#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>

template <typename T>
class BlockQueue
{
private:
    bool IsFull()
    {
        return _block_queue.size()==_cap;
    }
    bool IsEmpty()
    {
        return _block_queue.empty();
    }
public:
BlockQueue(int cap):_cap(cap)
{
    _productor_wait_num=0;
    _consumer_wait_num=0;
    pthread_mutex_init(&_mutex,nullptr);
    pthread_cond_init(&_product_cond,nullptr);
    pthread_cond_init(&_consum_cond,nullptr);
}
void Enqueue(T& in)
{
    pthread_mutex_lock(&_mutex);
    while(IsFull())//保证代码的健壮性
    {
         // 生产线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
        // 1. pthread_cond_wait调用是: a. 让调用线程等待 b. 自动释放曾经持有的_mutex锁 c. 当条件满足,线程唤醒,pthread_cond_wait要求线性
        // 必须重新竞争_mutex锁,竞争成功,方可返回!!!
        // 之前:安全
        _productor_wait_num++;
        pthread_cond_wait(&_product_cond,&_mutex);//  只要等待,必定会有唤醒,唤醒的时候,就要继续从这个位置向下运行!!
        _productor_wait_num--;
    }
    //进行生产
    _block_queue.push(in);
    if(_consumer_wait_num > 0)
            pthread_cond_signal(&_consum_cond); // pthread_cond_broadcast
        pthread_mutex_unlock(&_mutex);
}
void Pop(T *out)
{
    pthread_mutex_lock(&_mutex);
    while(IsEmpty())
    {
    // 消费线程去等待,是在临界区中休眠的!你现在还持有锁呢!!!
    // 1. pthread_cond_wait调用是: a. 让调用进程等待 b. 自动释放曾经持有的_mutex锁
        _consumer_wait_num++;
        pthread_cond_wait(&_consum_cond, &_mutex);  // 伪唤醒
        _consumer_wait_num--;
    }
    //进行消费
    *out=_block_queue.front();//让main.cc的消费者拿到任务
    _block_queue.pop();
    //通知生产者生产
    if(_productor_wait_num>0)
        pthread_cond_signal(&_product_cond);
    pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
    pthread_mutex_destroy(&_mutex);
    pthread_cond_destroy(&_product_cond);
    pthread_cond_destroy(&_consum_cond);

}
private:
    std::queue<T> _block_queue;//阻塞队列,是被整体使用的
    int _cap;//总上限
    pthread_mutex_t _mutex;//保护_block_queue的锁
    pthread_cond_t _product_cond;//专门给生产者提供的条件变量
    pthread_cond_t _consum_cond;//专门给生产者提供的条件变量

    int _productor_wait_num;
    int _consumer_wait_num;

};
#endif

四、Makefile

cp:Main.cc
	g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
	rm -f cp

最近更新

  1. TCP协议是安全的吗?

    2024-06-10 20:08:06       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-10 20:08:06       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-10 20:08:06       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-10 20:08:06       18 阅读

热门阅读

  1. leetcode290:单词规律

    2024-06-10 20:08:06       13 阅读
  2. 回溯算法复原ip,子集1和子集2

    2024-06-10 20:08:06       9 阅读
  3. 43.django里写自定义的sql进行查询

    2024-06-10 20:08:06       7 阅读
  4. 独孤思维:副业圈很多骗子

    2024-06-10 20:08:06       9 阅读
  5. Hive 面试题(九)

    2024-06-10 20:08:06       12 阅读
  6. 力扣395.至少有K个重复字符的最长子串

    2024-06-10 20:08:06       7 阅读