什么是线程池
线程池其实就是一种多线程处理形式,处理过程中可以将任务添加到队列中,然后在创建线程后自动启动这些任务。
为什么要引入线程池
- 在单线程的场景中,如在服务端的服务中,要对多个客户端进行服务,单个线程会阻塞住,使效率大大降低,这时候就要多个线程,
- 然而在使用多个线程的时候,来一个任务,就要创建一个线程去服务,当客户端退出又需要销毁线程,而频繁的创建线程销毁线程效率很低,另一方面,
- 如果一次有很多个线程,计算机当中的线程越多,CPU的压力就越大,因为CPU要不断在这些线程之间来回切换,此时CPU在调度线程的时候,线程和线程之间切换的成本就会变得很高。此外,一旦线程太多,每一个线程再次被调度的周期就变长了,而线程是为客户端提供服务的,线程被调度的周期变长,客户端也迟迟得不到应答。
- 所以这里可以引入线程池,用固定数量的线程来处理任务,而这些任务由一个任务队列来管理,当有新的任务到来的时候,就可以将任务Push到线程池当中,如果线程池当中没有任务那么当前线程就会进入休眠状态。
线程池的应用场景
- 12306网上购票系统等
- 云盘文件上传和下载
- 网购
线程池的代码实现
- 线程池中的多个线程负责从任务队列当中拿任务,并将拿到的任务进行处理。
- 线程池对外提供一个pushtask接口,用于让外部线程能够将任务push到任务队列当中。
- 任务类中能够在接收任务后对任务进行处理
- 任务队列可能会被多个线程同时访问,所以在对任务队列操作引入互斥锁
任务类的实现
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
class Task
{
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
{
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "= ";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
void run(){
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
usleep(100000);
std::cout<<formatArg()<<formatRes()<<std::endl;
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
线程池的实现
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include <pthread.h>
#include "Task.hpp"
const static int N = 5;
template <class T>
class ThreadPool
{
public:
ThreadPool(int num = N) : _num(num), _threads(num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
void lockQueue()
{
pthread_mutex_lock(&_lock);
}
void unlockQueue()
{
pthread_mutex_unlock(&_lock);
}
void threadWait()
{
pthread_cond_wait(&_cond, &_lock);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _tasks.empty();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
while (true)
{
// 1. 检测有没有任务
// 2. 有:处理
// 3. 无:等待
// 细节:必定加锁
tp->lockQueue();
while (tp->isEmpty())
{
tp->threadWait();
}
T t = tp->popTask(); // 从公共区域拿到私有区域
tp->unlockQueue();
t.run(); // 处理任务,应不应该在临界区中处理?1,0
}
}
void init()
{
// TODO
}
void start()
{
for (int i = 0; i < _num; i++)
{
pthread_create(&_threads[i], nullptr, threadRoutine, this); // ?
}
}
void pushTask(const T &t)
{
lockQueue();
_tasks.push(t);
threadWakeup();
unlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
private:
std::vector<pthread_t> _threads;
int _num;
std::queue<T> _tasks; // 使用stl的自动扩容的特性
pthread_mutex_t _lock;
pthread_cond_t _cond;
};
测试代码
#include"ThreadPool.hpp"
#include<memory>
int main(){
std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
tp->init();
tp->start();
const char* op = "+-*/%";
for( ;;){
sleep(1);
int x = rand() % 100;
int y = rand() % 100;
int index = rand() % 5;
Task task(x, y, op[index]);
tp->pushTask(task);
}
}
拓展操作
- 封装一个线程类
- 封装一个互斥锁类,在构造的时候加锁,析构的时候解锁
- 用单例模式来操作线程池
拓展后的代码
线程类
#pragma once
#include <iostream>
#include <string>
#include <cstdlib>
#include <pthread.h>
class Thread
{
public:
typedef enum
{
NEW = 0,
RUNNING,
EXITED
} ThreadStatus;
typedef void (*func_t)(void *);
public:
Thread(int num, func_t func, void *args) : _tid(0), _status(NEW), _func(func), _args(args)
{
char name[128];
snprintf(name, sizeof(name), "thread-%d", num);
_name = name;
}
int status() { return _status; }
std::string threadname() { return _name; }
pthread_t threadid()
{
if (_status == RUNNING)
return _tid;
else
{
return 0;
}
}
// 是不是类的成员函数,而类的成员函数,具有默认参数this,需要static
// 但是会有新的问题:static成员函数,无法直接访问类属性和其他成员函数
static void *runHelper(void *args)
{
Thread *ts = (Thread*)args; // 就拿到了当前对象
// _func(_args);
(*ts)();
return nullptr;
}
void operator ()() //仿函数
{
if(_func != nullptr) _func(_args);
}
void run()
{
int n = pthread_create(&_tid, nullptr, runHelper, this);
if(n != 0) exit(1);
_status = RUNNING;
}
void join()
{
int n = pthread_join(_tid, nullptr);
if( n != 0)
{
std::cerr << "main thread join thread " << _name << " error" << std::endl;
return;
}
_status = EXITED;
}
~Thread()
{
}
private:
pthread_t _tid;
std::string _name;
func_t _func; // 线程未来要执行的回调
void *_args;
ThreadStatus _status;
};
互斥锁类
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex // 自己不维护锁,有外部传入
{
public:
Mutex(pthread_mutex_t *mutex):_pmutex(mutex)
{}
void lock()
{
pthread_mutex_lock(_pmutex);
}
void unlock()
{
pthread_mutex_unlock(_pmutex);
}
~Mutex()
{}
private:
pthread_mutex_t *_pmutex;
};
class LockGuard // 自己不维护锁,有外部传入
{
public:
LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
{
_mutex.lock();
}
~LockGuard()
{
_mutex.unlock();
}
private:
Mutex _mutex;
};
任务类
#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
class Task
{
public:
Task()
{
}
Task(int x, int y, char op) : _x(x), _y(y), _op(op), _result(0), _exitCode(0)
{
}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if (_y == 0)
_exitCode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if (_y == 0)
_exitCode = -2;
else
_result = _x % _y;
}
break;
default:
break;
}
usleep(100000);
}
std::string formatArg()
{
return std::to_string(_x) + _op + std::to_string(_y) + "= ?";
}
std::string formatRes()
{
return std::to_string(_result) + "(" + std::to_string(_exitCode) + ")";
}
~Task()
{
}
private:
int _x;
int _y;
char _op;
int _result;
int _exitCode;
};
线程池类
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include "Thread.hpp"
#include "Task.hpp"
#include "lockGuard.hpp"
const static int N = 5;
template <class T>
class ThreadPool
{
private:
ThreadPool(int num = N) : _num(num)
{
pthread_mutex_init(&_lock, nullptr);
pthread_cond_init(&_cond, nullptr);
}
ThreadPool(const ThreadPool<T> &tp) = delete;
void operator=(const ThreadPool<T> &tp) = delete;
public:
static ThreadPool<T> *getinstance()
{
if(nullptr == instance) // 为什么要这样?提高效率,减少加锁的次数!
{
LockGuard lockguard(&instance_lock);
if (nullptr == instance)
{
instance = new ThreadPool<T>();
instance->init();
instance->start();
}
}
return instance;
}
pthread_mutex_t *getlock()
{
return &_lock;
}
void threadWait()
{
pthread_cond_wait(&_cond, &_lock);
}
void threadWakeup()
{
pthread_cond_signal(&_cond);
}
bool isEmpty()
{
return _tasks.empty();
}
T popTask()
{
T t = _tasks.front();
_tasks.pop();
return t;
}
static void threadRoutine(void *args)
{
// pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
while (true)
{
// 1. 检测有没有任务
// 2. 有:处理
// 3. 无:等待
// 细节:必定加锁
T t;
{
LockGuard lockguard(tp->getlock());
while (tp->isEmpty())
{
tp->threadWait();
}
t = tp->popTask(); // 从公共区域拿到私有区域
}
// for test
t();
std::cout << "thread handler done, result: " << t.formatRes() << std::endl;
// t.run(); // 处理任务,应不应该在临界区中处理?1,0
}
}
void init()
{
for (int i = 0; i < _num; i++)
{
_threads.push_back(Thread(i, threadRoutine, this));
}
}
void start()
{
for (auto &t : _threads)
{
t.run();
}
}
void check()
{
for (auto &t : _threads)
{
std::cout << t.threadname() << " running..." << std::endl;
}
}
void pushTask(const T &t)
{
LockGuard lockgrard(&_lock);
_tasks.push(t);
threadWakeup();
}
~ThreadPool()
{
for (auto &t : _threads)
{
t.join();
}
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_cond);
}
private:
std::vector<Thread> _threads;
int _num;
std::queue<T> _tasks; // 使用stl的自动扩容的特性
pthread_mutex_t _lock;
pthread_cond_t _cond;
static ThreadPool<T> *instance;
static pthread_mutex_t instance_lock;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
template <class T>
pthread_mutex_t ThreadPool<T>::instance_lock = PTHREAD_MUTEX_INITIALIZER;
测试代码
#include "ThreadPoolEnd.hpp"
#include "Task.hpp"
#include <memory>
int main()
{
printf("0X%x\n", ThreadPool<Task>::getinstance());//打印单例的地址
printf("0X%x\n", ThreadPool<Task>::getinstance());
printf("0X%x\n", ThreadPool<Task>::getinstance());
printf("0X%x\n", ThreadPool<Task>::getinstance());
printf("0X%x\n", ThreadPool<Task>::getinstance());
printf("0X%x\n", ThreadPool<Task>::getinstance());
const char* op = "+-*/%";
while (true)
{
sleep(1);
int x = rand() % 100;
int y = rand() % 100;
int index = rand() % 5;
Task t(x, y, op[index]);
ThreadPool<Task>::getinstance()->pushTask(t); //单例对象也有可能在多线程场景中使用!
}
}