Boost中线程的使用

目录

boost的线程基本用法

boost::condition

thread_group 线程组

thread_pool


boost的线程基本用法


	// Worker thread handles. They are default-constructed here and only get a
	// thread of execution assigned in on_pushButton_2_clicked().
	boost::thread Thread_GenerateUuid;
	boost::thread Thread_ShowUuid;

	// Locked by procGenerateUuid() and showUuid() around UuidQueue push/pop.
	boost::mutex mutex;
	std::queue<std::string>UuidQueue;
	// Thread bodies: procGenerateUuid() pushes into UuidQueue, showUuid() pops from it.
	void procGenerateUuid();
	void showUuid();

void GetPveDataDemo::procGenerateUuid()
{
	try{
		for (;;)
		{
			boost::this_thread::sleep_for(boost::chrono::seconds(2));
			boost::this_thread::interruption_point(); // 手动在线程中加入中断点,中断点不影响其他语句执行 当其他地方触发该断点(thread.interrupt())再次执行到此处,抛出boost::thread_interrupted异常,若未捕捉则线程终止。 
			if (UuidQueue.size() <= 5)
			{
				boost::lock_guard<boost::mutex> lck_guard(mutex);
				std::string uuid = QUuid::createUuid().toString().toStdString();
				UuidQueue.push(uuid);
			}
		}
	}
	catch (...)
	{
		
	}
}

void GetPveDataDemo::showUuid()
{
	try{
		int cnt = 1;
		for (;;)
		{
			boost::this_thread::sleep_for(boost::chrono::seconds(2));
			boost::this_thread::interruption_point();// 手动在线程中加入中断点,中断点不影响其他语句执行 当其他地方触发该断点可从该循环中退出
			if (!UuidQueue.empty())
			{
				boost::lock_guard<boost::mutex> lck_guard(mutex);
				std::string cuuid = UuidQueue.front();
				UuidQueue.pop();
				QString UuidDisplay = "#:" + QString::number(cnt) + ": " + QString(cuuid.c_str()) + "\n";
				ui.textBrowser_UUID->append(UuidDisplay);//界面和线程写在一起了不太好	
				++cnt;
			}
		}
	}
	catch (...)
	{
		
	}
}


void GetPveDataDemo::on_pushButton_2_clicked()
{
	//Uuid线程
	if (!Thread_GenerateUuid.joinable() && !Thread_ShowUuid.joinable())	//joinable判断线程是否结束
	{
		Thread_GenerateUuid = boost::thread(boost::bind(&GetPveDataDemo::procGenerateUuid, this));
		Thread_ShowUuid = boost::move(boost::thread(boost::bind(&GetPveDataDemo::showUuid, this)));
		ui.pushButton_2->setText(QString::fromLocal8Bit("停止生成UUID"));
	}
	else
	{
		Thread_GenerateUuid.interrupt(); //向线程发送中断请求
		Thread_ShowUuid.interrupt();
		Thread_GenerateUuid.join();//join连接 detach是脱离线程 不会阻塞
		Thread_ShowUuid.join();	 //join函数,作用是等待直到线程执行结束; 阻塞当前线程

		ui.pushButton_2->setText(QString::fromLocal8Bit("开始生成UUID"));
		ui.textBrowser_UUID->clear();
		ui.textBrowser_UUID->append(QString::fromLocal8Bit("当前生产线程停止"));
		ui.textBrowser_UUID->append(QString::fromLocal8Bit("当前消费线程停止"));
	}
	//move 移动语义 将临时对象的所有权给予GenerateUuid,避免临时变量不必要的拷贝和析构
}

boost::condition

与std的条件变量一样会有虚假唤醒问题,同样需要谓词处理。

// Number of lanes; sizes the per-lane mutex and condition arrays below.
#define LANENUM 32

// One mutex / condition pair per lane, indexed by lane number.
boost::mutex mtx_etcSignal[LANENUM];
boost::condition m_cond_etcSignal[LANENUM];


// Maps a plate to a flag; the value 1 is what waitForEtcSignal() waits for.
// NOTE(review): this single map is accessed under the per-lane mutex of
// whichever lane is in use, so two different lanes can touch it concurrently
// while holding different mutexes — confirm whether one map per lane (or one
// shared mutex) is intended.
map<string,int> plateNo2flag;

bool waitForEtcSignal(int laneNo,const string&plate,int connTime)
{
    boost::mutex::scoped_lock lk(mutex mtx_etcSignal[laneNo]);
    m_cond_etcSignal[laneNo].wait_for(lk,bost::chrono::milliseconds(connTime),[&]()
   {
        if(plateNo2flag.count(plate)&&plateNo2flag[plate] == 1)
        { 
             plateNo2flag.erase(plate);
             return true;
        }
        else
        {
            return false;
        }
    });//添加谓词避免虚假唤醒
    return false;
}



// The charging thread issues the charge request, then waits for the result
// signal for this plate on this lane (connTime is the timeout passed through
// to waitForEtcSignal).

bool status = waitForEtcSignal(laneNo,plate,connTime);
if(status)
{
    //...
}


// The server thread has received the charge result.
//...
if(paySuccess)
{
    // Set the flag while holding the lane's mutex (the same one the waiter
    // locks), then wake every thread waiting on this lane's condition.
    boost::mutex::scoped_lock lk(mtx_etcSignal[laneNo]);
    plateNo2flag[plate] = 1;
    m_cond_etcSignal[laneNo].notify_all();
}

thread_group 线程组

        boost::thread_group 内部使用 list<thread*> 管理一组线程,方便批量操作(如批量 join_all、interrupt_all);它不是线程池,没有线程池的自动伸缩等功能。

        

/// Starts the two worker threads (func_1 and func_2) inside the thread group,
/// creating the group on first use.
///
/// @return the value of m_bStarted after the attempt: true if the server was
///         already started or both threads were launched, false if launching
///         threw std::runtime_error.
bool TestServerClient::startServer()
{
	if (!m_bStarted)
	{
		if (!group)
		{
			group = new boost::thread_group();
		}
		try
		{
			// create_thread() launches the callable and registers the new
			// thread with the group; it takes no extra arguments, hence bind.
			group->create_thread(boost::bind(&TestServerClient::func_1, this));
			// add_thread() registers an already-constructed thread; the group
			// takes ownership of the pointer.
			group->add_thread(new boost::thread(boost::bind(&TestServerClient::func_2, this, "hello client")));
			m_bStarted = true;
		}
		catch (const std::runtime_error& e)	// by const reference: no copy, no slicing
		{
			// Kept so the failure reason can be inspected or logged;
			// m_bStarted stays false and is reported to the caller.
			const std::string errorstr(e.what());
			(void)errorstr;
		}
	}
	return m_bStarted;
}

void TestServerClient::stopServer()
{
	if (m_bStarted)
	{
		group->interrupt_all();
		group->join_all();
		group = nullptr;
	}
}

 thread_pool

boost的线程池概述:

https://threadpool.sourceforge.net/index.html

 threadpool 是基于 Boost 库实现的一个线程池库。

/*! \file
* \brief Thread pool core.
*
* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
*
* Thread pools are a mechanism for asynchronous and parallel processing 
* within the same process. The pool class provides a convenient way 
* for dispatching asynchronous tasks as functions objects. The scheduling
* of these tasks can be easily controlled by using customized schedulers. 
*
* Copyright (c) 2005-2007 Philipp Henkel
*
* Use, modification, and distribution are  subject to the
* Boost Software License, Version 1.0. (See accompanying  file
* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
*
* http://threadpool.sourceforge.net
*
*/


#ifndef THREADPOOL_POOL_HPP_INCLUDED
#define THREADPOOL_POOL_HPP_INCLUDED

#include <boost/ref.hpp>

#include "./detail/pool_core.hpp"

#include "task_adaptors.hpp"

#include "./detail/locking_ptr.hpp"

#include "scheduling_policies.hpp"
#include "size_policies.hpp"
#include "shutdown_policies.hpp"



/// The namespace threadpool contains a thread pool and related utility classes.
namespace boost { namespace threadpool
{



  /*! \brief Thread pool. 
  *
  * Thread pools are a mechanism for asynchronous and parallel processing 
  * within the same process. The pool class provides a convenient way 
  * for dispatching asynchronous tasks as functions objects. The scheduling
  * of these tasks can be easily controlled by using customized schedulers. 
  * A task must not throw an exception.
  *
  * A pool is DefaultConstructible, CopyConstructible and Assignable.
  * It has reference semantics; all copies of the same pool are equivalent and interchangeable. 
  * All operations on a pool except assignment are strongly thread safe or sequentially consistent; 
  * that is, the behavior of concurrent calls is as if the calls have been issued sequentially in an unspecified order.
  *
  * \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.
  * \param SchedulingPolicy A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
  *
  * \remarks The pool class is thread-safe.
  * 
  * \see Tasks: task_func, prio_task_func
  * \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
  */ 
  template <
    typename Task                                   = task_func,
    template <typename> class SchedulingPolicy      = fifo_scheduler,
    template <typename> class SizePolicy            = static_size,
    template <typename> class SizePolicyController  = resize_controller,
    template <typename> class ShutdownPolicy        = wait_for_all_tasks
  > 
  class thread_pool 
  {
    // Core implementation type, instantiated with all policy parameters of this pool.
    typedef detail::pool_core<Task, 
                              SchedulingPolicy,
                              SizePolicy,
                              SizePolicyController,
                              ShutdownPolicy> pool_core_type;
    shared_ptr<pool_core_type>          m_core; // pimpl idiom
    shared_ptr<void>                    m_shutdown_controller; // If the last pool holding a pointer to the core is deleted the controller shuts the pool down.

  public: // Type definitions
    typedef Task task_type;                                   //!< Indicates the task's type.
    typedef SchedulingPolicy<task_type> scheduler_type;       //!< Indicates the scheduler's type.
 /*   typedef thread_pool<Task, 
                        SchedulingPolicy,
                        SizePolicy,
                        ShutdownPolicy > pool_type;          //!< Indicates the thread pool's type.
 */
    typedef SizePolicy<pool_core_type> size_policy_type;               //!< The size policy, instantiated for this pool's core.
    typedef SizePolicyController<pool_core_type> size_controller_type; //!< The size controller type returned by size_controller().


  public:
    /*! Constructor.
     * \param initial_threads The pool is immediately resized to set the specified number of threads. The pool's actual number threads depends on the SizePolicy.
     */
    // m_shutdown_controller is built from a null pointer plus a custom deleter:
    // it manages no object of its own, and its deleter calls
    // pool_core_type::shutdown on m_core. m_core is declared before
    // m_shutdown_controller, so it is already initialized when it is bound here.
    thread_pool(size_t initial_threads = 0)
    : m_core(new pool_core_type)
    , m_shutdown_controller(static_cast<void*>(0), bind(&pool_core_type::shutdown, m_core))
    {
      size_policy_type::init(*m_core, initial_threads);
    }


    /*! Gets the size controller which manages the number of threads in the pool. 
    * \return The size controller.
    * \see SizePolicy
    */
    size_controller_type size_controller()
    {
      return m_core->size_controller();
    }


    /*! Gets the number of threads in the pool.
    * \return The number of threads.
    */
    size_t size()	const
    {
      return m_core->size();
    }


     /*! Schedules a task for asynchronous execution. The task will be executed once only.
     * \param task The task function object. It should not throw exceptions.
     * \return true, if the task could be scheduled and false otherwise. 
     */  
     bool schedule(task_type const & task)
     {	
       return m_core->schedule(task);
     }


    /*! Returns the number of tasks which are currently executed.
    * \return The number of active tasks. 
    */  
    size_t active() const
    {
      return m_core->active();
    }


    /*! Returns the number of tasks which are ready for execution.    
    * \return The number of pending tasks. 
    */  
    size_t pending() const
    {
      return m_core->pending();
    }


    /*! Removes all pending tasks from the pool's scheduler.
    */  
    void clear()
    { 
      m_core->clear();
    }    


    /*! Indicates that there are no tasks pending. 
    * \return true if there are no tasks ready for execution.	
    * \remarks This function is more efficient than the check 'pending() == 0'.
    */   
    bool empty() const
    {
      return m_core->empty();
    }	


    /*! The current thread of execution is blocked until the sum of all active
    *  and pending tasks is equal or less than a given threshold. 
    * \param task_threshold The maximum number of tasks in pool and scheduler.
    */     
    void wait(size_t task_threshold = 0) const
    {
      m_core->wait(task_threshold);
    }	


    /*! The current thread of execution is blocked until the timestamp is met
    * or the sum of all active and pending tasks is equal or less 
    * than a given threshold.  
    * \param timestamp The time when function returns at the latest.
    * \param task_threshold The maximum number of tasks in pool and scheduler.
    * \return true if the task sum is equal or less than the threshold, false otherwise.
    */       
    bool wait(xtime const & timestamp, size_t task_threshold = 0) const
    {
      return m_core->wait(timestamp, task_threshold);
    }
  };



  /*! \brief Fifo pool.
  *
  * The pool's tasks are fifo scheduled task_func functors.
  * All template arguments are spelled out explicitly and equal thread_pool's defaults.
  *
  */ 
  typedef thread_pool<task_func, fifo_scheduler, static_size, resize_controller, wait_for_all_tasks> fifo_pool;


  /*! \brief Lifo pool.
  *
  * The pool's tasks are lifo scheduled task_func functors.
  * Differs from fifo_pool only in the scheduling policy (lifo_scheduler).
  *
  */ 
  typedef thread_pool<task_func, lifo_scheduler, static_size, resize_controller, wait_for_all_tasks> lifo_pool;


  /*! \brief Pool for prioritized task.
  *
  * The pool's tasks are prioritized prio_task_func functors.
  * Uses prio_task_func as the task type together with prio_scheduler.
  *
  */ 
  typedef thread_pool<prio_task_func, prio_scheduler, static_size, resize_controller, wait_for_all_tasks> prio_pool;


  /*! \brief A standard pool.
  *
  * The pool's tasks are fifo scheduled task_func functors.
  * This is simply another name for fifo_pool.
  *
  */ 
  typedef fifo_pool pool;



} } // namespace boost::threadpool

#endif // THREADPOOL_POOL_HPP_INCLUDED

使用示例: 

// Pool with the FIFO scheduling policy (boost::threadpool::pool is a typedef of fifo_pool).

boost::threadpool::pool _thread_pool;
_thread_pool.size_controller().resize(4);
_thread_pool.schedule(boost::bind(&LedDevice::SendParkingLots, this, nBelongedPark));
// Block until no active or pending tasks remain (wait() defaults to a threshold of 0).
_thread_pool.wait();


// priority优先级线程池
#include <boost/thread.hpp>
#include "threadpool/pool.hpp"

/// ETC interface implementation that dispatches its work through a
/// priority-scheduled thread pool.
class HaiNanEtcInterface final:public EtcInterface
{
public:
    //...
	/// Priority values handed to boost::threadpool::prio_task_func.
	/// Enumerators are compile-time constants: they add no per-object storage
	/// and are accessible as HaiNanEtcInterface::HighPriority etc.
	/// NOTE(review): presumably a larger value is scheduled first — confirm
	/// against the prio_scheduler documentation.
	enum Priority_Level
	{
		LowPriority = 100,
		MidPriority = 200,
		HighPriority = 300
	};
private:
	// Explicitly initialized: startServer() reads this flag before anything in
	// the visible code writes it, and a default-constructed atomic is not
	// guaranteed to hold false before C++20.
	atomic_bool m_bStarted{false};
	boost::threadpool::prio_pool m_EtcThreadPool;
};


/// Marks the interface as started; on the first start the thread pool is
/// resized to 4 worker threads.
///
/// @return the value of m_bStarted after the call.
bool HaiNanEtcInterface::startServer()
{
	const bool alreadyStarted = m_bStarted.load();
	if (!alreadyStarted)
	{
		// Set the pool size once; later calls keep the existing pool.
		m_EtcThreadPool.size_controller().resize(4);
	}
	m_bStarted.store(true);
	return m_bStarted.load();
}

//使用优先级策略的线程池
void HaiNanEtcInterface::queryFee(const PassVehicle& pve)
{
		if (m_EtcThreadPool.size() > 1)
		{		m_EtcThreadPool.schedule(boost::threadpool::prio_task_func(HighPriority, boost::bind(&HaiNanEtcInterface::queryFeeFunc, this,pve)));
		}
}

void HaiNanEtcInterface::checkTime()
{
		if (m_EtcThreadPool.size() > 1)
		{		m_EtcThreadPool.schedule(boost::threadpool::prio_task_func(LowPriority, boost::bind(&HaiNanEtcInterface::checkTimeFunc, this)));
		}
}

/// Stops the interface: drops every task still pending in the pool, waits for
/// the remaining tasks to finish, and clears the started flag.
/// Calling it when the interface is not started is a no-op.
void HaiNanEtcInterface::stopServer()
{
	if (!m_bStarted)
	{
		return;
	}

	// Discard the queued tasks first, then block until no active or pending
	// task remains (wait() uses a threshold of 0).
	m_EtcThreadPool.clear();
	m_EtcThreadPool.wait();

	// Reset the flag so it reflects the stopped state and a later
	// startServer() goes through its first-start path again.
	m_bStarted = false;
}

相关推荐

  1. Spring中线池ThreadPoolTaskExecutor使用

    2024-07-18 00:44:03       50 阅读
  2. Android中线通信-Handler

    2024-07-18 00:44:03       49 阅读
  3. QT中线退出分析

    2024-07-18 00:44:03       52 阅读
  4. Elastic boosting使用

    2024-07-18 00:44:03       35 阅读
  5. 线使用

    2024-07-18 00:44:03       32 阅读
  6. spring boot 线应用

    2024-07-18 00:44:03       33 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-18 00:44:03       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-18 00:44:03       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-18 00:44:03       58 阅读
  4. Python语言-面向对象

    2024-07-18 00:44:03       69 阅读

热门阅读

  1. 详解Redis源码中的设计模式及设计思想

    2024-07-18 00:44:03       20 阅读
  2. HTSJDK库Cigar类介绍

    2024-07-18 00:44:03       23 阅读
  3. Html_Css问答集(9)

    2024-07-18 00:44:03       18 阅读
  4. 2024.7.17

    2024-07-18 00:44:03       26 阅读
  5. Web前端-Web开发CSS基础4-显示

    2024-07-18 00:44:03       18 阅读
  6. xml 标记语言介绍

    2024-07-18 00:44:03       23 阅读
  7. C# lock关键字

    2024-07-18 00:44:03       19 阅读