什么是线程池:
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
线程池示例:
- 创建固定数量线程池,循环从任务队列中获取任务对象
- 获取到任务对象后,执行任务对象中的任务接口
锁设计
采用RAII风格的加锁方式,用于保护在出了临界区后,忘记解锁的情况。
Mutex 类:
- Mutex 类封装了 pthread_mutex_t 互斥锁。它包含 lock 和 unlock 方法,分别用于加锁和解锁。
- 构造函数接收一个指向 pthread_mutex_t 的指针,并将其存储在私有成员 _pmtx 中。
lockGuard 类:
- lockGuard 类是 RAII 风格的锁保护类。在构造函数中,它接收一个 pthread_mutex_t 指针并使用 Mutex 类将锁住。
- 在析构函数中,它解锁互斥锁。这样,当 lockGuard 对象超出范围(超出了作用域),它的析构函数将确保互斥锁被正确解锁,即使在发生异常的情况下也是如此。
#include <iostream>
#include <pthread.h>
class Mutex{
public:
Mutex(pthread_mutex_t *mtx)
:_pmtx(mtx)
{}
void lock()
{
pthread_mutex_lock(_pmtx);
}
void unlock()
{
pthread_mutex_unlock(_pmtx);
}
~Mutex()
{}
private:
pthread_mutex_t *_pmtx;
};
// RAII风格的加锁方式
class lockGuard
{
public:
lockGuard(pthread_mutex_t *mtx)
:_mtx(mtx)
{
_mtx.lock();
}
~lockGuard()
{
_mtx.unlock();
}
private:
Mutex _mtx;
};
线程封装
把线程进行封装,用于创建和管理线程。
主要的类和相关功能:
ThreadData 类:
- 存储线程的参数和名称,包含 _args 和 _name 成员变量。
Thread 类:
- 构造函数接受线程编号、回调函数指针和线程参数,生成一个线程名称,并初始化了 _func 和 _tdata 。
- start 方法用于创建线程,调用 pthread_create。
- join 方法等待线程的结束,调用 pthread_join。name 方法返回线程的名称。
#include <iostream>
#include <string>
#include <functional>
#include <cstdio>
typedef void*(*fun_t)(void*);
class ThreadData
{
public:
void* _args;
std::string _name;
};
class Thread
{
public:
Thread(int num, fun_t callback, void* args)
:_func(callback)
{
char nameBuffer[64];
snprintf(nameBuffer, sizeof nameBuffer, "thread-%d", num);
_name = nameBuffer;
_tdata._name = _name;
_tdata._args = args;
}
void start()
{
pthread_create(&_tid, nullptr, _func, (void*)&_tdata);
}
void join()
{
pthread_join(_tid, nullptr);
}
std::string name()
{
return _name;
}
~Thread()
{}
private:
std::string _name; // 线程号
fun_t _func; // 仿函数
ThreadData _tdata;
pthread_t _tid; // 线程标识符
};
线程池
- getThreadPool 函数是获取线程池单例的静态方法。使用了双检锁机制(Double-Check Locking)确保在多线程环境下只创建一个实例。
- routine 是线程的执行函数,其中使用了 lockGuard 类来实现 RAII 风格的加锁和解锁。在 routine 中,线程不断从任务队列中取出任务执行。
- PushTask 用于向任务队列中添加任务,并通过条件变量 pthread_cond_signal 通知等待中的线程有新任务。
- run 方法启动所有线程。
- 在构造函数中初始化了互斥锁 pthread_mutex_init 和条件变量 pthread_cond_init。
- 析构函数负责销毁线程池中的线程、互斥锁和条件变量。
条件变量:void waitCond()
方法使用了条件变量 pthread_cond_wait 函数,该函数的作用是使当前线程阻塞,等待条件变量被其他线程通过 pthread_cond_signal 或 pthread_cond_broadcast 激活。这个函数会释放 lock 互斥锁,允许其他线程在执行 pthread_cond_signal 或 pthread_cond_broadcast 时获得锁。
具体步骤如下:
- 当前线程调用 pthread_cond_wait 时,它会释放 lock,使得其他线程可以进入临界区。
- 当其他线程执行 pthread_cond_signal 或 pthread_cond_broadcast 时,被阻塞的线程会重新获得 lock。
- 被重新唤醒的线程会重新检查条件。如果条件满足,它将继续执行;否则,它将再次进入等待状态。
这种机制通常用于线程之间的同步,其中一个线程在满足某个条件时通知其他线程继续执行。在这个线程池的实现中,waitCond 用于在任务队列为空时阻塞线程,直到有新的任务到来。
#include <iostream>
#include <vector>
#include <string>
#include <queue>
#include <unistd.h>
#include "lockGuard.hpp"
#include "thread.hpp"
#include "log.hpp"
const int g_thread_num = 10;
// 本质:生产者消费者模型
template<class T>
class ThreadPool
{
public:
// 加锁
pthread_mutex_t* getMutex()
{
return &lock;
}
// 判断任务队列是否为空
bool isEmpty()
{
return _task_queue.empty();
}
// 调用条件变量
void waitCond()
{
pthread_cond_wait(&cond, &lock);
}
// 获取一个任务
T getTask()
{
T t = _task_queue.front();
_task_queue.pop();
return t;
}
private:
ThreadPool(int thread_num = g_thread_num)
:_num(thread_num)
{
for(int i=1; i <= _num; i++)
{
_threads.push_back(new Thread(i, routine, this));
}
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
}
// 禁用拷贝构造和拷贝赋值
ThreadPool(const ThreadPool<T> &other) = delete;
const ThreadPool<T> operator=(const ThreadPool<T> &other) = delete;
public:
static ThreadPool<T> *getThreadPool(int num = g_thread_num)
{
// 可以有效减少未来必定要进行枷锁检测的问题
// 拦截大量的再已经创建好单例的时候,剩余线程请求单例的而直接访问所的行为
if(nullptr == thread_ptr)
{
lockGuard lockguard(&mutex);
if(thread_ptr == nullptr)
{
thread_ptr = new ThreadPool<T>(num);
}
}
return thread_ptr;
}
void run()
{
// 启动所有线程
for(auto &iter : _threads)
{
iter->start();
logMessage(NORMAL, "%s %s", iter->name().c_str(), "启动成功");
}
}
// 从任务队列中获取任务
static void* routine(void* args)
{
ThreadData *td = (ThreadData*)args;
ThreadPool<T> *tp = (ThreadPool<T>*)td->_args;
while(true)
{
T task;
{
lockGuard lockGuard(tp->getMutex());
while(tp->isEmpty())
{
tp->waitCond();
}
// 获取任务
task = tp->getTask();
}
task(td->_name);
}
}
// 添加任务到任务队列
void PushTask(const T &task)
{
lockGuard lockguard(&lock);
_task_queue.push(task);
pthread_cond_signal(&cond);
}
~ThreadPool()
{
for(auto &iter : _threads)
{
delete iter;
}
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
std::vector<Thread*> _threads;
int _num;
std::queue<T> _task_queue;
static ThreadPool<T> *thread_ptr;
static pthread_mutex_t mutex;
pthread_mutex_t lock;
pthread_cond_t cond;
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::thread_ptr = nullptr;
template <typename T>
pthread_mutex_t ThreadPool<T>::mutex = PTHREAD_MUTEX_INITIALIZER;
// 这样的定义确保了每个模板实例都有自己的静态成员,而不是共享同一个。
// PTHREAD_MUTEX_INITIALIZER,这是一个宏,用于初始化一个互斥锁。