【手写数据库内核组件】0501多线程并发模型,任务分发多工作者执行架构实现,多线程读写状态时volatile存储类型使用技巧

0501 多线程管理

专栏内容

个人主页我的主页
管理社区开源数据库
座右铭:天行健,君子以自强不息;地势坤,君子以厚德载物.

一、概述


现代的CPU都会采用多个core的形式具有并行执行的能力,同一时间可以打开多个应用程序,即使是我们的手机,它的CPU也是非常强大的多核处理器。

如何让我们开发的应用程序充分利用多核CPU呢,这就不得不说多线程模型。

本文就来分享一下多线程模型的搭建与使用。

二、 原理与机制


在应用程序的架构中,一般采用分层原则,业务以任务的形式发布,而执行者接收任务只负责执行,并记录结果。

基于这样的总体架构设计,对于多线程的使用分为两种方式:

  1. 当任务产生时,再启动线程,任务执行完成后,线程也随之结束;
  2. 希望在应用程序启动时,有一定数量的执行者线程就开始处理待命状态,这个数量也决定了应用程序的并发,也就是处理任务的吞吐量;

第一种模式,适合一些任务量不大的业务逻辑,没有业务任务时,不需要占用系统资源;

而对于第二种模式,适合大量任务的场景,频繁的启动和销毁线程反而会带来大量的开销,最好是提前准备好线程,每个线程能执行不同的任务,线程是可重入的。

在这里插入图片描述

对于数据库程序而言,部分业务应用会以短连接的形式连接到数据库,可能执行一条或几条SQL就断开了,面对这样大量的短连接时,数据库内核需要保持一定数量的工作线程,来提升处理性能。

如何让多个线程保持等待状态,同时当有任务时还可以唤醒呢?
下面让一步步分解来看。

三、多条流水线的工厂

对于执行任务的线程,我们叫它Worker线程,它们在程序启动时就会创建,然后不停的执行任务,类似于流水线生产一样。

而在一个工厂会有多条这样的流水线,当工厂接到订单时,就会派发给其中一个生产线,并制定生产计划。

下面我们来看Worker线程的定义和工厂的定义,以及它们的初始化。

3.1 Worker信息结构定义

工作者线程需要记录一些信息,如运行状态,线程ID,还有对应的处理接口等,当然每个工作者会有一个唤醒器,也就是信号量。

typedef enum WORKER_STATE
{
    TW_IDLE,
    TW_RUNNING,
    TW_UNKNOWN
}WS_STATE;

typedef struct ThreadWorkerInfo 
{
    unsigned int tw_threadid;
    volatile WS_STATE tw_state;
    SemLock taskIdleLock;
    TaskProcess taskEntry;
}ThreadWorkerInfo;

说明

  • tw_threadid, 是创建工作者线程的ID;
  • tw_state,工作者线程的状态,运行之后是idle状态;当有任务执行时,为running状态;执行结束后,又回到了idle状态;
  • taskIdleLock,当工作者空闲时,会设置有效,有任务时唤醒工作者。这里可以使用信号量,初始化计数器为0;
  • taskEntry,任务处理接口;当有任务时,调用对应的任务处理接口进行处理;

注意,这里的tw_state的存储类型采用 volatile ,后面会看到这个值会被两个线程修改和访问,因为并没有竞争,所以没有进行加锁保护,为了数据的一致性,每次都会从内存进行读取。

3.2 工厂的结构定义

工厂记录了所有工作者的信息,当有任务产生时,来选择空闲的工作者进行派发。

#define WORK_THREAD_NUM 16
typedef struct ThreadFactoryInfo 
{
    ThreadWorkerInfo workerInfoList[WORK_THREAD_NUM];
}ThreadFactoryInfo;

说明

  • 工作者数量为静态定义,也可以动态数组的形式定义;
  • 当有任务产生时,遍历数组,找到空闲工作者进行派发;

3.3 工厂的建立

在程序启动时,我们将工厂进行建立,此时流水线工作者准备就绪,都处于空闲状态。

工厂遍历数组,初始化每一个工作者。

int CreeateWorkerThread(ThreadWorkerInfo *work)
{
    int ret = 0;
    pthread_t threadId;
   
    if(NULL == InitializeSem(0, &worker->taskIdleLock))
    {
        return -1;
    }


    ret = pthread_create(&threadId, NULL, threadEntry, (void *)worker);
    if (ret != 0) 
    {
        return -1;
    }

    worker->tw_threadid = (unsigned int)threadId;
    worker->tw_state = TW_IDLE;
    worker->taskEntry = NULL;

    return 0;
}

工作者的初始化说明

  • 信号量的初始化,初始计数器为0;
  • 启动线程,这里的线程的执行入口为threadEntry,在下一小节介绍;
  • 线程的入参为worker信息本身;
  • 初始化线程状态为idle, 此时线程的任务处理接口为NULL;

当然,在程序结束时,我们需要对创建的信号量和线程资源进行回收。

int DestoryWorkerThread(ThreadWorkerInfo *worker)
{
    int* ret = 0;

    if(NULL == worker)
        return 0;
    
    if(worker->tw_threadid > 0)
    {
        pthread_join((pthread_t)&worker->tw_threadid, (void **)&ret);
    }

    DestorySem(&workerInfo->taskIdleLock);

    return 0;
}

线程默认情况下需要通过pthread_join进行回收资源,当然也可以设置为分离状态,这里就不再对线程关注。

四、分发任务

任务的产生和分发,可以由主线程进行,当接收到网络消息或键盘指令后,生成任务,然后进行派发。

在这里插入图片描述

派发的流程

  • 准备任务;
  • 查找空闲工作者;
  • 找到后空闲工作者后,将任务派发给工作者;
  • 唤醒工作者;

代码实现

static ThreadFactoryInfo factory;

ThreadWorkerInfo * GetIdleWorker()
{
    int index = WORK_THREAD_NUM - 1;

    for(; index >= 0; index --)
    {
        if(factory->workerInfoList[index].tw_state == TW_IDLE)
            return &factory->workerInfoList[index];
    }

    return NULL;
}

int PushTask(TaskProcess taskProc)
{
    ThreadWorkerInfo *idleWorker = NULL;
    
    idleWorker = GetIdleWorker();
    if(NULL == idleWorker)
    {
        return -1;
    }

    idleWorker->taskEntry = taskProc;
    worker->tw_state = TW_RUNNING;
    PostSem(&idleWorker->taskIdleLock);

    return 0;
}

注意

在派发任务时,要注意操作的顺序;

先赋值任务处理接口和运行状态,再进行唤醒;

这样就不会竞争访问taskEntry,同时在信号量的唤醒操作中默认带有内存同步操作。

五、执行任务

工作者线程创建后,调用线程主函数threadEntry,在此处工作者处于就绪状态。

在这里插入图片描述

代码实现如下:

static void* threadEntry(void *arg)
{
    ThreadWorkerInfo *worker = (ThreadWorkerInfo*)arg;
    int ret = 0;

    if(NULL == worker)
        return NULL;
    
    while(worker->tw_threadid > 0)
    {
        ret = WaitSem(&worker->taskIdleLock);
        if(ret < 0)
        {
            break;
        }

        if(NULL != worker->taskEntry)
        {
            worker->taskEntry(&workerInfo->taskContext);
            worker->tw_state = TW_IDLE;
            worker->taskEntry = NULL;
        }        
    }

    return NULL;
}

说明

  • 在线程启动后,会等待信号量的通知;
  • 如果信号量被通知,此时检查任务是否被分发;
  • 有任务时,调用任务处理接口,执行任务;
  • 当任务执行完成后,继续等待信号量通知;

六、总结


本文分享了并发编程模型中,分发-并发执行的经典架构;

在这一架构中,工作者线程通过信号量的等待处理就绪状态;

分发者当有任务产生时,先派发任务,再唤醒工作者。

结尾


非常感谢大家的支持,在浏览的同时别忘了留下您宝贵的评论,如果觉得值得鼓励,请点赞,收藏,我会更加努力!

作者邮箱:study@senllang.onaliyun.com
如有错误或者疏漏欢迎指出,互相学习。

注:未经同意,不得转载!

相关推荐

  1. longjmp和线线实例

    2024-07-17 10:22:06       21 阅读
  2. C++线文件

    2024-07-17 10:22:06       24 阅读
  3. JVM线和锁

    2024-07-17 10:22:06       60 阅读
  4. 线架构

    2024-07-17 10:22:06       40 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-17 10:22:06       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-17 10:22:06       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-17 10:22:06       57 阅读
  4. Python语言-面向对象

    2024-07-17 10:22:06       68 阅读

热门阅读

  1. 初识Flutter问答&学习步骤

    2024-07-17 10:22:06       21 阅读
  2. golang mux组件兼容转移url

    2024-07-17 10:22:06       21 阅读
  3. 用户excel对CAN进行图形化展示

    2024-07-17 10:22:06       23 阅读
  4. SpringBoot如何使用Kafka来优化接口请求的并发

    2024-07-17 10:22:06       25 阅读
  5. 力扣---46.全排列

    2024-07-17 10:22:06       24 阅读
  6. PFA、PEEK和PP的材质、特点及用途

    2024-07-17 10:22:06       22 阅读
  7. 2024.07.16 oracle函数练习

    2024-07-17 10:22:06       29 阅读
  8. 第一章 Typescript小白快速入门

    2024-07-17 10:22:06       19 阅读
  9. webpack生产环境下的配置

    2024-07-17 10:22:06       26 阅读
  10. Matlab学习笔记01 - 基本数据类型

    2024-07-17 10:22:06       27 阅读
  11. spring-boot2.x整合Kafka步骤

    2024-07-17 10:22:06       19 阅读
  12. 武汉大学学报哲学社会科学版

    2024-07-17 10:22:06       22 阅读