引入
有了超市作为商品的"缓冲区"消费者 不用从 生产者那里 “费力” 获取了,同时 生产者 不用有消费者买了才继续生产,消费者可以一直获取商品(如果超市有的话),生产者可以一直生产商品(如果超市没满的话),这样就在一定程度上实现了“解耦”。
生产者和消费者都需要访问超市,超市就是“临界资源”。
在编码角度,有三种关系需要我们进行维护:
三种关系: 生产者和生产者 (互斥),消费者和消费者 (互斥),生产者和消费者(互斥与同步)
两种角色: 生产者,消费者
一个交易场所:缓冲区
基于BlockQueue的生产者消费者模型
任何时候,只能有一个消费者进行消费,或者只有一个生产者进行生产。
缓冲区中是否没有数据了,只有 消费者 最清楚;
缓冲区中是否数据满了,只有 生产者 最清楚。
可能会这么认为:
当生产了数据直接给消费者不就行了么?
当有数据了,直接消费不就行了么?
我们不应该这么狭隘的认为,因为,生产数据需要时间,消费数据同样需要时间,一定程度上,最耗时的是“消费数据”和“生产数据”。
代码实现
BlockQueue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"
const int gDefaultCap=5;
template<class T>
class BlockQueue
{
private:
bool isQueueEmpty()
{
return bq_.size() == 0;
}
bool isQueueFull()
{
return bq_.size() == capacity_;
}
public:
BlockQueue(int capacity=gDefaultCap)
:capacity_(capacity)
{
pthread_mutex_init(&mtx_,nullptr);
pthread_cond_init(&Empty_,nullptr);
pthread_cond_init(&Full_,nullptr);
}
void push(const T& in) //生产者
{
// pthread_mutex_lock(&mtx_);
//pthread_cond_wait是一个函数,可能会调用失败或者存在伪唤醒的情况
// while(isQueueFull()) pthread_cond_wait(&Full_,&mtx_);
// bq_.push(in);
// pthread_cond_signal(&Empty_);
// pthread_mutex_unlock(&mtx_);
lockGuard lockguard(&mtx_);//自动调用构造函数
while(isQueueFull())
pthread_cond_wait(&Full_,&mtx_);
bq_.push(in);
pthread_cond_signal(&Empty_);
}//自动调用lockguarg析构函数
void pop(T *out)
{
lockGuard lockguard(&mtx_);
// pthread_mutex_lock(&mtx_);
while(isQueueEmpty())
pthread_cond_wait(&Empty_,&mtx_);
*out=bq_.front();
bq_.pop();
pthread_cond_signal(&Full_);
// pthread_mutex_unlock(&mtx_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&Empty_);
pthread_cond_destroy(&Full_);
}
private:
std::queue<T> bq_; //阻塞队列
int capacity_; //容量上限
pthread_mutex_t mtx_; //通过互斥锁保证队列安全
pthread_cond_t Empty_; //用来表示bq是否空的条件
pthread_cond_t Full_; //用来标识bq是否满的条件
};
lockGuard.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
{}
void lock()
{
std::cout<<"要进行加锁"<<std::endl;
pthread_mutex_lock(pmtx_);
}
void unlock()
{
std::cout<<"要进行解锁"<<std::endl;
pthread_mutex_unlock(pmtx_);
}
~Mutex()
{}
private:
pthread_mutex_t *pmtx_;
};
//RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx)
:mtx_(mtx)
{
mtx_.lock();
}
~lockGuard()
{
mtx_.unlock();
}
private:
Mutex mtx_;
};
ConProd.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <unistd.h>
#include <ctime>
int myAdd(int x, int y)
{
return x + y;
}
void* consumer(void *args)
{
BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
while(true)
{
// 获取任务
Task t;
bqueue->pop(&t);
// 完成任务
std::cout << pthread_self() <<" consumer: "<< t.x_ << "+" << t.y_ << "=" << t() << std::endl;
// sleep(1);
}
return nullptr;
}
void* productor(void *args)
{
BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
while(true)
{
// 制作任务 -- 不一定是从生产者来的
int x = rand()%10 + 1;
usleep(rand()%1000);
int y = rand()%5 + 1;
// int x, y;
// std::cout << "Please Enter x: ";
// std::cin >> x;
// std::cout << "Please Enter y: ";
// std::cin >> y;
Task t(x, y, myAdd);
// 生产任务
bqueue->push(t);
// 输出消息
std::cout <<pthread_self() <<" productor: "<< t.x_ << "+" << t.y_ << "=?" << std::endl;
sleep(1);
}
return nullptr;
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);
BlockQueue<Task> *bqueue = new BlockQueue<Task>();
pthread_t c[2],p[2];
pthread_create(c, nullptr, consumer, bqueue);
pthread_create(c + 1, nullptr, consumer, bqueue);
pthread_create(p, nullptr, productor, bqueue);
pthread_create(p + 1, nullptr, productor, bqueue);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
delete bqueue;
return 0;
}
运行结果: