1、介绍
一个线程池由三部分组成:
(1)管理者线程:定期检查线程池中的任务个数和线程个数,并根据两者的变化创建或销毁线程,使线程数量与任务队列保持平衡。对应示例中的thread_manage()函数。
(2)工作线程:线程池中实际执行任务的线程,个数不定;没有任务时处于等待状态,有任务时循环地取出并执行任务。对应示例中的thread_work()函数。
(3)任务队列:用于存放尚未处理的任务,可以用链表、数组等结构存储(示例中使用环形数组)。
2、示例
代码参考:
C语言实现简单的线程池项目(附完整工程文件)_线程池项目文档-CSDN博客
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
// One queued task: the callback a worker runs and the argument passed to it.
// NOTE(review): the struct tag is spelled `TaskiLists` while the typedef name
// is `TaskLists`; the code in this file only uses the typedef name.
typedef struct TaskiLists
{
// Callback executed by a worker thread as function(arg).
void (*function)(void* arg);
// Argument handed to `function`; the worker calls free() on it after the
// callback returns.
void* arg;
}TaskLists;
// State of one thread pool: a bounded circular task queue, the table of
// worker threads, the manager thread, and the synchronization objects.
struct ThreadPool
{
// Task queue storage, used as a circular buffer of task_list_cap entries.
TaskLists* task_list;
// Capacity of the task queue.
int task_list_cap;
// Number of tasks currently waiting in the queue.
int task_list_size;
// Index of the next task to dequeue (queue head).
int task_list_front;
// Index where the next task will be stored (queue tail).
int task_list_rear;
// Table of worker thread ids with threads_max slots; a slot holding 0 is free.
pthread_t* threads_id;
// Maximum number of worker threads.
int threads_max;
// Minimum number of worker threads.
int threads_min;
// Number of workers currently running a task.
int threads_busy_num;
// Number of workers currently alive.
int threads_live_num;
// Number of workers the manager has asked to exit.
int threads_exit_num;
// Manager thread id.
pthread_t thread_manage;
// Mutex held while the queue, the thread table and the live/exit counters
// are accessed.
pthread_mutex_t lock_pool;
// Mutex guarding threads_busy_num.
pthread_mutex_t lock_threads_busy_num;
// Condition variable: producers wait on it while the queue is full; it is
// signalled after a task is dequeued.
pthread_cond_t task_list_full;
// Condition variable: workers wait on it while the queue is empty; it is
// signalled when a task is added, when the manager asks workers to exit,
// and when the pool is destroyed.
pthread_cond_t task_list_empty;
// Shutdown flag: 1 = the pool is shutting down, 0 = running.
int shut_down;
};
typedef struct ThreadPool ThreadPool;
//############################################################################################################
//############################################################################################################
// Queue one task for execution by a worker thread.
//
// pool: pool to submit to; a NULL pool rejects the task.
// func: callback a worker runs as func(arg); a NULL callback rejects the task.
// arg:  argument for func. It must be NULL or come from malloc(), because the
//       worker calls free(arg) after func returns. Ownership passes to the
//       pool on every call: since this function returns nothing, the caller
//       cannot tell whether the task was accepted, so a rejected task has its
//       arg released here instead of being leaked.
//
// Blocks while the queue is full. Returns without queuing once the pool is
// shutting down.
void thread_pool_add(ThreadPool* pool, void(*func)(void*), void* arg)
{
    if (pool == NULL || func == NULL)
    {
        free(arg);
        return;
    }

    pthread_mutex_lock(&(pool->lock_pool));
    // Queue full: block the producer until a worker dequeues a task or the
    // pool starts shutting down.
    while (pool->task_list_size == pool->task_list_cap && pool->shut_down == 0)
    {
        pthread_cond_wait(&(pool->task_list_full), &(pool->lock_pool));
    }
    // The pool is shutting down: reject the task.
    if (pool->shut_down != 0)
    {
        pthread_mutex_unlock(&(pool->lock_pool));
        free(arg);
        return;
    }
    // Store the task at the tail of the circular queue.
    pool->task_list[pool->task_list_rear].function = func;
    pool->task_list[pool->task_list_rear].arg = arg;
    pool->task_list_size += 1;
    pool->task_list_rear = (pool->task_list_rear + 1) % pool->task_list_cap;
    // pthread_t is not a `long`; cast explicitly so the format specifier
    // matches the argument type.
    printf("--------- add %d %lu\n", pool->task_list_size, (unsigned long)pthread_self());
    // Wake one worker waiting for work.
    pthread_cond_signal(&(pool->task_list_empty));
    pthread_mutex_unlock(&(pool->lock_pool));
}
// Remove the calling thread from the pool's thread table and terminate it.
//
// Must be called WITHOUT lock_pool held: the table is updated under that lock
// because the manager thread scans it under the same lock.
// After the lock is released the pool is not touched again.
// This function never returns.
void thread_exit(ThreadPool* pool)
{
    pthread_t tid = pthread_self();
    int found = 0;

    pthread_mutex_lock(&(pool->lock_pool));
    for (int i = 0; i < pool->threads_max; i++)
    {
        // A slot holding 0 is free; skip it so pthread_equal() only ever
        // compares real thread ids.
        if (pool->threads_id[i] != 0 && pthread_equal(pool->threads_id[i], tid))
        {
            pool->threads_id[i] = 0;
            found = 1;
            break;
        }
    }
    pthread_mutex_unlock(&(pool->lock_pool));

    if (found)
    {
        printf("threadExit() called, %lu exiting...\n", (unsigned long)tid);
    }
    pthread_exit(NULL);
}
void thread_work(void* arg)
{
ThreadPool* pool = (ThreadPool*)arg;
// 工作线程要一直在执行,时时执行
while (1)
{
sleep(1.3);
pthread_mutex_lock(&(pool->lock_pool));
// 如果任务列表为空,添加进程阻塞
while (pool->task_list_size == 0 && pool->shut_down == 0)
{
pthread_cond_wait(&(pool->task_list_empty), &(pool->lock_pool));
if (pool->threads_exit_num > 0)
{
pool->threads_exit_num--;
if (pool->threads_live_num > pool->threads_min)
{
pool->threads_live_num--;
pthread_mutex_unlock(&(pool->lock_pool));
// 销毁该线程
thread_exit(pool);
}
}
}
// 如果线程池要关闭了,直接返回
if (pool->shut_down == 1)
{
pthread_mutex_unlock(&(pool->lock_pool));
thread_exit(pool);
}
// 取任务参数
TaskLists tmp;
tmp.function = pool->task_list[pool->task_list_front].function;
tmp.arg = pool->task_list[pool->task_list_front].arg;
pool->task_list_size--;
pool->task_list_front = (pool->task_list_front + 1) % pool->task_list_cap; // 向后偏移
// 条件变量,通知
pthread_cond_signal(&pool->task_list_full);
pthread_mutex_unlock(&(pool->lock_pool));
// 处理任务
printf("thread %ld start working...\n", pthread_self());
pthread_mutex_lock(&(pool->lock_threads_busy_num));
pool->threads_busy_num += 1;
pthread_mutex_unlock(&(pool->lock_threads_busy_num));
tmp.function(tmp.arg);
free(tmp.arg);
tmp.arg = NULL;
printf("thread %ld end working...\n", pthread_self());
pthread_mutex_lock(&(pool->lock_threads_busy_num));
pool->threads_busy_num--;
pthread_mutex_unlock(&(pool->lock_threads_busy_num));
}
return;
}
// Manager thread entry point, with the signature pthread_create() requires.
//
// arg: the ThreadPool* to manage.
//
// Every 3 seconds the manager samples the queue length and the live/busy
// worker counts, then:
//   - starts workers when queued tasks outnumber live workers and the pool is
//     below threads_max;
//   - asks idle workers to exit when fewer than half of the live workers are
//     busy and the pool is above threads_min.
// The loop ends once shut_down is set. Returns NULL.
void* thread_manage(void* arg)
{
    ThreadPool* pool = (ThreadPool*)arg;

    while (1)
    {
        // shut_down is written by another thread, so read it under the lock.
        pthread_mutex_lock(&(pool->lock_pool));
        int stopping = pool->shut_down;
        pthread_mutex_unlock(&(pool->lock_pool));
        if (stopping != 0)
        {
            break;
        }

        printf("===========================\n");
        // Check the pool state every 3 s.
        sleep(3);

        // Sample the queue length and the number of live workers. Shutdown
        // may have begun during the sleep; re-check so that no worker is
        // created for a pool that is being torn down.
        pthread_mutex_lock(&(pool->lock_pool));
        stopping = pool->shut_down;
        int task_list_size = pool->task_list_size;
        int threads_live_num = pool->threads_live_num;
        pthread_mutex_unlock(&(pool->lock_pool));
        if (stopping != 0)
        {
            break;
        }

        // Sample the number of busy workers.
        pthread_mutex_lock(&(pool->lock_threads_busy_num));
        int threads_busy_num = pool->threads_busy_num;
        pthread_mutex_unlock(&(pool->lock_threads_busy_num));

        // Grow: queued tasks > live workers && live workers < maximum.
        if (task_list_size > threads_live_num && threads_live_num < pool->threads_max)
        {
            pthread_mutex_lock(&(pool->lock_pool));
            int cnt = 0;
            for (int i = 0;
                 i < pool->threads_max && cnt < task_list_size &&
                 pool->threads_live_num < pool->threads_max && pool->shut_down == 0;
                 i++)
            {
                if (pool->threads_id[i] != 0)
                {
                    continue;
                }
                // Create into a local id so that a failed pthread_create()
                // cannot leave an indeterminate value in the slot.
                pthread_t tid;
                if (pthread_create(&tid, NULL, thread_work, pool) != 0)
                {
                    printf("manager: create worker thread fail...\n");
                    break;
                }
                pool->threads_id[i] = tid;
                cnt += 1;
                pool->threads_live_num += 1;
            }
            pthread_mutex_unlock(&(pool->lock_pool));
        }

        // Shrink: busy workers * 2 < live workers && live workers > minimum.
        if (threads_busy_num * 2 < threads_live_num && threads_live_num > pool->threads_min)
        {
            // NOTE(review): the exit request is max(busy, threads_min), kept
            // as in the original policy; workers stop exiting on their own
            // once threads_live_num reaches threads_min.
            int exit_num = threads_busy_num > pool->threads_min ? threads_busy_num : pool->threads_min;
            pthread_mutex_lock(&(pool->lock_pool));
            pool->threads_exit_num = exit_num;
            pthread_mutex_unlock(&(pool->lock_pool));
            // Wake that many idle workers. The bound is a local copy because
            // workers decrement threads_exit_num concurrently.
            for (int i = 0; i < exit_num; i++)
            {
                pthread_cond_signal(&(pool->task_list_empty));
            }
        }
    }
    return NULL;
}
// 创建线程池
ThreadPool* thread_pool_create(int max, int min, int task_list_cap)
{
// 申请线程池结构体内容存放的地址
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
if (pool == NULL)
{
printf("the pool malloc failed...\n");
return NULL;
}
// 申请线程数组
pool->threads_id = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threads_id == NULL)
{
printf("malloc threadIDs fail...\n");
return NULL;
}
memset(pool->threads_id, 0, sizeof(pthread_t) * max);
pool->threads_max = max;
pool->threads_min = min;
pool->threads_busy_num = 0;
pool->threads_live_num = min; // 线程池中最少有min线程在启动,但是有可能不再工作
pool->threads_exit_num = 0; // 需要销毁的线程个数;任务少的时候,大多数线程空闲时,需要销毁
// 创建线程
for (int i = 0; i < min; i++)
{
pthread_create(&(pool->threads_id[i]), NULL, thread_work, pool);
}
// 管理者线程
pool->thread_manage = (pthread_t*)malloc(sizeof(pthread_t));
if (pool->thread_manage == NULL)
{
printf("malloc manage thread fail...\n");
return NULL;
}
pthread_create(&(pool->thread_manage), NULL, thread_work, pool);
// 任务队列初始化
pool->task_list = (TaskLists*)malloc(sizeof(TaskLists) * task_list_cap);
if (pool->task_list == NULL)
{
printf("the task list init failed...\n");
return NULL;
}
pool->task_list_cap = task_list_cap;
pool->task_list_front = 0;
pool->task_list_rear = 0;
// 初始化锁和条件变量
if (pthread_mutex_init(&(pool->lock_pool), NULL) != 0 || pthread_mutex_init(&(pool->lock_threads_busy_num), NULL) != 0 ||
pthread_cond_init(&(pool->task_list_full), NULL) != 0 || pthread_cond_init(&(pool->task_list_empty), NULL) != 0)
{
printf("mutex or condition init fail...\n");
return;
}
pool->shut_down = 0;
printf("----------------------- init success.\n");
return pool;
}
// Shut the pool down and release everything it owns.
// (The misspelled name is kept because callers use it.)
//
// pool: pool to destroy; NULL is ignored.
//
// Steps:
//   1. set shut_down under the pool lock and wake every waiting worker and
//      every producer blocked on a full queue;
//   2. join the manager thread, then every worker still in the thread table;
//   3. free the arguments of tasks that were queued but never run (the pool
//      owns task arguments once they are queued);
//   4. destroy the unlocked mutexes and condition variables and free memory.
//
// The caller must make sure no other thread calls thread_pool_add() on this
// pool while or after it is being destroyed.
void thread_pool_destory(ThreadPool* pool)
{
    if (pool == NULL)
    {
        return;
    }

    // Scratch copy of the worker ids, taken before workers start clearing
    // their own slots. If the allocation fails, the table scan further down
    // still joins every worker whose slot is set.
    pthread_t* workers = NULL;
    int worker_count = 0;
    if (pool->threads_max > 0)
    {
        workers = calloc((size_t)pool->threads_max, sizeof *workers);
    }

    // Set the flag under the lock: a thread that has just tested shut_down
    // and is about to wait would otherwise miss the wakeup.
    pthread_mutex_lock(&(pool->lock_pool));
    pool->shut_down = 1;
    if (workers != NULL)
    {
        for (int i = 0; i < pool->threads_max; i++)
        {
            if (pool->threads_id[i] != 0)
            {
                workers[worker_count++] = pool->threads_id[i];
            }
        }
    }
    pthread_cond_broadcast(&(pool->task_list_empty));
    pthread_cond_broadcast(&(pool->task_list_full));
    pthread_mutex_unlock(&(pool->lock_pool));

    // pthread_join() takes the thread id itself, not a pointer to it.
    pthread_join(pool->thread_manage, NULL);

    for (int k = 0; k < worker_count; k++)
    {
        pthread_join(workers[k], NULL);
    }

    // The manager may have started workers after the copy above was taken;
    // it has been joined now, so the table can no longer gain entries.
    for (int i = 0; i < pool->threads_max; i++)
    {
        pthread_mutex_lock(&(pool->lock_pool));
        pthread_t tid = pool->threads_id[i];
        pthread_mutex_unlock(&(pool->lock_pool));
        if (tid == 0)
        {
            continue;
        }
        int already_joined = 0;
        for (int k = 0; k < worker_count; k++)
        {
            if (pthread_equal(workers[k], tid))
            {
                already_joined = 1;
                break;
            }
        }
        if (!already_joined)
        {
            pthread_join(tid, NULL);
        }
    }
    free(workers);

    // Release the arguments of tasks that never ran.
    if (pool->task_list != NULL && pool->task_list_cap > 0 &&
        pool->task_list_front >= 0 && pool->task_list_front < pool->task_list_cap)
    {
        int pending = pool->task_list_size;
        if (pending > pool->task_list_cap)
        {
            pending = pool->task_list_cap;
        }
        int idx = pool->task_list_front;
        for (; pending > 0; pending--)
        {
            free(pool->task_list[idx].arg);
            pool->task_list[idx].arg = NULL;
            idx = (idx + 1) % pool->task_list_cap;
        }
    }

    free(pool->threads_id);
    free(pool->task_list);

    // A mutex must be unlocked when it is destroyed.
    pthread_mutex_destroy(&(pool->lock_pool));
    pthread_mutex_destroy(&(pool->lock_threads_busy_num));
    pthread_cond_destroy(&(pool->task_list_full));
    pthread_cond_destroy(&(pool->task_list_empty));

    free(pool);
}
// Demo task: print the calling thread's id and the int that `arg` points to.
// arg must point to a valid int; this function does not free it.
void taskFunc(void* arg)
{
    const int* num = (const int*)arg;
    // pthread_t is not a `long`; cast explicitly so the format specifier
    // matches the argument type.
    printf("thread %lu is working, number = %d\n", (unsigned long)pthread_self(), *num);
}
// Demo driver: create a pool of 3..10 workers with a 100-entry queue, submit
// 50 tasks two seconds apart, give the workers 30 seconds to drain the queue,
// then destroy the pool.
// Returns 0 on success and 1 when the pool cannot be created.
int main(void)
{
    ThreadPool* pool = thread_pool_create(10, 3, 100);
    if (pool == NULL)
    {
        return 1;
    }
    for (int i = 0; i < 50; i++)
    {
        // Each task gets its own heap-allocated argument; the worker that
        // runs the task frees it.
        int* num = malloc(sizeof *num);
        if (num == NULL)
        {
            printf("malloc task argument fail...\n");
            break;
        }
        *num = i + 100;
        thread_pool_add(pool, taskFunc, num);
        sleep(2);
    }
    sleep(30);
    thread_pool_destory(pool);
    return 0;
}
结果:
----------------------- init success.
--------- add 1 140196491261696
--------- add 2 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 100
thread 140196491253504 start working...
thread 140196491253504 is working, number = 101
thread 140196491253504 end working...
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 102
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 103
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 104
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 105
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 106
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 107
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 108
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 109
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 110
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 111
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 112
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 113
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 114
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 115
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 116
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 117
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 118
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 119
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 120
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 121
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 122
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 123
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 124
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 125
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 126
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 127
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 128
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 129
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 130
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 131
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 132
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 133
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 134
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 135
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 136
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 137
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 138
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 139
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 140
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 141
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 142
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 143
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 144
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 145
thread 140196491253504 end working...
--------- add 1 140196491261696
thread 140196466075392 start working...
thread 140196466075392 is working, number = 146
thread 140196466075392 end working...
--------- add 1 140196491261696
thread 140196474468096 start working...
thread 140196474468096 is working, number = 147
thread 140196474468096 end working...
--------- add 1 140196491261696
thread 140196482860800 start working...
thread 140196482860800 is working, number = 148
thread 140196482860800 end working...
--------- add 1 140196491261696
thread 140196491253504 start working...
thread 140196491253504 is working, number = 149
thread 140196491253504 end working...
threadExit() called, 140196474468096 exiting...