文章目录
关于线程池
线程池是一种用于管理和复用一组工作线程的技术,目的就是提高程序性能和资源利用率,特别是在处理大量短时间任务时。简单来说,线程池就是预先创建一批线程,在有任务时就将其分配给某个线程执行,当线程执行完自己的任务之后并不会销毁而是等待下一个任务。这样就避免了频繁创建和销毁线程的大量开销。
线程池的主要原理和概念
- 工作线程:在线程池中创建一批线程,这些线程等待并处理任务
- 任务队列:待处理的任务队列。当有新任务时,任务就会被放入队列中等待执行
- 线程池管理器:管理线程池中线程的生命周期,处理任务调度、线程创建和销毁等
实现线程池
- PthreadPool.hpp文件定义了一个线程池模板,可根据数据类型实例化成线程池对象。线程池对象里面有一个任务队列和一个线程容器,线程会不断地从任务队列里面读取任务并执行任务,如果没有读到任务那就等待条件变量,直到有任务入队列之后唤醒。
#pragma once
#include <iostream>
#include <unistd.h>
#include <string>
#include <vector>
#include <queue>
#include <pthread.h>
#include <functional>
#include "Thread.hpp"
#include "Task.hpp"
using namespace ThreadMoudle;
static const int gdefaultnum = 5; // 默认线程数量
void test(string &str)
{
while (true)
{
cout << str << " helloworld" << endl;
sleep(1);
}
}
template <class T>
class ThreadPool
{
private:
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
void Wakeup() // 唤醒某个线程
{
pthread_cond_signal(&_cond);
}
void WakeupAll()
{ // 唤醒所有在等待的线程
pthread_cond_broadcast(&_cond);
}
void Sleep() // 调用该函数的线程等待条件变量
{
pthread_cond_wait(&_cond, &_mutex);
}
bool IsEmpty()
{
return _task_queue.empty();
}
void HandlerTask(const string &name) // 线程池中线程的执行逻辑
{
while (true)
{
// 取任务
LockQueue();
while (IsEmpty() && _isrunning)
{
_sleep_thread_num++;
Sleep();
_sleep_thread_num--;
}
// 判断此时是否需要停止
if (IsEmpty() && !_isrunning) // 此时没有任务且需要退出
{
cout << name << " quit" << endl;
UnlockQueue();
break;
}
T t = _task_queue.front(); // 取出一个任务对象
_task_queue.pop();
UnlockQueue();
t(); // 处理任务
cout << name << " : " << t.result() << endl;
}
}
public:
ThreadPool(int thread_num = gdefaultnum) // 初始化线程池
: _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
}
void Init() // 初始化线程容器里的线程,赋予每个线程名字以及处理逻辑函数
{
func_t func = bind(&ThreadPool::HandlerTask, this, std::placeholders::_1); // 绑定HandlerTask函数的第一个参数
for (int i = 0; i < _thread_num; i++)
{
string threadname = "thread-" + to_string(i + 1);
_threads.emplace_back(threadname, func);
}
}
void Start() // 启动线程池里的所有线程,此时所有线程开始执行HandlerTask函数
{
_isrunning = true;
for (auto &thread : _threads)
{
thread.Start();
}
}
void Stop()
{
LockQueue();
_isrunning = false;
WakeupAll(); // 唤醒所有线程赶紧把剩下的任务处理了
UnlockQueue();
}
void Push(const T &in)
{
LockQueue();
if (_isrunning) // 如果线程池要停止,那就不要在生产任务了
{
_task_queue.push(in); // 生产了一个任务
if (_sleep_thread_num > 0)
{
Wakeup(); // 唤醒一个等待的线程
}
}
UnlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _thread_num;
vector<Thread> _threads; // 线程组
queue<T> _task_queue; // 任务队列
bool _isrunning; // 运行状态
int _sleep_thread_num; // 休眠
pthread_mutex_t _mutex;
pthread_cond_t _cond; // 等待队列,没有收到任务的线程就等待
};
- Thread.hpp文件,封装了一个线程类对象,方便线程池管理,每个线程对象都有自己的名字和自己的执行函数。
#ifndef _MYTHREAD
#define _MYTHREAD
#include <iostream>
#include <pthread.h>
#include <string>
#include <functional>
using namespace std;
namespace ThreadMoudle
{
using func_t = function<void(const string &)>;
class Thread
{
public:
void Excute() // 线程执行任务
{
cout << _name << " is running" << endl;
_isrunning = true;
_func(_name);
_isrunning = false;
}
public:
Thread(const string &name, func_t func)
: _name(name), _func(func)
{
cout << "create " << _name << " done" << endl;
}
static void *ThreadRoutine(void *arg)
{ // 新线程都会先执行这个函数
Thread *self = static_cast<Thread *>(arg);
self->Excute();
return nullptr;
}
bool Start() // 线程开始启动
{
int res = pthread_create(&_tid, nullptr, ThreadRoutine, this);
if (res != 0)
return false;
return true;
}
void Stop()
{
if (_isrunning)
{
pthread_cancel(_tid);
_isrunning = false;
cout << _name << " Stop" << endl;
}
}
void Join()
{
pthread_join(_tid, nullptr);
cout << _name << " is Joined" << endl;
}
string Getname()
{
return _name;
}
~Thread()
{
Stop();
}
private:
string _name; // 线程名
pthread_t _tid; // 线程ID
bool _isrunning; // 是否在执行任务
func_t _func; // 要执行的回调函数
};
};
#endif
- main.cpp文件,定义线程池对象,主线程发布任务到线程池中的任务队列,再由线程池中的线程完成。
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <unistd.h>
int main()
{
ThreadPool<Task> *tp = new ThreadPool<Task>();
tp->Init();
tp->Start();
int cnt = 10;
srand(time(nullptr));
while (cnt)
{
// 主线程安排任务给线程池
sleep(1);
int x = rand() % 100;
int y = rand() % 100;
Task t(x, y);
tp->Push(t);
sleep(1);
cout << "cnt: " << cnt-- << endl;
}
tp->Stop();
cout << "threadpoll is end" << endl;
sleep(10);
return 0;
}
- Task.hpp 文件,定义了一个任务类模板,执行两个数的加法
#pragma once
#include <iostream>
#include <functional>
// 要做加法
class Task
{
public:
Task()
{
}
Task(int x, int y) : _x(x), _y(y)
{
}
void Excute()
{
_result = _x + _y;
}
void operator()()
{
Excute();
}
std::string debug()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=?";
return msg;
}
std::string result()
{
std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + "=" + std::to_string(_result);
return msg;
}
private:
int _x;
int _y;
int _result;
};
- 运行结果