C++的并发世界(十一)——线程池

0.线程池的概念

在这里插入图片描述

1.线程池使用步骤

①初始化线程池:确定线程数量,并做好互斥访问;
②启动所有线程
③准备好任务处理基类;
④获取任务接口:通过条件变量阻塞等待任务

2.atomic原子操作

'std:atomic`是C++11标准库中的一个模板类,用于实现多线程环境下的原子操作。它提供了一种线程安全的方式来访问和修改共享变量,可以避免多线程环境中的数据竞争问题,'std:atomic’的使用方式类似于普通的C++变量,但是它的操作是原子性的。也就是说,在多线程环境下,多个线程同时对同一个’std:atomic变量进行操作时,不会出现数据竞争问题。

3.线程池案例

①threadpool.cpp

#include "threadpool.h"


void XThreadPool::Init(int num)
{
	std::unique_lock<std::mutex> lock(__mux__);
	thread_num = num;
	std::cout << "Thread pool Init: " << num << std::endl;
}


void XThreadPool::Start()
{
	std::unique_lock<std::mutex> lock(__mux__);
	
	if (thread_num <= 0)
	{
		std::cerr << "Please Init XThreadPool !" << std::endl;
		return;
	}

	if (!threads.empty())
	{
		std::cerr << "Thread Pool has start!" << std::endl;
		return;
	}

	for (int i = 0; i < thread_num; i++)
	{
		auto th = std::make_shared<std::thread>(&XThreadPool::Run, this);
		threads.push_back(th);
	}
}

void XThreadPool::Run()
{
	std::cout << "begin XThreadPool Run: " << std::this_thread::get_id() << std::endl;
	
	while (true)
	{
		auto task = GetTask();
		if (!task)
		{
			continue;
		}
		
		++__task_run_count__;

		try
		{
			auto re = task->Run();
			task->Setvalue(re);
		}
		catch (...)
		{
			
		}
		--__task_run_count__;
	}

	std::cout << "end XThreadPool Run: " << std::this_thread::get_id() << std::endl;
}

void XThreadPool::AddTask(XTask *task)
{
	std::unique_lock<std::mutex> lock(__mux__);
	tasks.push_back(task);
	task->is_exit = [this] {return is_exit(); }
}

XTask* XThreadPool::GetTask()
{
	std::unique_lock<std::mutex> lock(__mux__);
	if (tasks.empty())
	{
		__cv__.wait(lock);
	}

	auto task = tasks.front();
	tasks.pop_front();
	return task;
}

void XThreadPool::Stop()
{
	exit = true;
	__cv__.notify_all();
	for (auto &th : threads)
	{
		th->join();
	}

	std::unique_lock<std::mutex> lock(__mux__);
	threads.clear();
}

②threadpool.h

#pragma once

#include <thread>
#include <mutex>
#include <vector>
#include <list>
#include <iostream>
#include <string>
#include <condition_variable>
#include <functional>
#include <atomic>
#include <future>

class XTask
{
	public:
		virtual int Run() = 0;
		std::function<bool()> is_exit = nullptr;
		void Setvalue(int v) { __p__.set_value(v); }
		auto GetValue() { return __p__.get_future().get(); }
	private:
		std::promise<int> __p__;//用来接收返回值
};

class XThreadPool
{
	public:
		void Init(int num);
		void Start();//所有线程启动函数
		void Stop();//线程池退出
		void AddTask(XTask *task);
		XTask* GetTask();
		bool is_exit() { return exit; }
		int task_run_count() { return __task_run_count__; }

	private:
		int thread_num = 0;
		std::mutex __mux__;
		void Run();//线程池线程入口函数
		std::vector<std::shared_ptr<std::thread>> threads;
		std::list<XTask*> tasks;
		std::condition_variable __cv__;
		bool exit = false;
		std::atomic<int> __task_run_count__ = {0};//正在运行的任务数量
};

③main.cpp

#include "threadpool.h"

class MyTask :public XTask
{
	public:
		int Run()
		{
			std::cout <<"==============================================" << std::endl;
			std::cout << std::this_thread::get_id() << "-Mytask" << name << std::endl;
			std::cout << "==============================================" << std::endl;

			for (int i = 0; i < 10; i++)
			{
				if(is_exit())
				{
					break;
				}

				std::cout << "." << std::flush;
				std::this_thread::sleep_for(std::chrono::microseconds(500));
			}

			return 0;
		}

		std::string name = "";
};

int main()
{
	XThreadPool pool;
	pool.Init(16);
	pool.Start();

	MyTask task1;
	task1.name = "test name 001";
	pool.AddTask(&task1);

	MyTask task2;
	task2.name = "test name 002";
	pool.AddTask(&task2);
	std::this_thread::sleep_for(std::chrono::seconds(100));
	std::cout << "task run count =" << pool.task_run_count() << std::endl;

	MyTask task3;
	task3.name = "test name 003";
	pool.AddTask(&task3);

	MyTask task4;
	task4.name = "test name 004";
	pool.AddTask(&task4);
	std::cout << "task run count = " << pool.task_run_count() << std::endl;

	std::this_thread::sleep_for(std::chrono::seconds(1));
	pool.Stop();
	std::cout << "task run count =" << pool.task_run_count() << std::endl;

	getchar();
	return 0;
}

相关推荐

  1. 一步一步写线一线应用内存

    2024-04-11 10:12:04       33 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-11 10:12:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-11 10:12:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-04-11 10:12:04       82 阅读
  4. Python语言-面向对象

    2024-04-11 10:12:04       91 阅读

热门阅读

  1. jQuery笔记 02

    2024-04-11 10:12:04       35 阅读
  2. Anthropic Claude 3 加入亚马逊云科技 AI“全家桶”

    2024-04-11 10:12:04       36 阅读
  3. Mysql 事物阻塞

    2024-04-11 10:12:04       41 阅读
  4. PHP 图片裁剪类封装

    2024-04-11 10:12:04       30 阅读
  5. 2192. 有向无环图中一个节点的所有祖先

    2024-04-11 10:12:04       38 阅读
  6. Visual Studio 2022 快速注释代码

    2024-04-11 10:12:04       39 阅读
  7. 使用Django开发爬虫系统

    2024-04-11 10:12:04       39 阅读