C-线程池

1、介绍

一个线程池由三部分组成:
        (1)管理者线程,用于实时判断线程池中的任务个数、线程个数;并根据两者之间的变化来创建或销毁线程,与任务队列达到平衡。对应于示例中的thread_manage()函数。
        (2)工作线程,线程池中线程,个数不定,在没有任务时处于等待状态,可以循环的执行任务。对应于示例中的thread_work()函数。
        (3)任务队列,用于存放没有处理的任务,链表、数组等结构存储。

2、示例

代码参考:

        手写线程池 - C语言版 | 爱编程的大丙

        C语言实现简单的线程池项目(附完整工程文件)_线程池项目文档-CSDN博客

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

typedef struct TaskiLists
{
    void (*function)(void* arg);
    void* arg;
}TaskLists;


struct ThreadPool
{
    // 任务队列
    TaskLists* task_list;
    // 任务队列的容量
    int task_list_cap;
    // 当前任务队列中任务的数量
    int task_list_size;
    // 任务队列队头数据
    int task_list_front;
    // 任务队列队尾数据
    int task_list_rear;


    // 线程池中线程队列
    pthread_t* threads_id;
    // 线程池中线程的最大数量
    int threads_max;
    // 线程池中线程的最小数量
    int threads_min;
    // 工作的线程个数
    int threads_busy_num;
    // 存活的线程个数
    int threads_live_num;
    // 要销毁的线程个数
    int threads_exit_num;

    // 管理者线程
    pthread_t thread_manage;

    // 锁  锁住整个线程池
    pthread_mutex_t lock_pool;
    // 锁  锁住threads_busy_num变量
    pthread_mutex_t lock_threads_busy_num;
    // 条件变量  task_list是不是达到最大容量
    pthread_cond_t task_list_full;
    // 条件变量  task_list是不是为空
    pthread_cond_t task_list_empty;

    // 是否销毁线程,销毁=1;不销毁=0
    int shut_down;
};

typedef struct ThreadPool ThreadPool;

//############################################################################################################
//############################################################################################################

// 给线程池添加任务
void thread_pool_add(ThreadPool* pool, void(*func)(void*), void* arg)
{
    pthread_mutex_lock(&(pool->lock_pool));
    // 如果任务列表已满,添加进程阻塞
    while (pool->task_list_size == pool->task_list_cap && pool->shut_down == 0)
    {
        pthread_cond_wait(&(pool->task_list_full), &(pool->lock_pool));
    }
    
    // 如果线程池要关闭了,直接返回
    if (pool->shut_down == 1)
    {
        pthread_mutex_unlock(&(pool->lock_pool));
        return;
    }

    // 添加任务
    pool->task_list[pool->task_list_rear].function = func;
    pool->task_list[pool->task_list_rear].arg = arg;
    pool->task_list_size += 1;
    pool->task_list_rear = (pool->task_list_rear + 1) % pool->task_list_cap;
    printf("--------- add %d %ld\n", pool->task_list_size, pthread_self());

    // 通过条件变量通知任务开始处理
    pthread_cond_signal(&(pool->task_list_empty));

    pthread_mutex_unlock(&(pool->lock_pool));
    return;
}


// 销毁一个线程
void thread_exit(ThreadPool* pool)
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < pool->threads_max; i++)
    {
        if (pool->threads_id[i] == tid)
        {
            pool->threads_id[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

void thread_work(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;

    // 工作线程要一直在执行,时时执行
    while (1)
    {
        sleep(1.3);
        pthread_mutex_lock(&(pool->lock_pool));
        // 如果任务列表为空,添加进程阻塞
        while (pool->task_list_size == 0 && pool->shut_down == 0)
        {
            pthread_cond_wait(&(pool->task_list_empty), &(pool->lock_pool));
            if (pool->threads_exit_num > 0)
            {
                pool->threads_exit_num--;
                if (pool->threads_live_num > pool->threads_min)
                {
                    pool->threads_live_num--;
                    pthread_mutex_unlock(&(pool->lock_pool));
                    // 销毁该线程
                    thread_exit(pool);
                }
            }
        }
        
        // 如果线程池要关闭了,直接返回
        if (pool->shut_down == 1)
        {
            pthread_mutex_unlock(&(pool->lock_pool));
            thread_exit(pool);
        }

        // 取任务参数
        TaskLists tmp;
        tmp.function = pool->task_list[pool->task_list_front].function;
        tmp.arg = pool->task_list[pool->task_list_front].arg;
        pool->task_list_size--;
        pool->task_list_front = (pool->task_list_front + 1) % pool->task_list_cap;  // 向后偏移
        
        // 条件变量,通知
        pthread_cond_signal(&pool->task_list_full);
        pthread_mutex_unlock(&(pool->lock_pool));

        // 处理任务
        printf("thread %ld start working...\n", pthread_self());
        pthread_mutex_lock(&(pool->lock_threads_busy_num));
        pool->threads_busy_num += 1;
        pthread_mutex_unlock(&(pool->lock_threads_busy_num));
        tmp.function(tmp.arg);
        free(tmp.arg);
        tmp.arg = NULL;

        printf("thread %ld end working...\n", pthread_self());
        pthread_mutex_lock(&(pool->lock_threads_busy_num));
        pool->threads_busy_num--;
        pthread_mutex_unlock(&(pool->lock_threads_busy_num));

    }
    return;
}

// 管理者线程  管理代码处理
void thread_manage(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;

    while (pool->shut_down == 0)
    {
        printf("===========================\n");
        // 每3s检测一次pool的状态
        sleep(3);

        // 取出线程池中任务的数量和当前线程的数量
        pthread_mutex_lock(&(pool->lock_pool));
        int task_list_size = pool->task_list_size;
        int threads_live_num = pool->threads_live_num;
        pthread_mutex_unlock(&pool->lock_pool);

        // 取出工作的线程的数量
        pthread_mutex_lock(&pool->lock_threads_busy_num);
        int threads_busy_num = pool->threads_busy_num;
        pthread_mutex_unlock(&pool->lock_threads_busy_num);

        // 添加线程
        // 任务的个数 > 存活的线程个数 && 存活的线程数 < 最大线程数
        if (task_list_size > threads_live_num && threads_live_num < pool->threads_max)
        {
            pthread_mutex_lock(&(pool->lock_pool));
            int cnt = 0;
            for (int i = 0; i < pool->threads_max && cnt < task_list_size && pool->threads_live_num < pool->threads_max; i++)
            {
                if (pool->threads_id[i] == 0)
                {
                    pthread_create(&(pool->threads_id[i]), NULL, thread_work, pool);
                    cnt += 1;
                    pool->threads_live_num += 1;
                }
            }
            pthread_mutex_unlock(&pool->lock_pool);
        }

        // 销毁线程
        // 忙的线程*2 < 存活的线程数 && 存活的线程 > 最小线程数
        if (threads_busy_num * 2 < threads_live_num && threads_live_num > pool->threads_min)
        {
            pthread_mutex_lock(&(pool->lock_pool));
            pool->threads_exit_num = threads_busy_num > pool->threads_min ? threads_busy_num : pool->threads_min;
            pthread_mutex_unlock(&(pool->lock_pool));
            // 让工作的线程自杀
            for (int i = 0; i < pool->threads_exit_num; i++)
            {
                pthread_cond_signal(&(pool->task_list_empty));
            }
        }
    }
    return;
}

// 创建线程池
ThreadPool* thread_pool_create(int max, int min, int task_list_cap)
{
    // 申请线程池结构体内容存放的地址
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    if (pool == NULL)
    {
        printf("the pool malloc failed...\n");
        return NULL;
    }

    // 申请线程数组
    pool->threads_id = (pthread_t*)malloc(sizeof(pthread_t) * max);
    if (pool->threads_id == NULL) 
    {
        printf("malloc threadIDs fail...\n");
        return NULL;
    }
    memset(pool->threads_id, 0, sizeof(pthread_t) * max);
    pool->threads_max = max;
    pool->threads_min = min;
    pool->threads_busy_num = 0;
    pool->threads_live_num = min;   // 线程池中最少有min线程在启动,但是有可能不再工作
    pool->threads_exit_num = 0;     // 需要销毁的线程个数;任务少的时候,大多数线程空闲时,需要销毁
    // 创建线程
    for (int i = 0; i < min; i++)
    {
        pthread_create(&(pool->threads_id[i]), NULL, thread_work, pool);
    }

    // 管理者线程
    pool->thread_manage  = (pthread_t*)malloc(sizeof(pthread_t));
    if (pool->thread_manage == NULL) 
    {
        printf("malloc manage thread fail...\n");
        return NULL;
    }
    pthread_create(&(pool->thread_manage), NULL, thread_work, pool);

    // 任务队列初始化
    pool->task_list = (TaskLists*)malloc(sizeof(TaskLists) * task_list_cap);
    if (pool->task_list == NULL)
    {
        printf("the task list init failed...\n");
        return NULL;
    }
    pool->task_list_cap = task_list_cap;
    pool->task_list_front = 0;
    pool->task_list_rear = 0;


    // 初始化锁和条件变量
    if (pthread_mutex_init(&(pool->lock_pool), NULL) != 0 || pthread_mutex_init(&(pool->lock_threads_busy_num), NULL) != 0 ||
        pthread_cond_init(&(pool->task_list_full), NULL) != 0 || pthread_cond_init(&(pool->task_list_empty), NULL) != 0)
    {
        printf("mutex or condition init fail...\n");
        return;
    }
    
    pool->shut_down = 0;
    printf("----------------------- init success.\n");
    return pool;
}

// 销毁线程池
void thread_pool_destory(ThreadPool* pool)
{
    if (pool == NULL)
    {
        return;
    }

    // 关闭线程池
    pool->shut_down = 1;

    // 阻塞回收管理者线程
    pthread_join(&(pool->thread_manage), NULL);

    // 唤醒阻塞的消费者线程
    for (int i = 0; i < pool->threads_live_num; i++)
    {
        pthread_cond_signal(&(pool->task_list_empty));
    }

    // 释放内存,申请的时候是一次申请出来的(一个数组),释放的时候也一样
    if (pool->threads_id != NULL)
    {
        free(pool->threads_id);
    }

    // 释放内存,申请的时候是一次申请出来的,释放的时候也一样
    if (pool->task_list != NULL)
    {
        free(pool->task_list);
    }

    pthread_mutex_lock(&(pool->lock_pool));               /*先锁住再销毁*/
    pthread_mutex_destroy(&(pool->lock_pool));
    pthread_mutex_lock(&(pool->lock_threads_busy_num));
    pthread_mutex_destroy(&(pool->lock_threads_busy_num));
    pthread_cond_destroy(&(pool->task_list_full));
    pthread_cond_destroy(&(pool->task_list_empty));

    // 释放内存
    free(pool);

    return;
}


// 实际的处理函数
void taskFunc(void* arg)
{
    int num = *(int*)arg;
    printf("thread %ld is working, number = %d\n", pthread_self(), num);
}

int main()
{
    ThreadPool* pool = thread_pool_create(10, 3, 100);
    for (int i = 0; i < 50; i++)
    {
        int* num = (int*)malloc(sizeof(int));
        *num = i + 100;
        thread_pool_add(pool, taskFunc, num);
        sleep(2);
    }

    sleep(30);

    thread_pool_destory(pool);

    return 0;
}

结果:

----------------------- init success.
--------- add 1 140196491261696
--------- add 2 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 100
thread 140196491253504 start working...
thread 140196491253504 is working, number = 101
thread 140196491253504 end working...
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 102
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 103
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 104
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 105
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 106
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 107
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 108
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 109
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 110
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 111
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 112
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 113
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 114
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 115
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 116
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 117
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 118
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 119
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 120
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 121
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 122
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 123
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 124
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 125
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 126
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 127
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 128
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 129
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 130
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 131
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 132
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 133
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 134
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 135
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 136
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 137
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 138
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 139
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 140
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 141
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 142
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 143
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 144
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 145
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 146
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 147
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 148
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 149
thread 140196491253504 end working...
threadExit() called, 140196474468096 exiting...

相关推荐

  1. C-线

    2024-03-15 08:50:04       21 阅读
  2. linux C 线

    2024-03-15 08:50:04       35 阅读
  3. 线设计---C++

    2024-03-15 08:50:04       24 阅读
  4. C++11 Thead线线

    2024-03-15 08:50:04       17 阅读
  5. C++同异步极致线

    2024-03-15 08:50:04       42 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-15 08:50:04       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-15 08:50:04       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-15 08:50:04       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-15 08:50:04       20 阅读

热门阅读

  1. react03

    react03

    2024-03-15 08:50:04      19 阅读
  2. 力扣大厂热门面试算法题 27-29

    2024-03-15 08:50:04       19 阅读
  3. 【MySQL 系列】MySQL 引擎篇

    2024-03-15 08:50:04       20 阅读
  4. c# 新增一条数据

    2024-03-15 08:50:04       23 阅读
  5. Retelling|Facebook1

    2024-03-15 08:50:04       20 阅读
  6. 第2周 Python列表、元组刷题

    2024-03-15 08:50:04       22 阅读
  7. 缓存和数据库更新的先后处理方案

    2024-03-15 08:50:04       23 阅读
  8. 小程序开发——获取设备信息 API(四)

    2024-03-15 08:50:04       19 阅读
  9. Quartz项目实际使用

    2024-03-15 08:50:04       18 阅读
  10. SQL笔记 -- 黑马程序员

    2024-03-15 08:50:04       21 阅读