多线程详解

线程

1.线程的概念

线程是进程内部的一个执行分支。 线程是CPU调度的基本单位

2.线程的理解

1.v1

需要知道:

正文代码在进程中,是串行调用的。

如果需要并行调用,就需要多个执行流。

2.v2

在Linux系统的“线程”

所以:Linux下的线程并不是一个新的数据结构。

复用了进程PCB,而且是在进程地址空间内新建的,可以访问进程地址空间的所有资源,因为进程的创建是费时费空间的。

把Linux中的线程叫做 轻量级进程。

线程就可以执行正文代码中不同的区域。

3.v3

进程在内核角度的概念:承担分配系统资源的基本实体

所以在调度的角度:线程

4.v4 地址空间第四谈

首先:对于物理内存而言,它是被划分了一块一块的,每一块大小是4KB,而磁盘上的程序加载到内存中的大小是多少呢?

因为磁盘文件系统中的block块数组,就把数据也划分为了每一块每一块也为4KB,所以传入内存中的大小也是4KB。

把内存和磁盘中一块一块为基本单位的空间叫做:页框 / 页帧

其次:操作系统需要管理物理内存,所以也会有对应的数据结构,对内存的管理,变成了对数组的增删查改。

最后:对于真正的页表而言,肯定不是一个平常对应的页表,因为虚拟地址+物理地址+权限标记位 == 10个字节

2^32 * 10 就是40G,页表不可能有这么大的,所以有一套特殊规则在里面:

对于一个虚拟地址: 前10位 对应页目录中的地址 中10位 对应页表的页表项的地址 后12位 刚好2^12次方是4KB,所以就是页内偏移, 找一个物理地址: 通过页表项(页框)对应的起始物理地址+虚拟地址后12位对应的数据,就可以访问整个页框的数据。

线程的控制

1.创建线程

1.pthread.h头文件

理解:pthread库不属于c也不属于c++,是Linux系统自带的,所以以后编译时需要链接这个库。

2.tid和LWP

注意:ps -aL 可以看到执行流的 PID 和 LWP(轻量级进程)

理解:tid是用户层的id,LWP是在内核中标识线程的id,两者一一对应。

3.主线程退出

主线程退出 == 进程退出,所以:

  1. 我们往往需要让 main thread最后退出
  2. 线程退出也要被" wait ",不然可能会出现内存泄漏。

2.线程等待

返回值为 0,说明等待成功。

pthread_join的第二个参数是输出型参数,用来接收new pthread的返回值。

看两个现象:

1.定义全局变量g_val,让new pthread给它++,main 和 new pthread地址一样,而且都可以看到g_val的修改。

2.new pthread故意野指针访问,直接进程退出! 因为信号是发给进程的。

总结:

1.所有线程共用一张地址空间。

2.信号杀掉的是进程,所以多线程的代码是不健壮的。

3.线程终止

  1. pthread_cancel

在main pthread 中用的,参数是要取消线程的 tid。

线程的优点

1.线程切换比进程切换要快

单论上下文数据的保存,线程只比进程快一点。

CPU直接从内存中存取数据要等待一定时间周期,Cache(硬件)中保存着CPU刚用过或循环使用的一部分数据,当CPU再次使用该部分数据时可从Cache中直接调用,这样就减少了CPU的等待时间,提高了系统的效率。

理解:cache就是存储现在读到的代码往后一段的物理内存,方便下次调用CPU直接在cache里面取,提高效率,而如果切换进程B,cache中的A进程数据全部失效,所以需要清空,再重新载入B的,这个过程很漫长,而线程是资源共享的,所以cache中的资源不需要被丢弃,切换线程,里面的数据也大概率能用。

2.线程的私有

  1. 线程的硬件上下文数据(cpu寄存器的值)(调度)
  2. 线程的独立栈结构(正常运行) (线程的临时变量是存在自己独立的栈上的)

3.线程的共享

  1. 代码和全局数据
  2. 进程的文件描述符表

线程问题

  1. 一个线程出问题,可能会导致其他线程也出问题,导致整个进程的退出。————线程安全问题
  2. 多线程中,公共函数如果被多个线程同时进入————该函数被重入了。

多线程

1.创建

问题是:为什么右边打印的结果只有4和5被打印出了?

    1. 首先,要知道每个线程的临时变量是在自己的独立栈上的,所以给每一个线程传参时,可以接收到不一样的值。
    2. 其次,pthreadname是指针,指向buffer的起始地址,而这几个线程是并行的,所以等一个线程读取buffer时,可能另一个线程正在对buffer进行写入所以在main不sleep的情况下就会出现覆盖的情况。(sleep的话可以避免,因为一个线程打印完了,下一个才开始写入buffer)
    3. 最后,那如果要在本质解决问题呢?

解决方法就是:把buffer动态开辟在堆上,所以虽然都是访问一个堆,但是访问的是不同地方的buffer。

2.用类管理并分配任务

#include <iostream>
#include <pthread.h>
#include <vector>
#include <unistd.h>
#include <string>
using namespace std;
const int pthreadnum = 5;

class Task
{
public:
    Task()
    {
    }
    void SetData(int x, int y)
    {
        datax = x;
        datay = y;
    }
    int add()
    {
        return datax + datay;
    }
    ~Task()
    {
    }

private:
    int datax;
    int datay;
};

class pthread : public Task
{
public:
    pthread(int x, int y, string& pthreadname)
        : _pthreadname(pthreadname)
    {
        _t.SetData(x, y);
    }

    string getname()
    {
        return _pthreadname;
    }
    int task()
    {
        return _t.add();
    }

    ~pthread()
    {
    }

private:
    string _pthreadname;
    Task _t;
};



class Result
{
public:
    Result()
    {}
    ~Result()
    {}
    void SetResult(int result, const std::string &threadname)
    {
        _result = result;
        _threadname = threadname;
    }
    void Print()
    {
        std::cout << _threadname << " : " << _result << std::endl;
    }
private:
    int _result;
    string _threadname;
};


void *handler(void *arg)
{
    pthread *p = static_cast<pthread *>(arg);
    string name = p->getname();
    int ret = p->task();

    Result * re = new Result();
    re->SetResult(ret,name);
    //cout << "name:" << name << "add ret:" << ret <<endl;

    delete p;
    return re;
}


int main()
{
    vector<pthread_t> pthreads;
    for (int i = 0; i < pthreadnum; i++)
    {
        char buffer[64];
        // char* buffer = new char[64];
        snprintf(buffer, 64, "pthread:%d", i + 1);
        string s(buffer);
        pthread *p = new pthread(10, 20, s);

        pthread_t tid;
        pthread_create(&tid, nullptr, handler, p);
        pthreads.push_back(tid);
    }

    void* ret = nullptr;
    vector<Result*> result;
    for (auto& e : pthreads)
    {
        //拿到线程运行结果
        pthread_join(e, &ret);
        result.push_back((Result*)ret);
    }

    for(auto& e : result)
    {
        e->Print();
        delete e;
    }
}

Task这个类是用来作为pthread的成员来创造的,它的作用是创建任务。

pthread这个类是可以理解为每一个线程的属性+任务,它的构造就是传参给Task类型的对象初始化,并且还可初始化其他的线程属性,比如名字......,用户通过pthread类的对象来调用任务+获取属性。

result这个类的创建是因为在创建线程后,handler线程函数的返回值要被获取,那多个线程的返回值在"wait"结束后都要被获取的话,就创建一个类,可以接收线程函数的返回值。

c++线程和Linux线程

1.结论

#include <thread> //c++
string ToHex(pthread_t tid)
{
    char id[64];
    snprintf(id, sizeof(id), "0x%lx", tid);
    return id;
}

void* threadrun(int count)
{
    while(count--)
    {
        cout<< "I am new thread:"<<" tid: "<<ToHex(pthread_self())<<endl;
        sleep(1);
    }
    cout<<"new thread end"<<endl;
    return nullptr;
}

int main()
{

    thread t1(threadrun,5);
    while(1)
    {
        cout << "I am main thread" <<" tid: "<<ToHex(pthread_self())<<endl;
        sleep(1);
    }

    t1.join();
}

线程分离

1.接口

2.现象

主线程,和新线程都可以对自己进行分离,分离之后,主线程不需要再回收(join)了,否则会失败。

3.结论

新线程分离,只是一种状态,让main不再干预它的结束,但是新线程还和主线程在同一地址空间里。

pthread动态库

1.谁来管理线程

2.库来管理线程

局部存储示例:

封装一个pthread

1.小封装

namespace thread
{
    template<class T>
    using func_t = function<void(T&)>;

    template<class T>
    class thread
    {
        thread(func_t<T> func,T data,string name = "none name")
        :_func(func)
        ,_data(data)
        ,_name(name)
        ,stop(true)
        {}
        ~thread()
        {}
        //下面有解决了的代码,这个不对。
        void* threadrun(void* args)
        {
            _func(_data);
            
        }
        string getname()
        {
            return _name;
        }
        bool start()
        {
            int n = pthread_create(&_tid,nullptr,threadrun,nullptr);
            if(n == 0)
            {
                stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detech()
        {
            if(stop == false)
            {
                pthread_detach(_tid);
            }
        }
        void join()
        {
            if(stop == false)
            {
                pthread_join(_tid,nullptr);
            }
        }
        void Stop()
        {
            stop = true;
        }
    private:
        pthread_t _tid;
        string _name;
        func_t<T> _func;
        T& _data;
        bool stop;
    };
};
  1. 问题:此时编译是会报错的,原因出在threadrun 上:

因为是在类里,类成员函数是有一个形参this指针的,而threadrun只能有一个参数。

解决:把threadrun变成静态的。

  1. 问题:_func 和 _data都是类里的成员,没有this指针怎么访问

解决:借threadrun的参数传入一个this指针呗。

为啥要再开一个Excute封装:因为_func(_data)是私有的,需要this指向。

看看测试:

注意:其实包装器那里的模板也可换成可变参数的模板。

互斥

1.抢票

int g_tickets = 10000;

class ThreadData
{
public:
    ThreadData(int& tickets,string name)
    :_tickets(tickets)
    ,_name(name)
    ,total(0)
    {}
    ~ThreadData()
    {}
public:
    int& _tickets;
    string _name;
    int total;
};

void route(ThreadData* t)
{
    while(1)
    {
        if(t->_tickets > 0)
        {
            usleep(100);
            printf("%s:running , get tickets:%d\n",t->_name.c_str(),t->_tickets);
            t->_tickets--;
            t->total++;
        }
        else
        {
            break;
        }
    }
}



int main()
{
    vector<thread<ThreadData*>> threads;
    vector<ThreadData*> data;
    // 1. 创建一批线程
    for (int i = 0; i < 4; i++)
    {
        std::string name = "thread-" + std::to_string(i + 1);
        ThreadData* t = new ThreadData(g_tickets,name);
        threads.emplace_back(route, t, name);
        data.emplace_back(t);
    }

    // 2. 启动 一批线程
    for (auto& thread : threads)
    {
        thread.start();
    }

    // 3. 等待一批线程
    for (auto& thread : threads)
    {
        thread.join();
        //std::cout << "wait thread done, thread is: " << thread.getname() << std::endl;
    }
    
    //4.打印总数
    for(auto& t : data)
    {
        cout << t->_name << " total is : " << t->total<<endl;
    }

结果:会发现最后的票数居然 - 到了负数,为什么呢?

2.解释

当tickets == 1时,在usleep时,被调到等待队列里,让其他线程进来了。而就在判断 >0时,就会进行 CPU->内存 的工作,而被放到等待队列后,寄存器里的值也被当成线程的上下文被保护起来了(所以都拿到了1)

等到又该thread -1进来,就开始继续向下执行,接着是 -2 -3,因为已经判断过了,所以也会向下进行,就导致3次tickets--。

抢到负数的原因是:对共享资源的操作不是原子的。

3.加锁解决问题

小封装一下锁:

class lockgroup
{
public:
    lockgroup(pthread_mutex_t mutex)
    :_mutex(mutex)
    {}
    ~lockgroup()
    {}
    void lock()
    {
        pthread_mutex_lock(&_mutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(&_mutex);
    }
public:
    pthread_mutex_t _mutex;
};

像这样,只允许一个线程访问共享资源的现象就叫做互斥。

4.互斥底层实现

单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题

为了实现互斥锁操作,大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单元的数据相交换,由于只有一条指令,保证了原子性。

切出去,CPU寄存器里的值被保存,就属于线程的硬件上下文,是一个线程所私有的。

线程2被切出去,保存寄存器里的值为 0.

而此时假设没有其他进程了,thread -1又开始被调度,判断 1 > 0 返回。

而又到了thread -2,它的值为0,所以被挂起等待。

因此,交换的本质:不是拷贝到寄存器,而是在争同一个1.

同步

1.概念

我从自习室出来,不能直接再返回去,而是排队到末尾,等着。

做到了:在保证了共享资源安全的情况下,让所有的线程访问临界资源(共享资源)有了一定的顺序性。

2.条件变量 ------ 实现同步的方式

让朋友去取苹果,因为加锁,所以每次只允许一个人取,而取走之后,没有苹果了,朋友去到排队队列里,我再放苹果,等到放完之后敲铃铛,再让朋友来取苹果。 这一系列操作:就是主控线程控制其他线程访问,这样不会造成某个线程竞争锁的能力太强而导致其他线程长时间无法执行的饥饿问题。 条件变量 = 铃铛 + 等待队列。

3.认识接口 + 结合同步和互斥

#include "pthread.hpp"

using namespace Thread;

pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;


void* SlaverCore(void* args)
{
    string name = static_cast<char*>(args);

    while(1)
    {
        //加锁
        pthread_mutex_lock(&gmutex);
        cout<< "创建slaver成功: "<<name<<endl;
        //访问共享资源

        //去到等待队列
        pthread_cond_wait(&gcond,&gmutex);
        cout<< "现在被唤醒的线程是:"<<name<<endl;
        //解锁
        pthread_mutex_unlock(&gmutex);
    }
}


void* MasterCore(void* args)
{
    sleep(2);
    string name = static_cast<char*>(args);
    while(1)
    {
        pthread_cond_signal(&gcond);
        cout<<"master 唤醒一个线程"<<endl;
        sleep(1);
    }
}


void StartSlaver(vector<pthread_t>* threads,int num)
{
    for(int i = 0;i<num;i++)
    {
        pthread_t tid;
        char* ch = new char[64];
        snprintf(ch,64,"thread-slaver-%d",i+1);
        int n = pthread_create(&tid,nullptr,SlaverCore,ch);
        if(n == 0)
            cout<<"Start Slaver Success"<<endl;
        threads->emplace_back(tid);
    }
}


void StartMaster(vector<pthread_t>* threads)
{
    pthread_t tid;
    int n = pthread_create(&tid,nullptr,MasterCore,(void*)"thread-master");
    if(n == 0)
        cout<<"Start Master Success"<<endl;
    threads->emplace_back(tid);
}


void Waitphread(vector<pthread_t>& threads)
{
    for(auto& thread : threads)
    {
        pthread_join(thread,nullptr);
    }
}

int main()
{
    vector<pthread_t> threads;
    StartMaster(&threads);
    StartSlaver(&threads,3);
    Waitphread(threads);
}

3.生产消费模型

必要点:321. + 理解过程

回顾过来再看:虽然生产者给消费者任务是串行的,但是生产者生产任务和消费者执行任务是并发的!!!

4.阻塞队列

解释:队列为空,消费者不能取数据,队列满了,生产者不能放数据。

好处:

  1. 因为生产消费是并发的过程,所以阻塞队列给它的并发以同步机制(就如上面所说)保证生产消费平衡。
  2. 支持忙闲不均,生产和消费的速度不均,往往生产更快。生产者可以将多余的数据放入队列中等待消费者处理。
template<class T>
class Blockqueue
{
private:
    bool ISFULL()
    {
        return _blockqueue.size() == _cup;
    }
        
    bool ISEMPTY()
    {
        return _blockqueue.size() == 0;
    }
       
public:
    Blockqueue(int cup)
    :_cup(cup)
    {
        pthread_mutex_init(&mutex,nullptr);
        pthread_cond_init(&productor,nullptr);
        pthread_cond_init(&consumer,nullptr);
        int productor_wait_num = 0;
        int consumer_wait_num = 0;
    }
    void Enqueue(const T& data)
    {
        //加锁
        pthread_mutex_lock(&mutex);
        //if(ISFULL())
        while(ISFULL())
        {
            //满了,不能进去生产,需要等消费者消费
            productor_wait_num++;
            pthread_cond_wait(&productor,&mutex);
            productor_wait_num++;
        }
        //不满,生产者生产
        _blockqueue.push(data);
        //通知消费者消费
        if(consumer_wait_num > 0)
        pthread_cond_signal(&consumer);
        //解锁
        pthread_mutex_unlock(&mutex);
        
    }
    void Pop(T* out)
    {
        //加锁
        pthread_mutex_lock(&mutex);
        //if(ISEMPTY())
        while(ISEMPTY())
        {
            //空了,不能进去消费,需要等生产者生产
            consumer_wait_num++;
            pthread_cond_wait(&consumer,&mutex);
            consumer_wait_num++;
        }
        //不空,消费者消费
        *out = _blockqueue.front();
        _blockqueue.pop();
        //通知生产者生产
        if(productor_wait_num > 0)
            pthread_cond_signal(&productor);
        //解锁
        pthread_mutex_unlock(&mutex);
    }
    ~Blockqueue()
    {
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&productor);
        pthread_cond_destroy(&consumer);
    }
private:
    queue<T> _blockqueue;
    int _cup; //最多存放
    pthread_mutex_t mutex;
    pthread_cond_t productor;//专门给生产者的
    pthread_cond_t consumer;//专门给消费者的
    
    int productor_wait_num;
    int consumer_wait_num;
};
1.问题
  1. 1个生产,5个消费,pthread_cond_broadcast(): 假设在不休眠的情况下,由于调度不确定的原因,消费1先跑就先被wait住(),生产再继续,而生产的逻辑是生产一个,就pthread_cond_broadcast()(全唤醒的方式)通知消费者消费,而因为消费者较多,在并发跑的时候,,全部阻塞在了Pop()的wait那里,现在生产者生产成功,通知消费者,而消费者中的某一个就会被唤醒,此时又加上了锁,然后向下访问,用完生产资源后,,解锁。而此时因为还有消费者阻塞在wait那里,又会有一个消费者竞争锁成功,继续向下走,但此时生产者的资源已经被挥霍完了,所以再访问一块空的资源就会报错。
  2. 2个生产,5个消费,pthread_cond_signal():即便是一次唤醒一个,但是生产者的生产后的通知频率,是比消费者的等待频率要高的,所以消费者可能刚休眠,就受到了生产通知,唤醒后,继续向下。

条件没有就绪,线程就被唤醒的情况:称作伪唤醒。

2.解决
  1. 判断换成while
  2. 在阻塞队列加入成员变量记录生产,消费等待数量。

5.生产消费代码

直接让阻塞队列中执行任务:

1.传任务的两个方式
  1. 包装器 -->函数对象

注意:两者用一样的名字是不能共存的,所以要注释掉另一方。

//函数对象
using Task = function<void()>;
//类
class Task
{
public:
    Task(){}
    Task(int a,int b):_a(a),_b(b),_result(0)
    {}
    void Excute()
    {
        _result = _a + _b;
    }
    string Add()
    {
        string s =  "_a + _b = ?";
        return s;
    }
    string Result()
    {
        string s =  to_string(_a) + " + " + to_string(_b) + " = " + to_string(_result);
        return s;
    }
    ~Task()
    {}

private:
    int _a;
    int _b;
    int _result;
};
2.main.cc

类型重命名,容易更换类型

using Blockqueue_t = Blockqueue<Task>;

void Productor(Blockqueue_t& bq)
{
    srand(time(nullptr));
    while (1)
    {
        sleep(1);
        //1.给任务
        int a = rand() % 10 + 1;
        usleep(1000);
        int b = rand() % 20 + 1;
        Task t(a,b);
        //2.通知消费者做任务
        bq.Enqueue(t);
        cout << "Productor->" << t.Add() << endl;
    }
}

void Consumer(Blockqueue_t& bq)
{
    while (1)
    {
        sleep(1);
        //1.取任务
        Task t;
        bq.Pop(&t);
        //2.做任务
        t.Excute();
        cout << "Consumer->" << t.Result() << endl;
    }
}


//创建线程 ————封装
void comm(vector<thread<Blockqueue_t>> *threads,int num,Blockqueue_t bq,func_t<Blockqueue_t> func)
{
    // 创建一批线程
    for (int i = 0; i < num; i++)
    {
        string name = "thread-" + to_string(i + 1);
        // thread<int> pro(ProductorCore,num,name);
        // threads->emplace_back(pro);
        threads->emplace_back(func, bq, name);
        // 启动线程
        threads->back().start();
    }
}
void StartProductor(vector<thread<Blockqueue_t>> *threads, int num,Blockqueue_t bq)
{
    comm(threads,num,bq,Productor);
}

void StartConsumer(vector<thread<Blockqueue_t>> *threads, int num,Blockqueue_t bq)
{
    comm(threads,num,bq,Consumer);
}

void WaitPhread(vector<thread<Blockqueue_t>>& threads)
{
    for (auto thread : threads)
    {
        thread.join();
    }
}

int main()
{
    Blockqueue_t* bq = new Blockqueue_t(5);
    vector<thread<Blockqueue_t>> threads;
    StartProductor(&threads, 1,*bq);
    StartConsumer(&threads, 1,*bq);
    WaitPhread(threads);
}

接着——>回顾生产消费。

6.信号量解决生产消费

通过前面进程间通信,已经认识到了信号量的相关知识。

简单来说:相当于一把计数器(因为资源数量有限,申请成功资源数--,计数器++),而申请到信号量相当于申请到了资源,有效减少内部判断,不像阻塞队列需要在临界区判断。

1.环形队列

用来在这里实现多线程的生产消费资源管理的。

2.生产者消费者要做的事情

3.单生产单消费的环形队列
#include <semaphore.h>

using namespace std;

template<class T>
class Ringqueue
{
private:
    void P(sem_t& sem)
    {
        //申请到资源,信号量--
        sem_wait(&sem);
    }
    
    void V(sem_t& sem)
    {
        //归还资源,信号量++
        sem_post(&sem);
    }

public:
    Ringqueue(int cup)
    :_cup(cup)
    ,_ringqueue(cup)
    ,productor_index(0)
    ,consumer_index(0)
    {
        sem_init(&_room,0,_cup);
        sem_init(&_data,0,0);
    }
    void Enqueue(T in)//生产者
    {
        P(_room);
        _ringqueue[productor_index] = in;
        productor_index++;
        productor_index %= _cup;
        //生产完,数据资源多了一个
        V(_data);
    }
    void Pop(T* out)
    {
        P(_data);
        *out = _ringqueue[consumer_index];
        consumer_index++;
        consumer_index %= _cup;
        //消费完,房间多一个
        V(_room);
    }
    ~Ringqueue()
    {
        sem_destroy(&_room);
        sem_destroy(&_data);
    }
private:
    vector<T> _ringqueue;
    int _cup; //最大容量

    int productor_index;
    int consumer_index;

    sem_t _room;
    sem_t _data;
};

main.cc有了改动

//创建线程 ————封装
void comm(vector<thread<Ringqueue_t>> *threads,int num,Ringqueue_t rq,func_t<Ringqueue_t> func)
{
    // 创建一批线程
    for (int i = 0; i < num; i++)
    {
        string name = "thread-" + to_string(i + 1);
        // thread<int> pro(ProductorCore,num,name);
        // threads->emplace_back(pro);
        threads->emplace_back(func, rq, name);
        // 启动线程
        threads->back().start();
    }
}

注意:上面的写法是会段错误的。

解释:主线程跑到start()——>threadroutine()——>Excute()

而新线程也在跑,vector里的指针本来指向第一个,现在又进来一个,指向了新的线程,而在emplace_back()还没有构建好内部属性时进行了野指针访问(Thread对象还没有构建好)。

解决:初始化和启动分开写

//创建线程 ————封装
void Initcomm(vector<thread<Ringqueue_t>> *threads,int num,Ringqueue_t rq,func_t<Ringqueue_t> func)
{
    // 创建一批线程
    for (int i = 0; i < num; i++)
    {
        string name = "thread-" + to_string(i + 1);
        // thread<int> pro(ProductorCore,num,name);
        // threads->emplace_back(pro);
        threads->emplace_back(func, rq, name);
        // 启动线程
        //threads->back().start();
    }
}
void InitStartProductor(vector<thread<Ringqueue_t>> *threads, int num,Ringqueue_t rq)
{
    Initcomm(threads,num,rq,Productor);

}

void InitStartConsumer(vector<thread<Ringqueue_t>> *threads, int num,Ringqueue_t rq)
{
    Initcomm(threads,num,rq,Consumer);
}

void StartAll(vector<thread<Ringqueue_t>>& threads)
{
    for(auto& thread:threads)
    {
        cout << "start-> " << thread.name() << endl;
        thread.start();
    }
}

void WaitPhread(vector<thread<Ringqueue_t>>& threads)
{
    for (auto& thread : threads)
    {
        thread.join();
    }
}

int main()
{
    Ringqueue_t* rq = new Ringqueue_t(5);
    vector<thread<Ringqueue_t>> threads;
    InitStartProductor(&threads, 1,*rq);
    InitStartConsumer(&threads, 1,*rq);
    StartAll(threads);

    WaitPhread(threads);
}

4.多生产多消费的环形队列

template<typename T>
class RingQueue
{
private:
    void P(sem_t& sem)
    {
        //申请到资源,信号量--
        sem_wait(&sem);
    }
    
    void V(sem_t& sem)
    {
        //归还资源,信号量++
        sem_post(&sem);
    }
    
    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(int cap)
    : _ring_queue(cap)
    , _cap(cap)
    , _productor_step(0)
    , _consumer_step(0)
    {
        sem_init(&_room_sem, 0, _cap);
        sem_init(&_data_sem, 0, 0);

        pthread_mutex_init(&_productor_mutex, nullptr);
        pthread_mutex_init(&_consumer_mutex, nullptr);
    }
    void Enqueue(const T &in)
    {
        // 生产行为
        P(_room_sem);
        Lock(_productor_mutex);
        // 一定有空间!!!
        _ring_queue[_productor_step++] = in; // 生产
        _productor_step %= _cap;
        Unlock(_productor_mutex);
        V(_data_sem);
    }
    void Pop(T *out)
    {
        // 消费行为
        P(_data_sem);
        Lock(_consumer_mutex);
        *out = _ring_queue[_consumer_step++];
        _consumer_step %= _cap;
        Unlock(_consumer_mutex);
        V(_room_sem);
    }
    ~RingQueue()
    {
        sem_destroy(&_room_sem);
        sem_destroy(&_data_sem);

        pthread_mutex_destroy(&_productor_mutex);
        pthread_mutex_destroy(&_consumer_mutex);
    }
private:
    // 1. 环形队列
    std::vector<T> _ring_queue;
    int _cap; // 环形队列的容量上限

    // 2. 生产和消费的下标
    int _productor_step;
    int _consumer_step;

    // 3. 定义信号量
    sem_t _room_sem; // 生产者关心
    sem_t _data_sem; // 消费者关心

    // 4. 定义锁,维护多生产多消费之间的互斥关系
    pthread_mutex_t _productor_mutex;
    pthread_mutex_t _consumer_mutex;
};

解释:

生产和消费之间:

因为有信号量,所以队列为满,只能消费者先跑;队列为空,只能生产者先跑。————同步

而第一次开始,生产和消费指向同一个位置(为空),如果消费者先跑,信号量为0,只能阻塞住,只能让生产者先跑。————互斥

生产者与生产者之间:

加锁互斥————生产者之间竞争锁,竞争胜利再进行生产

消费者与消费者之间:

加锁互斥————消费者之间竞争锁,竞争胜利再进行消费

而如果为空为满————>就变成了生产和消费之间的互斥。

线程池

1.线程池主体

#include <iostream>
#include <vector>
#include <queue>
#include "pthread.hpp"
#include <pthread.h>
#include <string>

using namespace std;
using namespace ThreadModule;

template<class T>
class thread_pool
{
public:
    thread_pool(int threadnum)
    :_threadnum(threadnum)
    ,_waitnum(0)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
    }

    void Print()
    {
        while(1)
        {
            cout <<"haha"<<endl;
        }
    }

    void Init()
    {
        for(int i = 0;i<_threadnum;i++)
        {
            string name = "thread-" + to_string(i+1);
            _threads.emplace_back(Print,name);
        }
    }

    void Start()
    {
        for(auto& thread : _threads)
        {
            thread.Start();
        }
    }
    ~thread_pool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    vector<Thread> _threads;
    int _threadnum;
    queue<T> _taskqueue;

    //互斥 + 同步
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    int _waitnum;
};

里面有个小问题:关于func传参的问题

在这个文件夹,已经把Thread的模板去掉了,所以func_t 时void()类型的。

但是此时Print会报错

原因是:Print是无参的函数,但是Print是在类内的,包含this指针,所以线程调用_func会出错(类型不匹配,多了个参数)

解决:绑定! 给类内的Print绑定一个this指针,this在这里变成Print的默认参数,所以传给thread的_func还是与void()一致

接下来继续:要解决的任务肯定不是Print,所以再修改,且多加一个成员。

直接完全体:

#include <iostream>
#include <vector>
#include <queue>
#include "pthread.hpp"
#include <pthread.h>
#include <string>
#include <time.h>
#include "LOG.hpp"
#include "Task copy.hpp"

using namespace std;
using namespace ThreadModule;

template <class T>
class thread_pool
{
private:
    void Lockqueue()
    {
        pthread_mutex_lock(&_mutex);
    }

    void Unlockqueue()
    {
        pthread_mutex_unlock(&_mutex);
    }

    void Sleep()
    {
        pthread_cond_wait(&_cond, &_mutex);
    }

    void Wakeup()
    {
        pthread_cond_signal(&_cond);
    }

    void WakeAll()
    {
        pthread_cond_broadcast(&_cond);
    }
public:
    thread_pool(int threadnum)
        : _threadnum(threadnum), _waitnum(0), _isrunning(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        LOG(1,g_issave,"Thread_pool Construct Success");
    }
    // 处理任务
    void Handler(string name)
    {
        // 上锁保护
        Lockqueue();
        while (1)
        {
            // 队列如果没任务,线程是启动了的,需要等待
            while (_taskqueue.empty() && _isrunning)
            {
                _waitnum++;
                Sleep();
                _waitnum--;
            }
            //1.队列没任务,线程池也结束了
            if (_taskqueue.empty() && !_isrunning)
            {
                Unlockqueue();
                break;
            }
            //2.队列有任务,线程池没结束
            //3.队列有任务,线程池结束了————处理完所有任务再退出
            // 有任务,拿出来
            T t = _taskqueue.front();
            _taskqueue.pop();
            Unlockqueue();
            // 处理任务,这个任务属于这个线程独占的任务,不属于共享资源了。
            LOG(1,g_issave,"%s Handler Task",name.c_str());
            t();
            LOG(1,g_issave,"%s Handler Success ,result is : %s",name.c_str(),t.Result().c_str());
        }
    }

    void Init()
    {
        for (int i = 0; i < _threadnum; i++)
        {
            string name = "thread-" + to_string(i + 1);
            _threads.emplace_back(bind(&thread_pool::Handler, this,placeholders::_1), name);
            LOG(1,g_issave,"%s Init Success",name.c_str());
        }
        _isrunning = true;

    }
    // push任务
    bool Enqueue(const T &in)
    {
        Lockqueue();
        bool ret = false;
        if (_isrunning)
        {
            _taskqueue.push(in);
            // 有任务了,唤醒线程
            if (_waitnum > 0)
            {
                Wakeup();
            }
            ret = true;
        }

        Unlockqueue();

        return ret;
    }

    void Start()
    {
        for (auto &thread : _threads)
        {
            thread.Start();
            LOG(1,g_issave,"%s Start Success",thread.name().c_str());
        }
    }

    void Stop()
    {
        Lockqueue();
        
        _isrunning = false;
        //停止之后,唤醒所有线程,处理任务队列中剩下的任务
        WakeAll();

        Unlockqueue();
    }
    void Wait()
    {
        for (auto &thread : _threads)
        {
            thread.Join();
            LOG(1,g_issave,"%s Wait Success",thread.name().c_str());
        }
    }
    ~thread_pool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
        LOG(1,g_issave,"Thread_pool Destory Success");
    }

private:
    vector<Thread> _threads;
    int _threadnum;
    queue<T> _taskqueue;

    // 互斥 + 同步
    pthread_mutex_t _mutex;
    pthread_cond_t _cond;

    int _waitnum;

    bool _isrunning;
};

2.加入Log日志

  • 日志是有等级的
  • 日志是有格式的 日志等级 时间 代码所在的文件名/行数 日志的内容....
using namespace std;

enum Level
{
    DEBUG = 0,
    INFO,
    WARNING,
    ERROR,
    FATAL
};
//获取等级
string GetLevel(int level)
{
    switch (level)
    {
    case 0 : return "DEBUG";
    case 1 : return "INFO";
    case 2 : return "WARNING";
    case 3 : return "ERROR";
    case 4 : return "FATAL";
    default:
        return "NONE";
    }
}
//获取时间
string GetTime()
{
    char buffer[1024];

    time_t t = time(nullptr);
    struct tm* tm = localtime(&t);

    snprintf(buffer,sizeof(buffer),"%d-%d-%d %d:%d:%d",
    tm->tm_year + 1900,
    tm->tm_mon + 1,
    tm->tm_mday,
    tm->tm_hour,
    tm->tm_min,
    tm->tm_sec);

    return buffer;
}

string Logstring(string name,int line,int level)
{
    string levelstr = GetLevel(level);
    string timestr = GetTime();

    cout << levelstr << ":" << timestr <<endl;
}

关于时间:用到了两个函数:

也就是说:time()计算当前的时间戳,localtime()把时间戳转为结构体——>包含一系列可用字段(年月日时分秒....)。

可变参数部分解决日志内容:

vsnprintf可以让ap(可变参数部分)以format的格式用指定的大小填入str中。

1.加入宏:

__VA_ARGS__是C/C++预处理器中的一个特殊标识符,用于在宏定义中表示可变数量的参数。

ret没有传到可变参数部分就结束了,通常会报错。

  1. 加##就是为了可以让__VA_ARGS__有可变参数就接收,没有也不报错。
  2. 加 \ 是因为当宏定义的内容过长,无法在同一行显示时,可以在行尾使用 \ 来表示下一行的内容仍然是当前宏定义的一部分。
  3. 加do ...while是因为可以让宏进行 整块 替换

2.加入全局变量g_issave

可以用来决定是在显示器打印 还是 打印到文件中

3.测试线程池

int main()
{
    EnableFile();
    //LOG(1,g_issave,"hhhhh");
    //智能指针 c++11构建  new一个thread_pool类型的对象来构造
    //unique_ptr<thread_pool<int>> pool(new thread_pool<int>(10));
    //智能指针 c++14构建 make_unique返回一个thread_pool类型的对象
    unique_ptr<thread_pool<Task>> pool = make_unique<thread_pool<Task>>(4);

    pool->Init();

    pool->Start();

    srand(time(nullptr)^ getpid());
    int tasknum = 10;
    while(tasknum--)
    {
        int a = rand() %10 + 1;
        int b = rand() %5 + 1;
        usleep(1000);
        Task t(a,b);
        LOG(INFO, "main thread push task: %s", t.Add().c_str());  
        pool->Enqueue(t);
    }

    sleep(3);

    pool->Stop();

    pool->Wait();

线程一些概念补充

1.线程安全

线程安全:多个线程并发同一段代码时,会出现不同的结果。常见对全局变量或者静态变量进行操作,

并且没有锁保护的情况下,会出现该问题。

重入:同一个函数被不同的执行流调用,当前一个流程还没有执行完,就有其他的执行流再次进入,我们

称之为重入。一个函数在重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重

入函数,否则,是不可重入函数。

STL默认不是线程安全的,因为追求性能

智能指针中:unique_ptr,只是在当前代码块范围内生效,不涉及线程安全问题

对于shared_ptr多个对象需要共用一个引用计数变量, 所以会存在线程安全问题.

2.死锁

死锁:指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资

源而处于的一种永久等待状态。(就是用锁不合理,导致代码不会正常推进的问题)

死锁四个必要条件

  • 互斥条件:一个资源每次只能被一个执行流使用

就是要有锁

  • 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放

就是线程A访问锁2失败会阻塞挂起,导致自己资源保持不放

  • 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺

就是默认申请对方锁的时候,不能抢占对方的锁。

  • 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

就是锁1需要锁2,锁2需要锁1,但是锁1要申请锁2失败被挂起,锁2要锁1时找不到。成环形了。

解决死锁:

就是破坏死锁的四个必要条件:

  • 不用锁
  • 只请求,请求失败把我自己的锁释放掉
  • 可剥夺锁
  • 可以把对方的锁解锁掉

3.自旋锁

  • 如果申请失败等待时间长(比如IO操作):让后面线程阻塞挂起
  • 如果申请失败等待时间短(比如抢票操作):让后面线程一直申请锁直到成功——>自旋

4.读者写者问题

有线程向公共资源写入,有线程从公共资源读取。

也有“321”原则

但是在“3”上与生产消费者模型不同:读者和读者之间没有关系——>因为读者不会把数据拿走,而是数据的拷贝

伪代码:理解加锁逻辑——>因为默认写者比读者多,不能在写者写时读。

注意:读者写者问题默认是读者优先的。

系统库已经把上面的逻辑封装了接口:

相关推荐

  1. python线详解

    2024-06-11 08:54:02       17 阅读
  2. 【python3】线详解

    2024-06-11 08:54:02       30 阅读
  3. JUC与线基础详解

    2024-06-11 08:54:02       11 阅读
  4. C#线编程详细教学

    2024-06-11 08:54:02       21 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-11 08:54:02       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-11 08:54:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-11 08:54:02       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-11 08:54:02       20 阅读

热门阅读

  1. xgboost导出为pmml模型包

    2024-06-11 08:54:02       9 阅读
  2. 回溯算法练习题(2024/6/10)

    2024-06-11 08:54:02       10 阅读
  3. 28.找零

    28.找零

    2024-06-11 08:54:02      12 阅读
  4. Kubernetes学习总结知识点汇总

    2024-06-11 08:54:02       11 阅读
  5. hw meta10 adb back up DCIM

    2024-06-11 08:54:02       13 阅读
  6. 【Spring Boot】过滤敏感词的两种实现

    2024-06-11 08:54:02       11 阅读
  7. 鼠标侧键映射虚拟桌面切换 —— Win11

    2024-06-11 08:54:02       9 阅读
  8. YOLOv5的predict.py逐句讲解(学习笔记)

    2024-06-11 08:54:02       11 阅读
  9. 递归

    递归

    2024-06-11 08:54:02      14 阅读
  10. OpenZeppelin Ownable合约 怎么使用

    2024-06-11 08:54:02       9 阅读