Linux:生产消费模型 & 读者写者模型


生产消费模型

生产消费模型是一种用于处理多线程之间任务资源分配的设计模式,其可以提高多线程处理任务的效率。

假设我们现在有多个线程,一部分线程负责获取任务,称为productor,另一部线程负责执行任务,称为consumer

有一个设计模式如图所示:

在这里插入图片描述

左侧是派发任务的线程productor,右侧是执行任务的线程consumer,他们之间是一对一的关系。上图中这个模式有以下问题:

在这里插入图片描述

现在生产者productor-3有任务要派发,但是consumer-3正在执行上一个任务,于是productor-3就只能等待。但是此时明明有两个线程consumer-1consumer-2是空闲的,它们却不能执行这个任务,这就出现了很明显的资源分配不合理问题。

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题:

在这里插入图片描述

生产者和消费者彼此之间不直接通讯,而通过任务队列来进行通讯:

  • 生产者生产完数据之后不用等待消费者处理,直接扔给任务队列
  • 消费者不找生产者要数据,而是直接从任务队列里取

任务队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个任务队列就是用来给生产者和消费者解耦的。

本博客讲解两种任务队列的实现方式:阻塞队列环形队列


阻塞队列

阻塞队列BlockQueue常用于实现生产消费模型,示意如下:

在这里插入图片描述

相比于普通队列,阻塞队列有以下特性:

  1. 当阻塞队列满了,此时再放数据则会被阻塞等待,直到队列中有空间为止
  2. 当阻塞队列为空,此时读取数据则会被阻塞等待,直到队列中有数据为止

接下来我们就来实现一下这个阻塞队列:

基本结构

既然需要一个阻塞队列的类,那么第一步就是需要一个队列,我们的队列要求有上限,超过某个值就要阻塞,所以还要有一个成员来标识队列的容量上限

template <typename T>
class blockQueue
{
private:
    std::queue<T> _blockQueue; // 阻塞队列
    int _cap;                   // 队列的上限
};

此处阻塞队列为STL自带的queue,队列的容量上限为_cap。此处的模板参数T,最后是一个可调用对象,也就是一个可以执行的任务

接下来要考虑的就是线程之间的互斥与同步,那么就要回答以下几个问题:

  1. 消费者消费者可以同时访问队列吗?不可以,所以消费者之间要互斥
  2. 生产者生产者可以同时访问队列吗?不可以,所以生产者之间要互斥
  3. 生产者消费者可以同时访问队列吗?不可以,所以生产者与消费者之间要互斥

综上,其实就是所有线程访问阻塞队列时都要互斥,那么只需要一把锁就可以了。

随后就是同步问题:

  1. 什么时候消费者可以拿任务?当任务队列有任务时,否则消费者去等待队列
  2. 什么时候生产者可以生产任务?当任务队列没满时,否则生产者去等待队列

生产者消费者可以共用一个等待队列吗?或者说,它们可以共用一个条件变量吗?

是不行的,如果它们共用一个条件变量,就会导致唤醒等待队列时,不知道唤醒的是生产者还是消费者。

综上:我们需要一把锁维护所有线程访问阻塞队列时的互斥两个条件变量分别维护生产者与消费者的同步

template <typename T>
class blockQueue
{
private:
    std::queue<T> _blockQueue; // 阻塞队列
    int _cap;                  // 队列的上限

    pthread_mutex_t _mutex;       // 访问阻塞队列时互斥
    pthread_cond_t _consum_cond;  // 消费者的条件变量
    pthread_cond_t _product_cond; // 生产者的条件变量
};

以上就是我们所需要的所有成员了,接下来写一下构造函数和析构函数:


构造与析构

在构造函数中,需要外部传入一个值,来指定这个阻塞队列的总上限,以及初始化互斥锁和条件变量。

代码:

template <typename T>
class blockQueue
{
public:
    blockQueue(int cap = 5)
        : _cap(cap)
        , _blockQueue(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_consum_cond, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
    }

    ~blockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consum_cond);
        pthread_cond_destroy(&_product_cond);
    }
};

投放任务

接下来就设计接口enQueue,通过该接口可以往阻塞队列中放任务。要考虑以下线程安全问题:

  1. 访问阻塞队列前先加锁,访问完毕后解锁
  2. 如果阻塞队列满了,就到条件变量_product_cond下面去等待
  3. 如果阻塞队列没慢,或者被从条件变量下唤醒,此时就可以投入任务了
  4. 投入任务后,唤醒_consum_cond下面等待的消费之来执行任务

代码:

template <typename T>
class blockQueue
{
public:
    void enQueue(const T& in)
    {
        pthread_mutex_lock(&_mutex);

        while (_blockQueue.size() == _cap)
        {
            pthread_cond_wait(&_product_cond, &_mutex);
        }

        _blockQueue.push(in);
        pthread_cond_signal(&_consum_cond);

        pthread_mutex_unlock(&_mutex);
    }
};

以上代码中, pthread_mutex_lock(&_mutex)先加锁,随后while语句判断当前阻塞队列是否满,如果满了就执行pthread_cond_wait(&_product_cond, &_mutex)去阻塞等待。

当不满足条件,或者线程被唤醒,此时就可以_blockQueue.push(in)来投放任务了,当投放完任务,此时通过pthread_cond_signal(&_consum_cond)唤醒一个消费者来执行任务。最后释放掉自己的锁。


获取任务

接下来就是接口popQueue消费者要去队列中拿任务,相似的要考虑以下线程安全问题:

  1. 访问阻塞队列前先加锁,访问完毕后解锁
  2. 如果阻塞队列为空,就到条件变量_consum_cond下面去等待
  3. 如果阻塞队列不为空,或者被从条件变量下唤醒,此时就可以获取任务了
  4. 获取任务后,唤醒条件变量_product_cond下等待的生产者,表示当前有任务拿走了,位置空出来了,可以来生产任务了

代码:

template <typename T>
class blockQueue
{
public:
    void popQueue(T& out)
    {
        pthread_mutex_lock(&_mutex);

        while (_blockQueue.empty())
        {
            pthread_cond_wait(&_consum_cond, &_mutex);
        }

        out = _blockQueue.front();
        _blockQueue.pop();

        pthread_cond_signal(&_product_cond);

        pthread_mutex_unlock(&_mutex);
    }
};

传入一个T&类型的引用,用于输出任务。

一开始通过pthread_mutex_lock加锁,保证访问阻塞队列的互斥。随后判断while判断阻塞队列是否为空,如果为空就通过pthread_cond_wait(&_consum_cond, &_mutex)进入等待队列阻塞等待。

当任务队列不为空,或者被从_consum_cond的等待队列中唤醒,此时就可以拿走队列中的任务了。拿走任务后,通过pthread_cond_signal(&_product_cond)唤醒生产者来投放任务,最后释放锁。


总代码

现在我把这个阻塞队列放到文件blockQueue.hpp中。

blockQueue.hpp

#pragma once

#include <iostream>
#include <queue>
#include <string>

#include <unistd.h>
#include <pthread.h>

template <typename T>
class blockQueue
{
public:
    blockQueue(int cap = 5)
        : _cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_consum_cond, nullptr);
        pthread_cond_init(&_product_cond, nullptr);
    }

    ~blockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consum_cond);
        pthread_cond_destroy(&_product_cond);
    }

    void enQueue(const T& in)
    {
        pthread_mutex_lock(&_mutex);

        while (_blockQueue.size() == _cap)
        {
            pthread_cond_wait(&_product_cond, &_mutex);
        }

        _blockQueue.push(in);
        pthread_cond_signal(&_consum_cond);

        pthread_mutex_unlock(&_mutex);
    }

    void popQueue(T& out)
    {
        pthread_mutex_lock(&_mutex);

        while (_blockQueue.empty())
        {
            pthread_cond_wait(&_consum_cond, &_mutex);
        }

        out = _blockQueue.front();
        _blockQueue.pop();

        pthread_cond_signal(&_product_cond);

        pthread_mutex_unlock(&_mutex);
    }

private:
    std::queue<T> _blockQueue; // 阻塞队列
    int _cap;                  // 队列的上限

    pthread_mutex_t _mutex;       // 访问阻塞队列时互斥
    pthread_cond_t _consum_cond;  // 消费者的条件变量
    pthread_cond_t _product_cond; // 生产者的条件变量
};

接下来实现基于环形队列阻塞队列,在那之前,我们要学一下POSIX 信号量

POSIX 信号量

基本概念

信号量(Semaphore)是一种用于控制多个线程或进程对共享资源访问的同步机制。它本质上是一个计数器,用来表示某种资源的可用数量

信号量的核心概念:

  • 计数器: 信号量维护一个计数器,它表示当前可用的资源数量。
  • 等待: 当一个线程想要访问资源时,它会检查信号量的计数器。如果计数器大于 0,则表示有可用资源,线程可以访问资源并使计数器减 1。如果计数器为 0,则表示没有可用资源,线程会进入等待状态,直到有其他线程释放资源。
  • 释放: 当一个线程完成对资源的访问后,它会将计数器加 1,并唤醒一个等待的线程(如果有)。

信号量的类型:

  • 二元信号量(Binary Semaphore): 计数器只能取 0 或 1 的值,表示资源是可用(1)还是不可用(0)。此时就是互斥锁,保证同一时间只有一个线程访问共享资源。
  • 计数信号量(Counting Semaphore): 计数器可以取任意非负整数,表示可用资源的数量。常用于控制对资源池的访问。

你可以简单地认为,信号量就是一个计数器,只不过是原子性的


接口

信号量的类型为sem_t,所有信号量的操作,都基于一个sem_t类型的变量。

sem_init

sem_init函数用于初始化一个信号量,需要头文件<semaphore.h>,函数原型如下:

int sem_init(sem_t *sem, int pshared, unsigned int value);

参数:

  • sem:指向sem_t信号量的指针
  • pshared:若该值为0,则信号量在线程之间共享;若该非0,信号量在进程之间共享。
  • value:信号量的初始值

sem_destory

sem_destory函数用于销毁一个信号量,需要头文件<semaphore.h>,函数原型如下:

int sem_destroy(sem_t *sem);

sem_wait
sem_wait函数用于申请一个信号量,需要头文件<semaphore.h>,函数原型如下:

int sem_wait(sem_t *sem);

sem_post

sem_post函数用于释放一个信号量,需要头文件<semaphore.h>,函数原型如下:

int sem_post(sem_t *sem);

环形队列

生产消费模型使用的环形队列,与数据结构中的环形队列不同,此处的环形队列是基于信号量完成的。

如图所示:

在这里插入图片描述

上图是一个环形队列,生产者消费者都绕着环的顺时针走,黑色代表这个位置有数据,白色代表这个位置没有数据。生产者在前面生产,消费者跟在后面消费,这就是基于环形队列的生产消费模型。

我们可以把环形队列中的资源用信号量表示。一个信号量_room_sem用于表示:该环形队列中有几个数据。另外一个信号量_data_sem表示:该环形队列有几个空位置

就上图来说:_room_sem = 4_data_sem = 4

  • 只有_room_sem的值大于0,表示还有空间可以放数据,此时生产者才可以生产
  • 只有_data_sem的值大于0,表示当前有数据可以读取,此时消费者才可以消费

接下来我们分析一下生产者消费者之间的同步与互斥关系:

  • 生产者生产者之间:互斥,两个生产者不能在同一个位置放入数据
  • 消费者消费者之间:互斥,两个消费者不能同时读取一个位置的数据
  • 生产者消费者之间:有互斥关系,但无需额外维护

此处最重要的就是,第三条:基于环形队列的模型,生产者消费者之间有互斥关系,但无需额外维护

为什么呢?我们之前讲解阻塞队列时,要保证生产者消费者不会同时访问阻塞队列,因为有可能会造成生产者消费者访问同一块数据的问题。

但是在环形队列中,什么情况下生产者消费者才会访问同一块数据呢?环形队列全空,或者全满的时候

也就是下图:

在这里插入图片描述

此时生产者消费者访问同一块数据,但是:

  • 对于全满:信号量_room_sem = 0,此时生产者不会访问队列;
  • 对于全空:信号量_data_sem = 0,此时消费者不会访问队列;

除去这两种情况,其余时候生产者消费者可以同时访问同一个队列的不同部分!所以相比于阻塞队列环形队列可以让生产和消费同时进行。

接下来就来实现一下这个基于信号量的环形队列


基本结构

我们的环形队列,不能用简单的队列,因为queue只提供push尾插,pop头删这样的接口。而我们需要对这个队列进行随机访问,此时最好用vector来模拟。

其次我们要有以下信息:

  • _cap:该队列的最大值
  • _consumer_step:当前消费者访问哪一个数据块,也就是数组下标
  • _productor_step:当前生产者访问哪一个数据块,也就是数组下标

其次就是两个信号量:

  • _room_sem = 0:当前有几个空位置
  • _data_sem = 0:当前有几个有数据的位置

还有两把锁:

  • _consumer_mutex消费者消费者之间的互斥
  • _productor_mutex生产者生产者之间的互斥

代码:

template <typename T>
class ringQueue
{
private:
    std::vector<T> _ringQueue; // 环形队列
    int _cap;                  // 队列的总容量

    int _consumer_step;  // 消费者当前位置
    int _productor_step; // 生产者当前位置

    sem_t _room_sem; // 空位置数目
    sem_t _data_sem; // 有数据的位置数目

    pthread_mutex_t _consumer_mutex;  // 消费者之间互斥
    pthread_mutex_t _productor_mutex; // 生产者之间互斥
};

构造与析构

构造函数中,要用户传入cap,指定该环形队列的最大值,随后初始化各个信号量。而析构函数中,则是销毁信号量

代码:

template <typename T>
class ringQueue
{
public:
    ringQueue(int cap = 5)
        : _cap(cap)
        , _ringQueue(cap)
        , _consumer_step(0)
        , _productor_step(0)
    {
        sem_init(&_room_sem, 0, cap);
        sem_init(&_data_sem, 0, 0);

        pthread_mutex_init(&_consumer_mutex, nullptr);
        pthread_mutex_init(&_consumer_mutex, nullptr);
    }

    ~ringQueue()
    {
        sem_destroy(&_room_sem);
        sem_destroy(&_data_sem);

        pthread_mutex_destroy(&_consumer_mutex);
        pthread_mutex_destroy(&_productor_mutex);
    }
};

此处构造函数中,sem_init(&_room_sem, 0, cap);,第二个参数0表示该信号量在线程键共享,cap为环形队列最大容量,因为一开始所有位置都为空,所以_room_sem = cap,而_data_sem = 0


投放任务

接下来设计enQueue接口,往环形队列中放任务,过程如下:

  1. 申请信号量
  2. 竞争锁
  3. 放数据

先展示代码,后解释:

template <typename T>
class ringQueue
{
public:
    void enQueue(const T& in)
    {
        sem_wait(&_room_sem);

        pthread_mutex_lock(&_productor_mutex);

        _ringQueue[_productor_step] = in;
        _productor_step = (_productor_step + 1) % _cap;

        pthread_mutex_unlock(&_productor_mutex);

        sem_post(&_data_sem);
    }
};

第一步sem_wait为申请一个信号量_room_sem,所谓申请一个信号量,就相当于_room_sem--,即表示可用资源减少一个。但是如果_room_sem = 0,那么此时申请这个信号量的线程就会阻塞,直到信号量不为0

随后pthread_mutex_lock加锁,与其他生产者保持互斥,再_ringQueue[_productor_step] = in,即投放任务到环形队列的头部。

_productor_step = (_productor_step + 1) _cap则是让队列头走到下一个位置,为了形成一个逻辑上的环,最后还要对_cap取模。

最后释放锁,再sem_post(&_data_sem),这相当于信号量_data_sem++,也就是当前队列中的数据多加了一个。


获取任务

popQueue接口用于消费者获取任务,有了上一个接口铺垫,这个接口就很简单了:

template <typename T>
class ringQueue
{
public:
    void popQueue(T &out)
    {
        sem_wait(&_data_sem);

        pthread_mutex_lock(&_consumer_mutex);

        out = _ringQueue[_consumer_step];
        _productor_step = (_consumer_step + 1) % _cap;

        pthread_mutex_unlock(&_consumer_mutex);

        sem_post(&_room_sem);
    }
};

首先申请一个信号量sem_wait(&_data_sem),相当于_data_sem--,表示这个当前队列中的数据少了一个,随后加锁保持消费者之间的互斥。

当进入临界区后,out = _ringQueue[_consumer_step]将数据读取到输出型参数out中,再_productor_step = (_consumer_step + 1) % _cap,让环形队列的尾巴走到下一个位置。

最后释放锁,sem_post(&_room_sem)相当于_room_sem++,表示这个位置的数据已经拿走,空位置多了一个。


总代码

现在把环形队列封装在文件ringQueue.hpp中:

#pragma once

#include <iostream>
#include <string>
#include <vector>

#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>

template <typename T>
class ringQueue
{
public:
    ringQueue(int cap = 5)
        : _cap(cap)
        , _ringQueue(cap)
        , _consumer_step(0)
        , _productor_step(0)
    {
        sem_init(&_room_sem, 0, cap);
        sem_init(&_data_sem, 0, 0);

        pthread_mutex_init(&_consumer_mutex, nullptr);
        pthread_mutex_init(&_consumer_mutex, nullptr);
    }

    ~ringQueue()
    {
        sem_destroy(&_room_sem);
        sem_destroy(&_data_sem);

        pthread_mutex_destroy(&_consumer_mutex);
        pthread_mutex_destroy(&_productor_mutex);
    }

    void enQueue(const T& in)
    {
        sem_wait(&_room_sem);

        pthread_mutex_lock(&_productor_mutex);

        _ringQueue[_productor_step] = in;
        _productor_step = (_productor_step + 1) % _cap;

        pthread_mutex_unlock(&_productor_mutex);

        sem_post(&_data_sem);
    }

    void popQueue(T &out)
    {
        sem_wait(&_data_sem);

        pthread_mutex_lock(&_consumer_mutex);

        out = _ringQueue[_consumer_step];
        _productor_step = (_consumer_step + 1) % _cap;

        pthread_mutex_unlock(&_consumer_mutex);

        sem_post(&_room_sem);
    }

private:
    std::vector<T> _ringQueue; // 环形队列
    int _cap;                  // 队列的总容量

    int _consumer_step;  // 消费者当前位置
    int _productor_step; // 生产者当前位置

    sem_t _room_sem; // 空位置数目
    sem_t _data_sem; // 有数据的位置数目

    pthread_mutex_t _consumer_mutex;  // 消费者之间互斥
    pthread_mutex_t _productor_mutex; // 生产者之间互斥
};

读者写者模型

读者写者模型也是是一种用于处理多线程之间资源分配的设计模式,其可以提高多线程处理任务的效率。

但是其与生产者消费者略有不同。生产消费模型中就像一个商店,生产者向商店投放商品,消费者消费商品后,会把商品带走

而读者写者模型中就像一个黑板报,写者在黑板报上写内容,读者读完后,不会把黑板上的内容擦掉,其他人还可以看到同样的内容

简单分析一下同步与互斥关系:

  • 写者写者:互斥关系,不能同时往一个区域写入
  • 读者写者:互斥与同步,写者在写的时候,读者不能看,不然会拿到不完整的数据
  • 读者读者既不互斥,也不同步

黑板报是可以多个人一起看的,因为看黑板报的人不会修改这个黑板报。同理,因为读者不会修改共享资源,此时可以被多个读者一起访问

读者写者模型有两种模式:读者优先模式写者优先模式

  • 读者优先模式:只要读者在读,写者就必须等待,直到没有任何读者读取,写者才可以写入
  • 写者优先模式:如果写者想要写入,后续来的读者都不能读取,只能等待。当所有之前的读者都走光了,写者就可以写入了

Linux已经提供了读者写者模型的相关接口 - 读写锁,我们就不额外实现了。


读写锁

rwlock 读写锁包含在pthread库中,使用方式与mutex 互斥锁几乎一模一样,我简单介绍一下:

读写锁的类型是pthread_rwlock_t。全局的读写锁直接使用PTHREAD_RWLOCK_INITIALIZER初始化,无需销毁。局部的读写锁则用init初始化,destroy销毁。

接口如下:

创建

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

销毁

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

加锁
略微不同的就是加锁,由于读者写者模型中,线程有两种身份,此时加锁也有两种方式:

  • 读者加锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
  • 写者加锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

解锁

unlock可以同时释放rdlockwrlock

int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

相关推荐

  1. [python3] 读者-模式

    2024-06-18 09:36:03       23 阅读
  2. 信号量&线程池&读者模型

    2024-06-18 09:36:03       10 阅读
  3. Linux生产者消费者模型

    2024-06-18 09:36:03       13 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-18 09:36:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-18 09:36:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-18 09:36:03       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-18 09:36:03       20 阅读

热门阅读

  1. linux vim最全面教程

    2024-06-18 09:36:03       5 阅读
  2. 新手入门GitHub:一步步教你上手

    2024-06-18 09:36:03       8 阅读
  3. The difference between Radar and ADS-B

    2024-06-18 09:36:03       9 阅读
  4. 三生随记——AI的觉醒

    2024-06-18 09:36:03       8 阅读
  5. c++期末复习题

    2024-06-18 09:36:03       7 阅读
  6. 一个换衣服的项目流程总结

    2024-06-18 09:36:03       8 阅读
  7. Brief introduction of SIFT algorithm

    2024-06-18 09:36:03       6 阅读