C++ 实现线程池,下面给出测试用例。
main.cpp
#include <iostream>
#include <thread>
#include <chrono>
#include "threadpool.h"
#include "callback_proxy.h"
using namespace std;
using namespace Demo;
// Simulates a slow "get time" operation.
//
// Prints a blank line and a start marker tagged with the calling thread's id,
// blocks that thread for five seconds, prints an end marker, and then stores
// the fixed demo value 5201314 into `time`.
//
// @param time  Output parameter that receives the demo value.
// @return      Always true.
bool GetTimeImpl(int& time) {
    // The id cannot change while we run, so query it once and reuse it.
    const std::thread::id self = std::this_thread::get_id();

    std::cout << std::endl;
    std::cout << self << ": 开始延时..." << std::endl;

    std::this_thread::sleep_for(std::chrono::seconds(5));

    std::cout << self << ": 延时结束..." << std::endl;

    time = 5201314;
    return true;
}
// C-style completion callback: receives the fetched time and the opaque
// user-data pointer that was supplied together with the callback.
using msg_func = void(*)(int time, const void* user_data);

// Runs GetTimeImpl on the calling thread and then reports the outcome.
//
// @param callback   Invoked with the fetched value and `user_data` once
//                   GetTimeImpl has returned; skipped when null. It is invoked
//                   regardless of whether GetTimeImpl reported success.
// @param user_data  Opaque pointer forwarded to `callback` untouched.
// @return           Whatever GetTimeImpl returned.
bool GetTimeImplAsync(msg_func callback, const void* user_data)
{
    int fetched = -1;  // value reported to the callback if GetTimeImpl leaves it untouched
    const bool ok = GetTimeImpl(fetched);

    if (callback != nullptr) {
        callback(fetched, user_data);  // fire the completion callback
    }
    return ok;
}
// User-facing callback type: receives the fetched time.
typedef std::function<void(int)> FunCallback;

// C-style trampoline that turns a (time, user_data) notification back into a
// call on the FunCallback that `callback` points to.
//
// @param time      Value to pass to the user callback.
// @param callback  Pointer to a heap-allocated FunCallback (or null). This
//                  function takes ownership: the object is deleted before the
//                  function returns, so each pointer must be passed here at
//                  most once.
static void CallbackNotify(int time, const void* callback)
{
    // delete_callback = true: the FunCallback was created with `new` solely to
    // travel through the void* user-data slot; without this it is never freed.
    CallbackProxy::DoSafeCallback<FunCallback>(callback, [=](const FunCallback& cb)
    {
        CallbackProxy::Invoke(cb, time);
    }, true);
}
// 异步获取时间
bool GetTimeAsync(const FunCallback& callback)
{
FunCallback* callback_impl = nullptr;
if (callback) {
callback_impl = new FunCallback(callback);
}
Threadpool* thread_pool = Threadpool::GetInstance();
if (thread_pool != nullptr) {
thread_pool->Commit(GetTimeImplAsync, &CallbackNotify, callback_impl);
return true;
}
false;
}
// Fetches the time synchronously: blocks the caller until GetTimeImpl is done.
//
// @param time  Output parameter filled in by GetTimeImpl.
// @return      The result reported by GetTimeImpl.
bool GetTime(int &time)
{
    return GetTimeImpl(time);
}
// Demo driver: performs one blocking fetch, then starts one asynchronous
// fetch and waits for a key press so the pool thread has time to report back.
int main() {
    std::cout << "main start" << std::endl;

    // Synchronous fetch: returns only after the value has been written.
    int sync_time;
    GetTime(sync_time);
    std::cout << "tongbu time:" << sync_time << std::endl;
    std::cout << "*********" << std::endl;

    // Asynchronous fetch: the lambda runs later on a pool thread.
    const auto on_time = [](int time) {
        std::cout << "yibu time:" << time << std::endl;
    };
    GetTimeAsync(on_time);

    std::cout << "main end" << std::endl << std::endl;

    getchar();  // keep the process alive until the user presses a key
    return 0;
}
threadpool.h
/*************************************************
** Copyright: xxx公司
** Author: xxx
** Date: xxx
** Description: 线程池
**************************************************/
#ifndef THREADPOOL_H_
#define THREADPOOL_H_
#include <vector>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
// Fixed-size thread pool.
//
// Worker threads are started in the constructor and block on a condition
// variable until a task is queued through Commit(). The destructor stops
// accepting new work, lets the workers finish every task that is already
// queued, and joins them.
class Threadpool {
public:
    // Starts `size` worker threads; values below 1 are treated as 1.
    Threadpool(int size = 4) {
        stopped_ = false;
        // Keep the count in a local: the loop bound must not be read back from
        // the atomic idle counter, which workers modify while running tasks.
        const int thread_count = size < 1 ? 1 : size;
        idl_thread_number_ = thread_count;
        for (int i = 0; i < thread_count; ++i) {
            pool_.emplace_back(&Threadpool::Run, this);
        }
    }

    // The pool owns threads and a mutex; copying it makes no sense.
    Threadpool(const Threadpool&) = delete;
    Threadpool& operator=(const Threadpool&) = delete;

    // Stops the pool: already-queued tasks are still executed, then all
    // workers are joined.
    ~Threadpool() {
        {
            // Flip the flag under the mutex. Otherwise a worker that has just
            // evaluated the wait predicate (and seen stopped_ == false) could
            // miss the notification below and sleep forever, hanging join().
            std::lock_guard<std::mutex> guard(lock_);
            stopped_.store(true);
        }
        cv_task_.notify_all();  // wake every worker so it can drain and exit
        for (auto& worker : pool_) {
            if (worker.joinable())
                worker.join();
        }
    }

    // Returns the process-wide pool (default size), created on first use.
    static Threadpool* GetInstance()
    {
        static Threadpool threadpool;
        return &threadpool;
    }

public:
    // Queues `f(args...)` for execution on a worker thread.
    //
    // The callable and its arguments are bound by std::bind, i.e. copied or
    // moved into the task. Call .get() on the returned future to wait for
    // completion and obtain the result (or the exception thrown by `f`).
    //
    // If the pool is already shutting down the task is not queued; the
    // returned future then reports std::future_errc::broken_promise on get().
    template<class F, class... Args>
    auto Commit(F&& f, Args&&... args)->std::future<decltype(f(args...))>
    {
        using RetType = decltype(f(args...));
        auto task = std::make_shared<std::packaged_task<RetType()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        std::future<RetType> future = task->get_future();
        {
            std::lock_guard<std::mutex> lock(lock_);
            // Checked under the lock so a task can never be queued after the
            // destructor has told the workers to drain and exit.
            if (stopped_.load()) return future;
            tasks_.emplace(
                [task]()
                {
                    (*task)();
                }
            );
        }
        cv_task_.notify_one();  // wake one worker to pick the task up
        return future;
    }

    // Number of worker threads that are currently not running a task.
    inline int idl_thread_number() const { return idl_thread_number_.load(); }

protected:
    // Worker loop: waits for a task, runs it, and repeats until the pool has
    // been stopped AND the queue is empty.
    void Run() {
        for (;;) {
            Task task;
            {
                std::unique_lock<std::mutex> lock{ lock_ };
                cv_task_.wait(lock,
                    [this] {
                        return stopped_.load() || !tasks_.empty();
                    }
                );  // sleep until there is work or the pool is stopping
                if (stopped_ && tasks_.empty())
                    return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            // Run the task outside the lock so other workers can dequeue.
            idl_thread_number_--;
            task();
            idl_thread_number_++;
        }
    }

private:
    using Task = std::function<void()>;
    std::vector<std::thread> pool_;          // worker threads
    std::queue<Task> tasks_;                 // pending tasks, guarded by lock_
    std::mutex lock_;                        // protects tasks_ and the stop hand-off
    std::condition_variable cv_task_;        // signalled on new task / shutdown
    std::atomic<bool> stopped_;              // true once shutdown has begun
    std::atomic<int> idl_thread_number_;     // idle worker count
};
#endif // THREADPOOL_H_
callback_proxy.h
/*************************************************
** Copyright: xxx信息
** Author: xxx
** Date: xxx
** Description: xxx
**************************************************/
#ifndef CALLBACK_PROXY_H_
#define CALLBACK_PROXY_H_
#include <functional>
#include <memory>
namespace Demo
{
// Static helpers for invoking a callback object that is passed around as an
// untyped `const void*` (for example through a C-style user-data argument).
class CallbackProxy
{
// Nullary task type handed to Run(); TR is the task's return type.
template<typename TR = void>
using CallbackProxyClosure = std::function<TR(void)>;
public:
// Casts `callback` to `TCallback*` and, when it is usable, passes the
// pointed-to callback object to `closure`.
//
// TCallback        Concrete type the pointer is cast to. The cast is an
//                  unchecked C-style cast (it also drops the const qualifier),
//                  so the caller must guarantee the pointer really refers to a
//                  TCallback.
// callback         Pointer to the callback object; may be null.
// closure          Callable invoked with the callback object; its return type
//                  is the return type of DoSafeCallback.
// delete_callback  When true and `callback` is non-null, the TCallback object
//                  is deleted before this function returns, whether or not
//                  `closure` was invoked.
//
// Returns a value-initialised TReturn when `callback` is null or when the
// callback object compares equal to nullptr; otherwise the result of `closure`.
template<typename TCallback, typename TDoCall>
static auto DoSafeCallback(const void* callback, const TDoCall& closure, bool delete_callback = false)
->decltype(closure((*(TCallback*)(callback))))
{
using TCallbackPtr = TCallback*;
using TReturn = decltype(closure((*(TCallbackPtr)(callback))));
// Scope guard: deletes the pointer it was given (if any) when it goes out of scope.
struct Deleter {
Deleter(TCallbackPtr cb_ptr) : cb_ptr_(cb_ptr) {}
~Deleter() {
if (cb_ptr_ != nullptr) {
delete cb_ptr_;
cb_ptr_ = nullptr;
}
}
private: TCallbackPtr cb_ptr_;
};
if (callback != nullptr) {
auto&& real_type_cb_ptr = (TCallbackPtr)(callback);
// The guard holds the pointer only when deletion was requested; its destructor
// runs after the return expression below has been evaluated.
Deleter deleter(delete_callback ? real_type_cb_ptr : nullptr);
// std::forward<TCallback> hands the callback to `closure` as an rvalue, so a
// closure that takes its parameter by value would move from it.
return (*real_type_cb_ptr == nullptr) ? (TReturn()) : (closure(std::forward<TCallback>(*real_type_cb_ptr)));
}
return TReturn();
}
// Invokes a callable that is not a member-function pointer: binds `f` to `args`
// with std::bind and runs the bound task through Run<TReturn>.
// This overload participates only when F is not a member-function-pointer type.
template<class F, class... Args, class = typename std::enable_if<!std::is_member_function_pointer<F>::value>::type>
static auto Invoke(F && f, Args && ... args)->decltype(f(std::forward<Args>(args)...))
{
using TReturn = decltype(f(std::forward<Args>(args)...));
return Run<TReturn>(std::bind(f, std::forward<Args>(args)...));
}
// Invokes a const member function `f` on `p` with `args`; `p` is passed to
// std::bind as the first bound argument (not forwarded).
template<class R, class C, class... DArgs, class P, class... Args>
static R Invoke(R(C::*f)(DArgs...) const, P && p, Args && ... args)
{
using TReturn = R;
return Run<TReturn>(std::bind(f, p, std::forward<Args>(args)...));
}
// Same as above for a non-const member function.
template<class R, class C, class... DArgs, class P, class... Args>
static R Invoke(R(C::*f)(DArgs...), P && p, Args && ... args)
{
using TReturn = R;
return Run<TReturn>(std::bind(f, p, std::forward<Args>(args)...));
}
protected:
// Runs `task` on the calling thread and returns its result.
template<typename TR>
static TR Run(const CallbackProxyClosure<TR>& task)
{
return task();
}
// Non-template overload for void tasks: hands the task to docallback_async_
// when that hook is set, otherwise runs it directly.
// NOTE(review): every Invoke above calls Run<TReturn>(...) with an explicit
// template argument, which appears to select the template overload even when
// TReturn is void, so this overload looks unreachable from Invoke. The
// commented-out `template<>` below suggests it was meant to be a
// specialisation - confirm the intended dispatch.
//template<>
static void Run(const CallbackProxyClosure<void>& task)
{
if (docallback_async_ != nullptr) {
docallback_async_(task);
} else {
task();
}
}
private:
// Optional hook that receives void tasks instead of running them inline.
// NOTE(review): only declared here; no definition is visible in this header,
// so one must exist in some translation unit if the member is ever used.
static std::function<void(const std::function< void()>&)> docallback_async_;
};
}
#endif // CALLBACK_PROXY_H_