【线程系列之五】线程池介绍C语言

一、基本概念

1.1 概念

线程池(Thread Pool)是一种基于池化技术管理线程的机制,旨在减少线程创建和销毁的开销,提高系统资源的利用率,以及更好地控制系统中同时运行的线程数量。线程池通过预先创建一定数量的线程,并将这些线程放入一个“池”中,当有新任务到来时,不是立即创建新线程来执行,而是从线程池中取出一个空闲的线程来执行该任务。如果所有线程都在忙碌,则新任务会等待直到有线程变得空闲。

在C语言中,由于标准库(如C89/C99/C11等)不支持线程或线程池,因此通常需要使用第三方库如POSIX线程(pthread)来实现线程池。

1.2 应用场景【C语言】

C语言中的线程池应用场景与在其他编程语言中类似,主要包括以下几类:

  • 高并发服务器: 在网络服务器中处理大量客户端请求。每个请求可以分配给一个线程池中的线程进行处理,以提高响应速度和吞吐量。

  • 数据处理和计算密集型任务: 当需要处理大量数据或执行复杂的计算时,可以将任务分配到线程池中并行执行,以缩短总体执行时间。

  • 资源密集型任务: 对于需要频繁访问共享资源(如数据库、文件系统等)的任务,使用线程池可以减少线程创建和销毁的开销,并可能通过更高效的资源管理来提高性能。

  • 异步操作: 在需要执行非阻塞操作(如异步I/O、异步网络请求等)时,线程池可以用来处理这些异步操作的结果或回调。

  • 定时任务和周期性任务: C语言本身不直接支持定时器和周期性任务,但你可以使用线程池中的线程来模拟这些功能,通过轮询或睡眠机制来检查时间并执行相应的任务。

1.3 实现线程池步骤
  • 定义线程池结构: 包括线程数量、任务队列、互斥锁、条件变量等;

  • 实现任务队列: 用于存储待执行的任务;

  • 编写线程工作函数: 该函数将被多个线程同时执行,并从任务队列中取出任务进行处理;

  • 初始化和管理线程池: 包括创建线程、启动线程、销毁线程等操作;

  • 添加任务到线程池: 提供接口将新任务添加到任务队列中,并通知等待的线程。

由于C语言相对底层,因此实现这些功能需要更多的手动编码和对系统资源的管理。此外,你还需要考虑线程安全和性能优化等问题,第三方库可在C++、python中查找,避免从头实现线程池。

下文将具体说明C语言实现线程池的代码。

二、实现

2.1 定义线程池结构

首先,定义一个包含线程池所需所有信息的结构体,如线程数组、任务队列、互斥锁、条件变量等。

#include <pthread.h>  
#include <stdbool.h>  
#include <stdlib.h> 

#define MAX_THREADS 4 
  
// 任务队列节点  
typedef struct task {  
    void (*function)(void*);  
    void* arg;  
    struct task* next;  
} task_t;  
  
// 线程池结构体  
typedef struct {  
    pthread_t threads[MAX_THREADS]; // 线程数组  
    pthread_mutex_t queue_mutex;    // 保护任务队列的互斥锁  
    pthread_cond_t queue_cond;      // 任务队列的条件变量  
    bool stop;                      // 线程池停止标志  
    task_t* head;                   // 任务队列头指针  
    task_t* tail;                   // 任务队列尾指针  
    int thread_count;               // 当前活跃线程数  
    int active_threads;             // 最大活跃线程数(可配置)  
} threadpool_t;
2.2 初始化线程池

实现一个函数来初始化线程池,包括创建线程、初始化同步等。

void* worker(void* arg) {  
    threadpool_t* pool = (threadpool_t*)arg;  
  
    while (true) {  
        pthread_mutex_lock(&pool->queue_mutex);  
  
        // 如果线程池已停止且没有任务,则退出循环  
        if (pool->stop && pool->head == NULL) {  
            pthread_mutex_unlock(&pool->queue_mutex);  
            break;  
        }  
  
        // 等待任务或线程池停止信号  
        while (!pool->stop && pool->head == NULL && pool->thread_count >= pool->active_threads) {  
            pthread_cond_wait(&pool->queue_cond, &pool->queue_mutex);  
        }  
  
        // 取出任务(如果线程池未停止且队列不为空)  
        task_t* task = NULL;  
        if (!pool->stop && pool->head != NULL) {  
            task = pool->head;  
            pool->head = task->next;  
            if (pool->head == NULL) {  
                pool->tail = NULL;  
            }  
            pool->thread_count--;  
        }  
  
        pthread_mutex_unlock(&pool->queue_mutex);  
  
        // 执行任务(如果任务不为空)  
        if (task != NULL) {  
            (*(task->function))(task->arg);  
            free(task); // 释放任务节点内存(如果任务节点是动态分配的)  
        }  
    }  
  
    return NULL;  
}  
  
void threadpool_init(threadpool_t* pool, int active_threads) {  
    pool->stop = false;  
    pool->head = pool->tail = NULL;  
    pool->thread_count = 0;  
    pool->active_threads = active_threads;  
  
    pthread_mutex_init(&pool->queue_mutex, NULL);  
    pthread_cond_init(&pool->queue_cond, NULL);  
  
    for (int i = 0; i < active_threads; i++) {  
        pthread_create(&pool->threads[i], NULL, worker, pool);  
    }  
}
2.3 添加任务到线程池

实现一个函数来向线程池的任务队列中添加任务,并唤醒一个等待的线程(如果有的话)。

void threadpool_add_task(threadpool_t* pool, void (*function)(void*), void* arg) {  
    task_t* new_task = (task_t*)malloc(sizeof(task_t));  
    new_task->function = function;  
    new_task->arg = arg;  
    new_task->next = NULL;  
  
    pthread_mutex_lock(&pool->queue_mutex);  
  
    if (pool->tail == NULL) {  
        pool->head = pool->tail = new_task;  
    } else {  
        pool->tail->next = new_task;  
        pool->tail = new_task;  
    }  
  
    pthread_cond_signal(&pool->queue_cond);  
    pthread_mutex_unlock(&pool->queue_mutex);  
}
2.4 销毁线程池

实现一个函数来停止所有线程并销毁线程池。

void threadpool_destroy(threadpool_t* pool) {  
    pool->stop = true;  
  
    pthread_cond_broadcast(&pool->queue_cond);  
  
    for (int i = 0; i < MAX_THREADS; i++) {  
        pthread_join(pool->threads[i], NULL);  
    }  
  
    pthread_mutex_destroy(&pool->queue_mutex);  
    pthread_cond_destroy(&pool->queue_cond);  
}

三、完整简单示例

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>
#include <unistd.h>

#define MAX_THREADS 5

typedef struct task {
    void (*function)(void*);
    void* arg;
    struct task* next;
} task_t;

typedef struct {
    pthread_t threads[MAX_THREADS];
    pthread_mutex_t queue_mutex;
    pthread_cond_t queue_cond;
    bool stop;
    task_t* head;
    task_t* tail;
    int thread_count;
    int active_threads;
} threadpool_t;

void* worker(void* arg) {
    threadpool_t* pool = (threadpool_t*)arg;

    while (true) {
        pthread_mutex_lock(&pool->queue_mutex);

        while (pool->head == NULL && !pool->stop) {
            pthread_cond_wait(&pool->queue_cond, &pool->queue_mutex);
        }

        if (pool->stop && pool->head == NULL) {
            pthread_mutex_unlock(&pool->queue_mutex);
            break;
        }

        task_t* task = pool->head;
        if (task != NULL) {
            pool->head = task->next;
            if (pool->head == NULL) {
                pool->tail = NULL;
            }
        }

        pthread_mutex_unlock(&pool->queue_mutex);

        if (task != NULL) {
            (*(task->function))(task->arg);
            free(task); // 释放任务节点内存
        }
        sleep(1);
    }

    return NULL;
}

void threadpool_init(threadpool_t* pool, int active_threads) {
    pool->stop = false;
    pool->head = pool->tail = NULL;
    pool->thread_count = 0;
    pool->active_threads = active_threads;

    pthread_mutex_init(&pool->queue_mutex, NULL);
    pthread_cond_init(&pool->queue_cond, NULL);

    for (int i = 0; i < active_threads; ++i) {
        pthread_create(&pool->threads[i], NULL, worker, pool);
    }
}

void threadpool_add_task(threadpool_t* pool, void (*function)(void*), void* arg) {
    task_t* new_task = (task_t*)malloc(sizeof(task_t));
    new_task->function = function;
    new_task->arg = arg;
    new_task->next = NULL;

    pthread_mutex_lock(&pool->queue_mutex);

    if (pool->tail == NULL) {
        pool->head = pool->tail = new_task;
    } else {
        pool->tail->next = new_task;
        pool->tail = new_task;
    }

    pthread_cond_signal(&pool->queue_cond);
    pthread_mutex_unlock(&pool->queue_mutex);
}

void threadpool_destroy(threadpool_t* pool) {
    pool->stop = true;

    pthread_mutex_lock(&pool->queue_mutex);
    pthread_cond_broadcast(&pool->queue_cond);
    pthread_mutex_unlock(&pool->queue_mutex);

    for (int i = 0; i < pool->active_threads; ++i) {
        pthread_join(pool->threads[i], NULL);
        printf("%d\r\n", i);
    }

    pthread_mutex_destroy(&pool->queue_mutex);
    pthread_cond_destroy(&pool->queue_cond);
}

// 示例任务函数
void example_task(void* arg) {
    int task_id = *(int*)arg;
    printf("Task %d is executing\n", task_id);
    free(arg); // 释放参数内存
    sleep(1);  // 模拟任务执行时间
}

int main() {
    threadpool_t pool;
    threadpool_init(&pool, 2); // 初始化线程池,最大活跃线程数为2

    // 添加示例任务
    for (int i = 0; i < MAX_THREADS; ++i) {
        int* arg = (int*)malloc(sizeof(int));
        *arg = i;
        threadpool_add_task(&pool, example_task, arg);
    }

    // 等待一段时间,观察任务执行情况
    sleep(5);

    // 销毁线程池
    threadpool_destroy(&pool);

    return 0;
}

运行结果为:

在这里插入图片描述

这段代码演示了一个简单的线程池的使用方式。在主函数中,我们初始化了一个线程池(最大活跃线程数为2),然后添加了5个示例任务(每个任务执行时间模拟为1秒)。在任务执行完毕后,通过调用 threadpool_destroy 函数来销毁线程池,确保所有任务都被执行完毕。

注意:这里销毁的时候保证知道数组中有多少个有效的线程ID,进而销毁。

可在结构体中添加pool->num_threads ,即用来保存线程池中线程数量的成员变量,应该在初始化线程池时进行设置,并在后续操作中根据需要使用,则不会出现在销毁时越界或死锁。

这个示例展示了如何使用线程池来管理并发执行的任务,以及如何通过互斥锁和条件变量来实现线程安全的任务队列操作。

相关推荐

  1. 线介绍

    2024-07-17 02:04:01       22 阅读
  2. C-线

    2024-07-17 02:04:01       35 阅读
  3. C#线介绍及应用

    2024-07-17 02:04:01       20 阅读
  4. ElasticSearch线

    2024-07-17 02:04:01       61 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-17 02:04:01       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-17 02:04:01       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-17 02:04:01       58 阅读
  4. Python语言-面向对象

    2024-07-17 02:04:01       69 阅读

热门阅读

  1. 记录第一次因为数据库事务产生的BUG

    2024-07-17 02:04:01       18 阅读
  2. 量化机器人如何提升交易透明度?

    2024-07-17 02:04:01       23 阅读
  3. Flutter基本概念&常用命名

    2024-07-17 02:04:01       23 阅读
  4. AI对开发者的影响:重塑技能、职业与生活

    2024-07-17 02:04:01       24 阅读
  5. CloudCone服务器2核1G一年只需15刀

    2024-07-17 02:04:01       19 阅读
  6. zookeeper+kafka消息队列群集部署

    2024-07-17 02:04:01       20 阅读
  7. Webflux中的订阅关系

    2024-07-17 02:04:01       20 阅读