c++ 实现线程池、实现异步接口

c++ 实现线程池,下面给出测试用例

mian.cpp

#include <iostream>
#include <thread>
#include <chrono>

#include "threadpool.h"
#include "callback_proxy.h"

using namespace std;
using namespace Demo;

bool GetTimeImpl(int& time) {
	std::cout << std::endl;
	std::cout << std::this_thread::get_id() <<": 开始延时..." << std::endl;
	std::this_thread::sleep_for(std::chrono::seconds(5));
	std::cout << std::this_thread::get_id() <<": 延时结束..." << std::endl;

	time = 5201314;
	return true;
}

typedef void(*msg_func)(int time, const void* user_data);
bool GetTimeImplAsync(msg_func callback, const void* user_data)
{
	int  time = -1;
	bool result = GetTimeImpl(time);
	if (callback) {
		callback(time, user_data);  // 触发回调
	}
	return result;
}


typedef std::function<void(int)> FunCallback;
static void CallbackNotify(int time, const void* callback)
{
	CallbackProxy::DoSafeCallback<FunCallback>(callback, [=](const FunCallback& cb)
	{
		CallbackProxy::Invoke(cb, time);
	});
}

// 异步获取时间
bool GetTimeAsync(const FunCallback& callback)
{
	FunCallback* callback_impl = nullptr;
	if (callback) {
		callback_impl = new FunCallback(callback);
	}
	Threadpool* thread_pool = Threadpool::GetInstance();
	if (thread_pool != nullptr) {
		thread_pool->Commit(GetTimeImplAsync, &CallbackNotify, callback_impl);
		return true;
	}
	false;
}

// 同步获取时间
bool GetTime(int &time)
{
	bool result = GetTimeImpl(time);
	return result;
}

int main() {
	std::cout << "main start" << std::endl;

	int time;
	GetTime(time); // 同步获取时间
	std::cout << "tongbu time:" <<time << std::endl;

	std::cout << "*********" << std::endl;

	GetTimeAsync([](int time) {  // 异步获取时间
	    std::cout << "yibu time:" << time << std::endl;
	});

	std::cout << "main end" << std::endl << std::endl;

	getchar();
	return 0;
}

threadpool.h

/*************************************************
** Copyright:   xxx公司
** Author:      xxx
** Date:        xxx
** Description: 线程池
**************************************************/
#ifndef THREADPOOL_H_
#define THREADPOOL_H_

#include <vector>
#include <queue>
#include <thread>
#include <atomic>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class Threadpool {
public:
    Threadpool(int size = 4) {
        stopped_ = false;
        idl_thread_number_ = 0;

        idl_thread_number_ = size < 1 ? 1 : size;
        for (int i = 0; i < idl_thread_number_; ++i) {   //初始化线程数量
            pool_.emplace_back(std::thread(&Threadpool::Run, this));
        }
    }

    ~Threadpool() {
        // 线程退出,释放资源
        stopped_.store(true);
        cv_task_.notify_all(); // 唤醒所有线程执行
        for (auto& item : pool_) {
            if (item.joinable())
                item.join();
        }
    }

	static Threadpool* GetInstance() 
	{
		static Threadpool threadpool;
		return &threadpool;
	}

public:
    // 提交任务加入队列
    // 可调用.get()等待任务执行完,获取返回值
    template<class F, class... Args>
    auto Commit(F&& f, Args&&... args)->std::future<decltype(f(args...))>
    {
        using RetType = decltype(f(args...));
        auto task = std::make_shared<std::packaged_task<RetType()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
            );
        std::future<RetType> future = task->get_future();

        if (stopped_.load()) return future;

        {
            // 添加任务到队列
            std::lock_guard<std::mutex> lock(lock_);
            tasks_.emplace(
                [task]()
                {
                    (*task)();
                }
            );
        }
        cv_task_.notify_one(); // 唤醒一个线程执行
        return future;
    }

    // 空闲线程数量
    inline int idl_thread_number() { return idl_thread_number_; }

protected:
    void Run() {
        while (!this->stopped_) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock{ this->lock_ };
                this->cv_task_.wait(lock,
                    [this] {
                        return this->stopped_.load() || !this->tasks_.empty();
                    }
                ); // wait 直到有 task
                if (this->stopped_ && this->tasks_.empty())
                    return;
                task = std::move(this->tasks_.front());
                this->tasks_.pop();
            }
            idl_thread_number_--;
            task();
            idl_thread_number_++;
        }
    }

private:
    using Task = std::function<void()>;
    std::vector<std::thread> pool_;                  // 线程池
    std::queue<Task>         tasks_;                 // 任务队列
    std::mutex               lock_;                  // 同步
    std::condition_variable  cv_task_;               // 条件阻塞
    std::atomic<bool>        stopped_;               // 是否关闭提交
    std::atomic<int>         idl_thread_number_;     // 空闲线程数量
};

#endif // THREADPOOL_H_

callback_proxy.h

/*************************************************
** Copyright:   xxx信息
** Author:      xxx
** Date:        xxx
** Description: xxx
**************************************************/
#ifndef CALLBACK_PROXY_H_
#define CALLBACK_PROXY_H_

#include <functional>
#include <memory>

namespace Demo
{
class CallbackProxy
{
    template<typename TR = void>
    using CallbackProxyClosure = std::function<TR(void)>;

public:
    template<typename TCallback, typename TDoCall>
    static auto DoSafeCallback(const void* callback, const TDoCall& closure, bool delete_callback = false)
        ->decltype(closure((*(TCallback*)(callback))))
    {
        using TCallbackPtr = TCallback*;
        using TReturn = decltype(closure((*(TCallbackPtr)(callback))));
        struct Deleter {
            Deleter(TCallbackPtr cb_ptr) : cb_ptr_(cb_ptr) {}
            ~Deleter() {
                if (cb_ptr_ != nullptr) {
                    delete cb_ptr_;
                    cb_ptr_ = nullptr;
                }
            }
            private: TCallbackPtr cb_ptr_;
        };

        if (callback != nullptr) {
            auto&& real_type_cb_ptr = (TCallbackPtr)(callback);
            Deleter deleter(delete_callback ? real_type_cb_ptr : nullptr);
            return (*real_type_cb_ptr == nullptr) ? (TReturn()) : (closure(std::forward<TCallback>(*real_type_cb_ptr)));
        }
        return TReturn();
    }

    template<class F, class... Args, class = typename std::enable_if<!std::is_member_function_pointer<F>::value>::type>
    static auto Invoke(F && f, Args && ... args)->decltype(f(std::forward<Args>(args)...))
    {
        using TReturn = decltype(f(std::forward<Args>(args)...));
        return Run<TReturn>(std::bind(f, std::forward<Args>(args)...));
    }

    template<class R, class C, class... DArgs, class P, class... Args>
    static R Invoke(R(C::*f)(DArgs...) const, P && p, Args && ... args)
    {
        using TReturn = R;
        return Run<TReturn>(std::bind(f, p, std::forward<Args>(args)...));
    }

    template<class R, class C, class... DArgs, class P, class... Args>
    static R Invoke(R(C::*f)(DArgs...), P && p, Args && ... args)
    {
        using TReturn = R;
        return Run<TReturn>(std::bind(f, p, std::forward<Args>(args)...));
    }

protected:
    template<typename TR>
    static TR Run(const CallbackProxyClosure<TR>& task)
    {
        return task();
    }

    //template<>
    static void Run(const CallbackProxyClosure<void>& task)
    {
        if (docallback_async_ != nullptr) {
            docallback_async_(task);
        } else {
            task();
        }
    }

private:
    static std::function<void(const std::function< void()>&)> docallback_async_;
};
}

#endif  // CALLBACK_PROXY_H_

相关推荐

  1. c++ 实现线实现异步接口

    2024-04-03 15:54:03       32 阅读
  2. C++同异步极致线

    2024-04-03 15:54:03       62 阅读
  3. 简易线实现

    2024-04-03 15:54:03       39 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-03 15:54:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-03 15:54:03       100 阅读
  3. 在Django里面运行非项目文件

    2024-04-03 15:54:03       82 阅读
  4. Python语言-面向对象

    2024-04-03 15:54:03       91 阅读

热门阅读

  1. LeetCode 746. 使用最小花费爬楼梯

    2024-04-03 15:54:03       41 阅读
  2. 模拟退火算法

    2024-04-03 15:54:03       36 阅读
  3. 每日OJ题_回文串dp①_力扣647. 回文子串

    2024-04-03 15:54:03       36 阅读
  4. 【WPF应用24】C#中的Image控件详解与应用示例

    2024-04-03 15:54:03       45 阅读
  5. rust实现希尔排序算法

    2024-04-03 15:54:03       32 阅读
  6. 七彩云转码系统v12.8二开正式版发布

    2024-04-03 15:54:03       35 阅读
  7. 宝塔面板CentOS Stream 8 x86 下如何安装openlitespeed

    2024-04-03 15:54:03       35 阅读
  8. 【Python BUG】局域网内远程连接mysql错误:1130

    2024-04-03 15:54:03       28 阅读
  9. AI大模型学习的理论基础

    2024-04-03 15:54:03       35 阅读