生产者消费者模型之阻塞队列

引入

有了超市作为商品的"缓冲区"消费者  不用从 生产者那里 “费力” 获取了,同时  生产者  不用有消费者买了才继续生产,消费者可以一直获取商品(如果超市有的话),生产者可以一直生产商品(如果超市没满的话),这样就在一定程度上实现了“解耦”。

生产者和消费者都需要访问超市,超市就是“临界资源”。

在编码角度,有三种关系需要我们进行维护:

三种关系:  生产者和生产者 (互斥),消费者和消费者 (互斥),生产者和消费者(互斥与同步)

两种角色: 生产者,消费者

一个交易场所:缓冲区

基于BlockQueue的生产者消费者模型

 

任何时候,只能有一个消费者进行消费,或者只有一个生产者进行生产。

缓冲区中是否没有数据了,只有  消费者  最清楚;

缓冲区中是否数据满了,只有   生产者  最清楚。

可能会这么认为:

当生产了数据直接给消费者不就行了么?

当有数据了,直接消费不就行了么? 

我们不应该这么狭隘的认为,因为,生产数据需要时间,消费数据同样需要时间,一定程度上,最耗时的是“消费数据”和“生产数据”。 

代码实现

BlockQueue.hpp

#pragma once

#include <iostream>
#include <queue>
#include <mutex>
#include <pthread.h>
#include "lockGuard.hpp"

const int gDefaultCap=5;

template<class T>
class BlockQueue
{
private:
    bool isQueueEmpty()
    {
        return bq_.size() == 0;
    }
    bool isQueueFull()
    {
        return bq_.size() == capacity_;
    }
public:
    BlockQueue(int capacity=gDefaultCap)
        :capacity_(capacity)
    {
        pthread_mutex_init(&mtx_,nullptr);
        pthread_cond_init(&Empty_,nullptr);
        pthread_cond_init(&Full_,nullptr);
    }
    void push(const T& in)  //生产者
    {
        // pthread_mutex_lock(&mtx_);
        //pthread_cond_wait是一个函数,可能会调用失败或者存在伪唤醒的情况
        // while(isQueueFull()) pthread_cond_wait(&Full_,&mtx_);
        // bq_.push(in);
        // pthread_cond_signal(&Empty_);
        // pthread_mutex_unlock(&mtx_);

        lockGuard lockguard(&mtx_);//自动调用构造函数

        while(isQueueFull())
            pthread_cond_wait(&Full_,&mtx_);
        bq_.push(in);
        pthread_cond_signal(&Empty_);
    }//自动调用lockguarg析构函数
    void pop(T *out)
    {
        lockGuard lockguard(&mtx_);
        // pthread_mutex_lock(&mtx_);
        while(isQueueEmpty())
            pthread_cond_wait(&Empty_,&mtx_);
        *out=bq_.front();
        bq_.pop();

        pthread_cond_signal(&Full_);

        // pthread_mutex_unlock(&mtx_);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&mtx_);
        pthread_cond_destroy(&Empty_);
        pthread_cond_destroy(&Full_);
    }
private:
    std::queue<T> bq_;      //阻塞队列
    int capacity_;          //容量上限
    pthread_mutex_t mtx_;   //通过互斥锁保证队列安全
    pthread_cond_t Empty_;  //用来表示bq是否空的条件
    pthread_cond_t Full_;   //用来标识bq是否满的条件
};

 lockGuard.hpp

#pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t *mtx):pmtx_(mtx)
    {}
    void lock()
    {
        std::cout<<"要进行加锁"<<std::endl;
        pthread_mutex_lock(pmtx_);
    }
    void unlock()
    {
        std::cout<<"要进行解锁"<<std::endl;
        pthread_mutex_unlock(pmtx_);
    }
    ~Mutex()
    {}
private:
    pthread_mutex_t *pmtx_;
};

//RAII风格的加锁方式
class lockGuard
{
public:
    lockGuard(pthread_mutex_t *mtx)
        :mtx_(mtx)
    {
        mtx_.lock();
    }
    ~lockGuard()
    {
        mtx_.unlock();
    }
private:
    Mutex mtx_;
};

ConProd.cc

#include "BlockQueue.hpp"
#include "Task.hpp"

#include <pthread.h>
#include <unistd.h>
#include <ctime>

int myAdd(int x, int y)
{
    return x + y;
}

void* consumer(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
    while(true)
    {
        // 获取任务
        Task t;
        bqueue->pop(&t);
        // 完成任务
        std::cout << pthread_self() <<" consumer: "<< t.x_ << "+" << t.y_ << "=" << t() << std::endl;
        // sleep(1);
    }

    return nullptr;
}

void* productor(void *args)
{
    BlockQueue<Task> *bqueue = (BlockQueue<Task> *)args;
    while(true)
    {
        // 制作任务 -- 不一定是从生产者来的
        int x = rand()%10 + 1;
        usleep(rand()%1000);
        int y = rand()%5 + 1;
        // int x, y;
        // std::cout << "Please Enter x: ";
        // std::cin >> x;
        // std::cout << "Please Enter y: ";
        // std::cin >> y;
        Task t(x, y, myAdd);
        // 生产任务
        bqueue->push(t);
        // 输出消息
        std::cout <<pthread_self() <<" productor: "<< t.x_ << "+" << t.y_ << "=?" << std::endl;
        sleep(1);
    }
    return nullptr;
}

int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ 0x32457);
    BlockQueue<Task> *bqueue = new BlockQueue<Task>();

    pthread_t c[2],p[2];
    pthread_create(c, nullptr, consumer, bqueue);
    pthread_create(c + 1, nullptr, consumer, bqueue);
    pthread_create(p, nullptr, productor, bqueue);
    pthread_create(p + 1, nullptr, productor, bqueue);

    pthread_join(c[0], nullptr);
    pthread_join(c[1], nullptr);
    pthread_join(p[0], nullptr);
    pthread_join(p[1], nullptr);

    delete bqueue;

    return 0;
}

运行结果:

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-21 02:54:01       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-21 02:54:01       106 阅读
  3. 在Django里面运行非项目文件

    2024-04-21 02:54:01       87 阅读
  4. Python语言-面向对象

    2024-04-21 02:54:01       96 阅读

热门阅读

  1. NLP和LLMs: 理解它们之间的区别

    2024-04-21 02:54:01       35 阅读
  2. Oracle中的时间戳转换与使用

    2024-04-21 02:54:01       36 阅读
  3. Android开发学习(1)--楔子

    2024-04-21 02:54:01       37 阅读
  4. [论文笔记] megatron训练参数:dataloader_type

    2024-04-21 02:54:01       31 阅读
  5. Blender3.6下载地址

    2024-04-21 02:54:01       37 阅读
  6. 前端遇到的问题

    2024-04-21 02:54:01       35 阅读
  7. Oracle存储过程的使用与实例

    2024-04-21 02:54:01       34 阅读
  8. C# 语言类型(三)—数组/枚举类型/结构体

    2024-04-21 02:54:01       36 阅读
  9. 用爬虫玩转石墨文档

    2024-04-21 02:54:01       36 阅读
  10. C语言经典例题-20

    2024-04-21 02:54:01       41 阅读
  11. 【第一章 先导篇】1. 规范化的学习模型

    2024-04-21 02:54:01       32 阅读