Linux——多线程(五)

1.线程池

1.1初期框架

thread.hpp

#include<iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    using func_t = std::function<void()>;

    class Thread
    {
    public:
        void Excute()
        {
            _func();
        }
    public:
        Thread(func_t func, std::string name="none-name")
            : _func(func), _threadname(name), _stop(true)
        {}
        static void *threadroutine(void *args) //注意:类成员函数,形参是有this指针的
        {
            Thread *self = static_cast<Thread *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if(!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if(!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if(!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;//线程tid
        std::string _threadname;//线程名字
        func_t _func;//线程所要执行的函数
        bool _stop;//判断线程是否停止
    };
}

 ThreadPool.hpp 

#include<vector>
#include<unistd.h>
#include<string>
#include<queue>
#include"Thread.hpp"

using namespace ThreadModule;
const int g_thread_num = 3;//默认线程数
// 线程池->一批线程,一批任务,有任务push、有任务pop,本质是: 生产消费模型
template <typename T>
class ThreadPool
{
public:
    ThreadPool(int threadnum=g_thread_num)//构造函数
        :_threadnum(threadnum)
        , _waitnum(0)
        , _isrunning(false)
    {
        pthread_mutex_init(&_mutex,nullptr);//初始化锁
        pthread_cond_init(&_cond,nullptr);//初始化条件变量
    }
    void Print()
    {
        while(true)
        {
            std::cout<<"我是一个线程"<<std::endl;
            sleep(1);
        }
    }
    void InitThreadPool()
	{
	    // 指向构建出所有的线程,并不启动
        for (int num = 0; num < _threadnum; num++)
	    {
	        std::string name = "thread-" + std::to_string(num + 1);
            _threads.emplace_back(Print,name);//线程处理函数是Print,注意这里有问题
	    }
	    _isrunning = true;
	}
    void Start()//启动线程池
    {
        for(auto &thread:_threads)
        {
            thread.Start();
            std::cout<<thread.name()<<"线程:启动成功"<<std::endl;
        }
    }



    void Wait()
    {
        for(auto &thread:_threads)
        {
            thread.Join();
        }
    }

    // bool Enqueue(const T &t)
    // {
    // }


    ~ThreadPool()//析构
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    int _threadnum;//线程的数量
    std::vector<Thread> _threads;//用vector来存线程
    std::queue<T> _task_queue;//任务队列

    pthread_mutex_t _mutex;//锁
	pthread_cond_t _cond;//条件变量
	int _waitnum;//有几个线程阻塞
    bool _isrunning;//判断线程池是否在运行
};

main.cc

#include <iostream>	
#include <string>
#include <memory>
#include "threadpool.hpp"	
	
int main()
{
	std::unique_ptr<ThreadPool<int>> tp(new ThreadPool<int>()); 
	tp->InitThreadPool();
	tp->Start();
	sleep(5);

    tp->Wait();
    return 0;
}

 

 此时会报错:无效使用非静态成员函数...

主要原因是成员函数包含this指针而thread.hpp中线程所要执行函数的参数为空:using func_t = std::function<void()>;,导致参数类型不匹配

有两种解决方法

 方法一:在Print函数前面加上static

    static void Print()
    {
        while(true)
        {
            std::cout<<"我是一个线程"<<std::endl;
            sleep(1);
        }
    }

 

方法二:在初始化线程池时用bind绑定ThreadPool内部的Print方法,缺省地设置参数this,就是将this参数默认的绑定到Print方法上,这样一来就和thread.hpp中的参数匹配上了 

    void InitThreadPool()
	{
	    // 指向构建出所有的线程,并不启动
        for (int num = 0; num < _threadnum; num++)
	    {
	        std::string name = "thread-" + std::to_string(num + 1);
            //_threads.emplace_back(Print,name);//线程处理函数是Print
            _threads.emplace_back(std::bind(&ThreadPool::Print,this),name);
	    }
	    _isrunning = true;
	}

  也是成功运行

就算后面我们需要更改线程的参数
 那么也可以在初始化函数那里固定式的绑定参数了

不需要再去单独给线程设计参数对象了 

一个类的成员方法设计成另一个类的回调方法,常见的实现就是这种

类的成员方法也可以成为另一个类的回调方法,方便我们继续类级别的互相调用

 

1.2代码完善

 

接下来就是如何入队列以及我们的新线程应该做什么任务...

处理任务:每一个线程进来的时候都需要去任务队列中获取任务,所以我们首当其冲的就要对任务队列给它锁住

任务队列的加锁、解锁以及线程的等待与唤醒(条件变量)

private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }
    void ThreadWakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }

 处理任务

    void HandlerTask(std::string name)//线程处理任务
    {
        while (true)
        {
            //加锁
            LockQueue();
            //任务队列中不一定有数据,如果任务队列为空且线程池在跑,那么就阻塞住
            while(_task_queue.empty()&&_isrunning)
            {
                _waitnum++;
                ThreadSleep();
                _waitnum--;
            }
            //如果任务队列是空的,然后线程池又退出了,那么就没必要运行了
            if(_task_queue.empty() && !_isrunning)
            {
                UnlockQueue();
                std::cout<<name<<"quit..."<<std::endl;
                sleep(1);
                break;
            }
            //不论线程池有没有退出,走到这说明一定有任务 ->处理任务
            T t = _task_queue.front();
	        _task_queue.pop();
	        UnlockQueue();//解锁
            t();
        }
    }

 注意:这个任务是属于线程独占的任务,不能再任务队列的加锁、解锁之间处理

 入任务队列

如果线程阻塞等待的数量大于0,就唤醒一个线程

 

    bool Enqueue(const T &t)
    {
        bool ret = false;
        LockQueue();
        if(_isrunning)
        {
            _task_queue.push(t);
            if(_waitnum>0)
            {
                ThreadWakeup();
            }
            ret = true;
        }
        UnlockQueue();
        return ret;
    }

threadpool.hpp

任务还没写,所以t()先注释掉

#include<iostream>
#include<vector>
#include<unistd.h>
#include<string>
#include<queue>
#include"LockGuard.hpp"
#include"Thread.hpp"

using namespace ThreadModule;
const int g_thread_num = 3;//默认线程数
// 线程池->一批线程,一批任务,有任务push、有任务pop,本质是: 生产消费模型
template <typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }
    void ThreadWakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }
public:
    ThreadPool(int threadnum=g_thread_num)//构造函数
        :_threadnum(threadnum)
        , _waitnum(0)
        , _isrunning(false)
    {
        pthread_mutex_init(&_mutex,nullptr);//初始化锁
        pthread_cond_init(&_cond,nullptr);//初始化条件变量
    }
    // static void Print()
    // {
    //     while(true)
    //     {
    //         std::cout<<"我是一个线程"<<std::endl;
    //         sleep(1);
    //     }
    // }
    // void Print(std::string name)
    // {
    //     while(true)
    //     {
    //         std::cout<<"我是一个线程,线程名是"<<name<<std::endl;
    //         sleep(1);
    //     }
    // }
    void InitThreadPool()
	{
	    // 指向构建出所有的线程,并不启动
        for (int num = 0; num < _threadnum; num++)
	    {
	        std::string name = "thread-" + std::to_string(num + 1);
            //_threads.emplace_back(Print,name);//线程处理函数是Print
            //_threads.emplace_back(std::bind(&ThreadPool::Print,this,std::placeholders::_1),name);
	        _threads.emplace_back(std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1),name);
	    }
	    _isrunning = true;
	}
    void Start()//启动线程池
    {
        for(auto &thread:_threads)
        {
            thread.Start();
            std::cout<<thread.name()<<"线程:启动成功"<<std::endl;
        }
    }

    void HandlerTask(std::string name)//线程处理任务
    {
        while (true)
        {
            //加锁
            LockQueue();
            //任务队列中不一定有数据,如果任务队列为空且线程池在跑,那么就阻塞住
            while(_task_queue.empty()&&_isrunning)
            {
                _waitnum++;
                std::cout<<name<<"线程阻塞中..."<<std::endl;
                ThreadSleep();
                _waitnum--;
            }
            //如果任务队列是空的,然后线程池又退出了,那么就没必要运行了
            if(_task_queue.empty() && !_isrunning)
            {
                UnlockQueue();
                std::cout<<name<<"quit..."<<std::endl;
                sleep(1);
                break;
            }
            //不论线程池有没有退出,走到这说明一定有任务 ->处理任务
            T t = _task_queue.front();
	        _task_queue.pop();
	        UnlockQueue();//解锁
            //t();
        }
    }

    void Stop()
	{
	    LockQueue();
	    _isrunning = false;
	    ThreadWakeupAll();
	    UnlockQueue();        
	}


    void Wait()
    {
        for(auto &thread:_threads)
        {
            thread.Join();
        }
    }

    bool Enqueue(const T &t)
    {
        bool ret = false;
        LockQueue();
        if(_isrunning)
        {
            _task_queue.push(t);
            if(_waitnum>0)
            {
                ThreadWakeup();
            }
            ret = true;
        }
        UnlockQueue();
        return ret;
    }


    ~ThreadPool()//析构
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    int _threadnum;//线程的数量
    std::vector<Thread> _threads;//用vector来存线程
    std::queue<T> _task_queue;//任务队列

    pthread_mutex_t _mutex;//锁
	pthread_cond_t _cond;//条件变量
	int _waitnum;
    bool _isrunning;//判断线程池是否在运行
};

 main.cc

#include <iostream>	
#include <string>
#include <memory>
#include "Task.hpp"
#include "threadpool.hpp"	
	
int main()
{
	std::unique_ptr<ThreadPool<int>> tp(new ThreadPool<int>()); 
	tp->InitThreadPool();
	tp->Start();
	sleep(2);

	tp->Stop();
    tp->Wait();
    return 0;
}

 

2.加上日志与任务

 LOG.hpp(日志)

#pragma once
#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include"LockGuard.hpp"
bool gIsSave = false;
const std::string logname = "log.txt";

// 1. 日志是有等级的
enum Level
{
    DEBUG = 0,
    INFO,
    WARNING,
    ERROR,
    FATAL
};
void SaveFile(const std::string &filename, const std::string &message)
{
    std::ofstream out(filename, std::ios::app);
    if (!out.is_open())
    {
        return;
    }
    out << message;
    out.close();
}
std::string LevelToString(int level)
{
    switch (level)
    {
    case DEBUG:
        return "Debug";
    case INFO:
        return "Info";
    case WARNING:
        return "Warning";
    case ERROR:
        return "Error";
    case FATAL:
        return "Fatal";
    default:
        return "Unknown";
    }
}

std::string GetTimeString()
{
    time_t curr_time = time(nullptr);
    struct tm *format_time = localtime(&curr_time);
    if (format_time == nullptr)
        return "None";
    char time_buffer[1024];
    snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
             format_time->tm_year + 1900,
             format_time->tm_mon + 1,
             format_time->tm_mday,
             format_time->tm_hour,
             format_time->tm_min,
             format_time->tm_sec);
    return time_buffer;
}

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 日志是有格式的
// 日志等级 时间 代码所在的文件名/行数 日志的内容
void LogMessage(std::string filename, int line,bool issave,int level, const char *format, ...)
{

    std::string levelstr = LevelToString(level);
    std::string timestr = GetTimeString();
    pid_t selfid = getpid();

    char buffer[1024];
    va_list arg;
    va_start(arg, format);
    vsnprintf(buffer, sizeof(buffer), format, arg);
    va_end(arg);

    std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
                          "[" + std::to_string(selfid) + "]" +
                          "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";
    LockGuard lockguard(&lock);
    if (!issave)
    {
        std::cout << message;
    }
    else
    {
        SaveFile(logname, message);
    }
}
#define LOG(level, format, ...)                                                \
    do                                                                         \
    {                                                                          \
        LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \
    } while (0)

 LockGuard.hpp

#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__

#include <iostream>
#include <pthread.h>

class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex):_mutex(mutex)
    {
        pthread_mutex_lock(_mutex); // 构造加锁
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t *_mutex;
};

#endif

threadpool.hpp

#include<iostream>
#include<vector>
#include<unistd.h>
#include<string>
#include<queue>	
#include"LOG.hpp"
#include"LockGuard.hpp"
#include"Thread.hpp"

using namespace ThreadModule;
const int g_thread_num = 3;//默认线程数
// 线程池->一批线程,一批任务,有任务push、有任务pop,本质是: 生产消费模型
template <typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void ThreadSleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    void ThreadWakeup()
    {
        pthread_cond_signal(&_cond);
    }
    void ThreadWakeupAll()
    {
        pthread_cond_broadcast(&_cond);
    }
public:
    ThreadPool(int threadnum=g_thread_num)//构造函数
        :_threadnum(threadnum)
        , _waitnum(0)
        , _isrunning(false)
    {
        pthread_mutex_init(&_mutex,nullptr);//初始化锁
        pthread_cond_init(&_cond,nullptr);//初始化条件变量
        LOG(INFO, "线程池构造成功");
    }
    // static void Print()
    // {
    //     while(true)
    //     {
    //         std::cout<<"我是一个线程"<<std::endl;
    //         sleep(1);
    //     }
    // }
    // void Print(std::string name)
    // {
    //     while(true)
    //     {
    //         std::cout<<"我是一个线程,线程名是"<<name<<std::endl;
    //         sleep(1);
    //     }
    // }
    void InitThreadPool()
	{
	    // 指向构建出所有的线程,并不启动
        for (int num = 0; num < _threadnum; num++)
	    {
	        std::string name = "thread-" + std::to_string(num + 1);
            //_threads.emplace_back(Print,name);//线程处理函数是Print
            //_threads.emplace_back(std::bind(&ThreadPool::Print,this,std::placeholders::_1),name);
	        _threads.emplace_back(std::bind(&ThreadPool::HandlerTask,this,std::placeholders::_1),name);
            LOG(INFO, "线程 %s 初始化成功", name.c_str());
	    }
	    _isrunning = true;
	}
    void Start()//启动线程池
    {
        for(auto &thread:_threads)
        {
            thread.Start();
            std::cout<<thread.name()<<"线程:启动成功"<<std::endl;
        }
    }

    void HandlerTask(std::string name)//线程处理任务
    {
        LOG(INFO, "%s 正在运行...", name.c_str());
        while (true)
        {
            //加锁
            LockQueue();
            //任务队列中不一定有数据,如果任务队列为空且线程池在跑,那么就阻塞住
            while(_task_queue.empty()&&_isrunning)
            {
                _waitnum++;
                ThreadSleep();
                _waitnum--;
            }
            //如果任务队列是空的,然后线程池又退出了,那么就没必要运行了
            if(_task_queue.empty() && !_isrunning)
            {
                UnlockQueue();
                //std::cout<<name<<"quit..."<<std::endl;
                sleep(1);
                break;
            }
            //不论线程池有没有退出,走到这说明一定有任务 ->处理任务
            T t = _task_queue.front();
	        _task_queue.pop();
	        UnlockQueue();//解锁
            LOG(DEBUG, "%s 获得任务", name.c_str());
            t();
            LOG(DEBUG,"%s 处理任务中,结果是%s",name.c_str(), t.ResultToString().c_str());
        }
    }

    void Stop()
	{
	    LockQueue();
	    _isrunning = false;
	    ThreadWakeupAll();
	    UnlockQueue();        
	}


    void Wait()
    {
        for(auto &thread:_threads)
        {
            thread.Join();
            LOG(INFO, "%s 线程退出...", thread.name().c_str());
        }
    }

    bool Enqueue(const T &t)
    {
        bool ret = false;
        LockQueue();
        if(_isrunning)
        {
            _task_queue.push(t);
            if(_waitnum>0)
            {
                ThreadWakeup();
            }
            LOG(DEBUG, "任务入队列成功");
            ret = true;
        }
        UnlockQueue();
        return ret;
    }


    ~ThreadPool()//析构
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
private:
    int _threadnum;//线程的数量
    std::vector<Thread> _threads;//用vector来存线程
    std::queue<T> _task_queue;//任务队列

    pthread_mutex_t _mutex;//锁
	pthread_cond_t _cond;//条件变量
	int _waitnum;
    bool _isrunning;//判断线程池是否在运行
};

thread.hpp

#include<iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    using func_t = std::function<void(std::string)>;

    class Thread
    {
    public:
        void Excute()
        {
            _func(_threadname);
        }
    public:
        Thread(func_t func, std::string name="none-name")
            : _func(func), _threadname(name), _stop(true)
        {}
        static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
        {
            Thread *self = static_cast<Thread *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if(!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if(!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if(!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;//线程tid
        std::string _threadname;//线程名字
        func_t _func;//线程所要执行的函数
        bool _stop;//判断线程是否停止
    };
}

 

 main.cc

#include <iostream>	
#include <string>
#include <memory>
#include "LOG.hpp"
#include "threadpool.hpp"	
#include "Task.hpp"	
#include<ctime>

        
int main()
{
	srand(time(nullptr) ^ getpid() ^ pthread_self());
	std::unique_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>(5)); 
	tp->InitThreadPool();
	tp->Start();
	
	int tasknum=3;
	while(tasknum)
	{
		int a = rand() % 12 + 1;
		usleep(1000);
		int b = rand() % 4 + 1;
		Task t(a, b);
		LOG(INFO, "主线程推送任务: %s", t.DebugToString().c_str());
		tp->Enqueue(t);
		sleep(1);
		tasknum--;
	}

	tp->Stop();
    tp->Wait();
    return 0;
}

相关推荐

  1. Linux线

    2024-07-10 04:54:03       70 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-10 04:54:03       66 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-10 04:54:03       70 阅读
  3. 在Django里面运行非项目文件

    2024-07-10 04:54:03       57 阅读
  4. Python语言-面向对象

    2024-07-10 04:54:03       68 阅读

热门阅读

  1. Ubuntu上如何安装nvm包管理器

    2024-07-10 04:54:03       24 阅读
  2. python项目常见使用的传参调试方法

    2024-07-10 04:54:03       32 阅读
  3. 深入理解Spring Boot中的数据库优化

    2024-07-10 04:54:03       27 阅读
  4. HOW - React Router v6.x Feature 实践(react-router-dom)

    2024-07-10 04:54:03       23 阅读
  5. Mysql:时区问题

    2024-07-10 04:54:03       19 阅读
  6. WebSocket 双向通信

    2024-07-10 04:54:03       24 阅读
  7. 3102.最小化曼哈顿距离

    2024-07-10 04:54:03       25 阅读
  8. Power BI数据分析可视化实战培训

    2024-07-10 04:54:03       21 阅读
  9. Python文字数字转换利器: word2number库详解

    2024-07-10 04:54:03       30 阅读
  10. 在Spring Boot项目中使用Leyden

    2024-07-10 04:54:03       26 阅读