进程间通信

本节重点:

  • 进程间通信介绍

  • 管道

  • 消息队列

  • 共享内存

  • 信号量

一、进程间通信介绍

1.进程间通信概念以及误区

        两个进程之间,可以进行数据的直接传递吗?不行,不对呀,我们的父进程再fork之后,子进程就有了父进程的数据,这难道不是父进程将自己的数据传递给子进程吗?

在操作系统中,当父进程通过fork()系统调用创建子进程时,子进程确实会获得父进程数据的一个副本,但这并不构成严格意义上的“数据传递”。更准确地说,这是一个数据复制的过程,而不是数据传递。“数据传递”通常指的是进程间通过某种通信机制(如管道、消息队列、共享内存等)来交换数据。这种传递可以是多次的,并且涉及数据的发送和接收操作。

⭐总结来说,两个进程之间不能直接传递数据,因为进程具有独立性,所以要想实现数据传递,就要通过特定的进程间通信机制来实现数据的交换和共享。而fork()创建子进程的过程是一个数据复制的过程,不是数据传递。

父子进程尚且如此,那么没有关系的两个进程更不能直接进行数据的直接传递!!!

进程间通信就是指在不同进程之间传播或交换信息。由于各个进程之间是相互独立的,拥有自己的内存空间和系统资源,因此不能直接在内存中进行数据交换,它们之间的通信必须借助操作系统提供的特殊机制来实现。进程间通信是操作系统提供的基础服务之一,是系统中进程之间相互合作的重要方法。

总结进程间通信就是:一个进程把自己的数据,能够交给另一个进程

2.进程间通信目的

  • 数据传输:一个进程需要将它的数据发送给另一个进程
  • 资源共享:多个进程之间共享同样的资源(打游戏的时候共享地图资源)
  • 通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
  • 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另 一个进程的所有陷入和异常,并能够及时知道它的状态改变。

总结进程间通信目的:往往需要多个进程协同,共同完成一些事情

进程间通信的一般规律

必须存在交换数据的空间(内存),该空间不能由通信双方任何一个提供!!!

进程间通信的本质就是:进程间通信必须有一份交换数据的空间,让不同的进程,都看到同一份空间(一般都是由OS提供),操作系统提供的内存空间通常被称为“共享内存”或“内核缓冲区”。当进程需要进行通信时,它们会向操作系统申请使用这些共享内存区域。操作系统会负责分配、保护和回收这些内存空间,确保每个进程只能访问其被授权访问的部分,并且通信过程不会干扰到其他进程的正常运行。

总结进程间通信的本质就是:先让不同的进程,看到同一份资源(一般都是操作系统提供)

3.进程间通信分类

操作系统提供的"空间"有不同的样式,就决定了有不同的通信方式!!!

管道

  • 匿名管道pipe
  • 命名管道

System V IPC

  • System V 消息队列
  • System V 共享内存
  • System V 信号量

POSIX IPC

  • 消息队列
  • 共享内存
  • 信号量
  • 互斥量
  • 条件变量
  • 读写锁

二、管道

1.什么是管道

当创建子进程的时候,子进程并不会继承父进程的文件系统,只会继承文件描述符表,此时仅仅是子进程的指向父进程的文件系统,此时就是浅拷贝,此时父子进程就做到了不同的进程,看到同一份资源(缓冲区,由操作系统提供的空间),struct file是允许多个进程通过指针指向我的!

所以基于文件的,让不同进程看到同一份资源的通信方式,就叫做管道!!!

  • 管道是Unix中最古老的进程间通信的形式。
  • 我们把从一个进程连接到另一个进程的一个数据流称为一个“管道”

此时文件有父子进程两个指向,要想关闭一个指向,就要采用引用计数的方法!!!

既然父进程最后是要关闭w,为什么父进程最开始不是直接关闭w,而是按照rw打开同一个文件呢?

所以按照上面的逻辑,使用管道进行通信需要我们去磁盘打开一个文件去访问吗?不能,为了支持我们的管道通信,使用系统调用pipe(),此时依然和之前一样创建struct file对象,但是此时不会再向磁盘中读取数据加载到内存或者将缓冲区的数据刷新到磁盘中,

#include <unistd.h>
功能:创建一无名管道
原型
int pipe(int fd[2]);
参数
fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端
返回值:成功返回0,失败返回错误代码

匿名管道如何做到让不同的进程看到同一份资源呢?

当父进程通过fork()创建子进程时,子进程会继承父进程的文件描述符。这样,父进程和子进程就都可以通过各自的文件描述符来访问这个文件的共享的缓冲区。所以匿名管道只能是具有血缘关系的进程才能进行进程通信。

2.用fork来共享管道原理

3.站在文件描述符角度-深度理解管道

4.站在内核角度-管道本质

所以,看待管道,就如同看待文件一样!管道的使用和文件一致,迎合了“Linux一切皆文件思想”。

5.管道规则

现在我们就来写一些代码验证管道间是如何通信的

makefile文件:

testpipe:testpipe.c
	gcc -o $@ $^
.PHONY:clean
clean:
	rm testpipe

testpipe.c文件

#include <stdio.h>
#include <unistd.h>
int main()
{
  int pipefd[2];
  int n = pipe(pipefd);
  if(n < 0) return 1;

  printf("pipefd[0]:%d,pipe[1]:%d\n",pipefd[0],pipefd[1]);//3,4
  return 0;
}

然后我们来看一下运行结果:

这里的文件描述符为3和4也没毛病,因为前三个被我们的标准输入,标准输出和标准错误占用了。

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>

void writer(int wfd)
{
  const char* str = "hello father,I an child,I am writing!";
  char buffer[128];
  int cnt = 0;
  pid_t pid = getpid();
  while(1)
  {
    // snprintf:把指定内容写入到指定长度char数组中
    snprintf(buffer,sizeof(buffer),"message:%s,pid:%d,count:%d\n",str,pid,cnt);
    write(wfd,buffer,strlen(buffer));//这里不要加上'\0'
    cnt++;
    sleep(2);
  }
}

void reader(int rfd)
{
  char buffer[1024];
  while(1)
  {
    ssize_t n = read(rfd,buffer,sizeof(buffer)-1);
    printf("father get a message:%s",buffer);
  }
}

int main()
{
  //父进程创建管道
  int pipefd[2];
  int n = pipe(pipefd);
  if(n < 0) return 1;

  printf("pipefd[0]:%d,pipe[1]:%d\n",pipefd[0],pipefd[1]);//3,4
  //                                        r         w
  //父进程fork出子进程
  pid_t id = fork();
  if(id == 0)
  {
    //child:w
    close(pipefd[0]);

    writer(pipefd[1]);

    exit(0);
  }
  //father:r
  close(pipefd[1]);
  reader(pipefd[0]);
  wait(NULL);
  return 0;
}

然后我们在命令行输入:while :;do ps axj | head -1 &&  ps axj | grep testpipe | grep -v grep;sleep 1;done来检测我们的运行结果:

如果我们此时让子进程每隔5秒再向管道写内容,而父进程时时刻刻都在读管道文件的内容,此时会发生什么呢?此时父进程在干嘛呢?此时父进程在等待子进程向管道写入数据。

管道内部没有数据&&子进程不关闭自己的写端文件fd, 读端(父)就要阻塞等待,直到pipe有数据

如果我们让子进程一直写,而让父进程不进行读取,此时我们来修改一下代码,顺便看看管道文件的大小。

我们来看一下此时的运行结果:

此时一共是65536,大约是64KB的文件大小!!!

管道内部被写满&&父进程(读端)不关闭自己的fd,写端(子)写满之后,就要阻塞等待

现在我们再来看另一种场景,我们让写端只写十次,然后就退出循环,并且将wfd关闭

此时子进程就退出了,但是父进程依然在读取,所以此时回收子进程的工作并没有进行,所以此时子进程会处于僵尸状态。

 

并且此时我们也能此时read此时的返回值是0,那怎么解决子进程的僵尸状态呢?

此时我们再来看一下结果:

对于写端而言:不写了&&关闭了pipe, 读端会将pipe中的数据读完,最后就会读到返回值为0,表示读结束,类似读到了文件的结尾

下一个场景就是让父进程读十次之后关闭读端文件描述符,而子进程此时一直是在写入的,没有退出,此时现象是什么呢?

我们来看一下结果:

此时我们观察到父子进程都结束了,但是我们的子进程是一个死循环,我们没有给它设置退出循环的代码,此时为什么也退出了呢?此时我们的读端已经关闭了,此时再向管道里面写数据是没有意义的,操作系统是不允许没有意义的事情发生的,操作系统会终止写入的进程

读端不读&&关闭,写端在写,OS会直接终止写入的进程(子进程),通过信号13---SIGPIPE信号杀掉进程

怎么验证呢?我们可以让父进程接收子进程的退出信息

直接运行:

当我们向管道进行写入时,管道有一个大小的设定,并不是我们刚刚的64KB,我们的管道中存在一个值PIPE_BUF,也就是说我们向管道写入4096个字节的数据此时是安全的。

还记得我们上面为了检测输出信息的代码嘛?while :;do ps axj | head -1 &&  ps axj | grep testpipe | grep -v grep;sleep 1;done,我们当时叫[ | ]也叫做管道,这个和我们上面提到的管道有什么区别呢?第一个sleep当作写端,第二个sleep当作读端

那怎么让这个三个进程按照管道的方式进行读写呢?第一个sleep当作写端,把第一个进程的标准输出重定向到管道的写端中,第二个sleep当作读端,并且把第二个进程的标准输入重定向到管道的读端中,随后bash再集体等待这三个进程执行。

6.管道特点

  • 只能用于具有共同祖先的进程(具有亲缘关系的进程)之间进行通信;通常,一个管道由一个进程创 建,然后该进程调用fork,此后父、子进程之间就可应用该管道。
  • 管道提供流式服务
  • 一般而言,进程退出,管道释放,所以管道的生命周期随进程
  • 一般而言,内核会对管道操作进行同步与互斥
  • 管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道

7.管道应用 - 实现进程池

        进程池是一种常见的多进程编程技术,它在程序启动时预先创建一定数量的进程,并将这些进程保存在池中,以备后续使用。当有新的任务需要处理时,程序会从进程池中取出一个空闲的进程来处理该任务。一旦任务处理完毕,该进程会被放回进程池中,等待下一个任务的到来。

        进程池的主要优点在于它避免了频繁地启动和关闭进程的开销,通过预先创建并复用一定数量的进程,显著提高了程序的性能和并发处理能力。这样可以减少系统资源的消耗,提高任务处理效率,特别是在高并发情况下。

开始手撕代码:

makefile:

processpool:processpool.cc
	g++ -o $@ $^ -std=c++11 -g
.PHONY:clean
clean:
	rm -f processpool

processpool.cc:

#include <iostream>
#include <string>
#include <unistd.h> 
#include <cstdlib> //等价于<stdlib.h>
#include <vector>

using namespace std;

enum
{
    UsageError = 1,
    ArgError = 2,
    PipeError = 3
};

void Usage(const string &proc)
{
    cout << "Usage:" << proc << "subprocess-num" << endl;
}

class Channel //先组织
{
public:
    Channel(int wfd, int sub_id, string& name)
        :_wfd(wfd)
        ,_sub_process(sub_id)
        ,_name(name)
    {}
    void PrintDebug()
    {
        cout << "_wfd:" << _wfd;
        cout << "_sub_process:" << _sub_process;
        cout << " _name:" <<  _name << endl;
    }
    ~Channel()
    {}
private:
    int _wfd;//写端
    int _sub_process;//关联的子进程id
    string _name;//每个通道起名字
};

// ./processpool 5(进程的数量)
int main(int argc, char* argv[])
{
    if(argc != 2)//传入参数不符合
    {
        Usage(argv[0]);
        return UsageError;
    }   
    
    int sub_process_num = stoi(argv[1]);//获取进程的数量

    if(sub_process_num <= 0) return ArgError;//进程数量不符合

    vector<Channel> channels;
    // 1.创建通信信道和子进程
    for(int i = 0;  i < sub_process_num; ++i)
    {
        int pipefd[2] = {0};//管道
        int n = pipe(pipefd);
        if(n < 0) return PipeError;//创建管道失败

        pid_t id = fork();//创建进程
        if(id == 0)
        {
            //子进程 - 读
            close(pipefd[1]);
            sleep(100);
            exit(0);
        }

        string cname = "channel-" + to_string(i);


        //父进程 - 写
        close(pipefd[0]);

        channels.push_back(Channel(pipefd[1],id,cname));
    }

    // 2.控制子进程
    for(auto& e : channels)
        e.PrintDebug();
    
    sleep(100);   

    // 3.回收子进程
    return 0;
}

我们先来关联一下每个管道与之对应的子进程,直接运行代码关联。

#include <iostream>
#include <string>
#include <unistd.h> 
#include <cstdlib> //等价于<stdlib.h>
#include <vector>
#include <ctime>
#include <sys/types.h>
#include <sys/wait.h>
#include "task.hpp"

using namespace std;

enum
{
    UsageError = 1,
    ArgError = 2,
    PipeError = 3
};

void Usage(const string &proc)
{
    cout << "Usage:" << proc << "subprocess-num" << endl;
}

class Channel //先组织
{
public:
    Channel(int wfd, int sub_id, string& name)
        :_wfd(wfd)
        ,_sub_process(sub_id)
        ,_name(name)
    {}
    void PrintDebug()
    {
        cout << "_wfd: " << _wfd;
        cout << " _sub_process :" << _sub_process;
        cout << " _name: " <<  _name << endl;
    }
    string name() {return _name;} 
    int wfd() {return _wfd;}
    pid_t pid() {return _sub_process;}
    void Close() {close(_wfd);}
    ~Channel()
    {}
private:
    int _wfd;//写端
    pid_t _sub_process;//关联的子进程id
    string _name;//每个通道起名字
};

class ProcessPool //进程池
{
public:
    ProcessPool(int sub_process_num)
        :_sub_process_num(sub_process_num)
    {}
    int CreatProcess(work_t work)//回调函数
    {
        for(int i = 0;  i < _sub_process_num; ++i)
        {
            int pipefd[2] = {0};//管道
            int n = pipe(pipefd);
            if(n < 0) return PipeError;//创建管道失败

            pid_t id = fork();//创建进程
            if(id == 0)
            {
                //子进程 - 读
                close(pipefd[1]);
                // 子进程执行任务
                dup2(pipefd[0],0);// 重定向:从标准输入里面读变成从管道里面读取,0位置重定向为管道文件
                work(pipefd[0]);//目前是输出子进程的pid
                exit(0);
            }

            string cname = "channel-" + to_string(i);

            //父进程 - 写
            close(pipefd[0]);

            // 此时wfd传入的是父进程的写端
            _channels.push_back(Channel(pipefd[1],id,cname));
        }
        return 0;
    }
    int NextChannel()//均衡的选择
    {
        static int next = 0;
        int c =  next;
        next++;
        next %= _channels.size();
        return c;
    }
    void SendTaskCode(int index, uint32_t code)
    {
        cout << "send code:" << code << "to" << _channels[index].name() << "sub process id:" << _channels[index].pid() << endl;  
        write(_channels[index].wfd(),&code,sizeof(code));
    }
    // 所有的子进程退出,只需要关闭所有的w端即可!
    void KillAll()
    {
        for(auto& e : _channels)
        {
            e.Close();
            cout << e.name() << "close done " << "sub process quit now: " << e.pid() << endl;
        }
    }
    void Wait()
    {
        for(auto& e : _channels)
        {
            pid_t pid = e.pid();
            pid_t rid = waitpid(pid, nullptr, 0);
            if(rid == pid)
                cout << "wait sub process " << pid << "success..." << endl;
        }
    }
    void Debug()
    {
        for(auto& e : _channels)
            e.PrintDebug();
    }
    ~ProcessPool()
    {} 
private:
    int _sub_process_num;//进程数量
    vector<Channel> _channels;
};

void CtrlProcessPool(ProcessPool* ptr, int cnt)
{
    while(cnt--)
    {
        //a.选择一个进程和通道
        int channel = ptr->NextChannel();
        //sleep(1);
        //cout << channel.name() << endl;
        //b.选择一个任务
        uint32_t code = NextTask();
        //c.发送任务
        ptr->SendTaskCode(channel,code);
        sleep(1);
    }
}

// ./processpool 5(进程的数量)
int main(int argc, char* argv[])
{
    if(argc != 2)//传入参数不符合
    {
        Usage(argv[0]);
        return UsageError;
    }   
    srand((uint64_t)time(nullptr));
    int sub_process_num = stoi(argv[1]);//获取进程的数量

    if(sub_process_num <= 0) return ArgError;//进程数量不符合

    // 1.创建通信信道和子进程
    ProcessPool* ptr = new ProcessPool(sub_process_num);
    ptr->CreatProcess(worker);
    ptr->Debug();

    // 2.控制子进程
    CtrlProcessPool(ptr,10);
    cout << "task run done" << endl;
    //sleep(100);
    
    // 3.回收子进程
    /*
        a.如何让所有的子进程退出
        b.如何让所有已经退出的子进程把他自己进行资源回收wait/waitpid
    */
    // wait sub process
    ptr->KillAll();
    ptr->Wait();

    delete ptr;
    return 0;
}

为什么子进程的读端的fd都是3呢?

所以此时按照上面的解释,对于子进程的读端都是文件描述符3,那么对于父进程每个管道的写端是从文件描述符为4开始的吗?我们来看看。

但是我们这里存在一个小bug,我们看一下图

此时的bug是第二个子进程会继承父进程的文件描述符表,所以此时就继承了描述符为4的写端,那么第二个子进程描述符为4就同时也指向第一个管道的写端。所以未来创建10个进程,便会有10个进程指向第一个管道的写端,这样会有什么问题吗?如果一个管道的写端比较多的时候,此时父进程要关闭写端的时候,此时写端并没有被关完,此时子进程再在读端读取的时候并没有接收到0的返回值,因为写端并没有被关完。此时怎么解决呢?我们上面的代码逻辑是先关完,然后再读取的

我们可以关一个然后再直接读,改成我们下面的代码逻辑

此时按照我们之前的逻辑,只要我们把写端一关,此时子进程的read返回值n就为0,此时里面就会退出,顺便此时就把子进程进行回收,那么事实真的如此吗?

此时我们发现进程一个都没有退出,并且此时我们的程序也已经卡死了,这是因为第一个管道的写端非常多,此时只关掉了一个,此时读端不认为此时管道没有写端了,所以读端在读的时候一直阻塞,所以此时子进程一直在阻塞没退,所以waitpid也一直等待,导致程序卡死,怎么解决呢?第一个方法就是分开写,第二个方法就是倒着关,最后一个管道的肯定只有一个写端,一个读端,第三中方法就是每次创建子进程的时候,把继承下来的写端都关闭掉

int CreatProcess(work_t work)//回调函数
{
    vector<int> fds;

    for(int i = 0;  i < _sub_process_num; ++i)
    {
        int pipefd[2] = {0};//管道
        int n = pipe(pipefd);
        if(n < 0) return PipeError;//创建管道失败

        pid_t id = fork();//创建进程
        if(id == 0)
        {
            if(!fds.empty())    
            {
                for(auto e: fds)
                {
                    close(fds);//把继承父进程的写端关闭掉
                    cout << "close w fd: " << fd;
                }
            }
                    
                    
            //子进程 - 读
            close(pipefd[1]);
            // 子进程执行任务
            dup2(pipefd[0],0);// 重定向:从标准输入里面读变成从管道里面读取,0位置重定向为管道文件
            work(pipefd[0]);//目前是输出子进程的pid
            exit(0);
        }

        string cname = "channel-" + to_string(i);

        //父进程 - 写
        close(pipefd[0]);

        // 此时wfd传入的是父进程的写端
        _channels.push_back(Channel(pipefd[1],id,cname));

        // 把父进程的wfd保存
        fds.push_back(pipefd[1]);
    }
    return 0;
}

我们来看一下此时的运行结果:

此时也就解决问题了,同时也证明此时只有一个写端,上面的子进程都是被waitpid所等待退出的。

三、命名管道

⭐命名管道解决的让不相关的进程之间可以通信

⭐进程间通信的本质是让不同的进程看到同一份资源

  • 管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
  • 如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
  • 命名管道是一种特殊类型的文件,可以进行读写,但是不用刷新到磁盘

所以进程A和进程B通信的前提就是存在一个这样的特殊文件,那么怎么创建这样的文件呢?

那我们就来创建一个

所以现在我们就可以简单的验证两个毫无关系的进程之间的通信

向fifo文件里面写入hello内容,但是此时运行之后,程序一直阻塞,是因为只进行了写,没有进行读。

当我们用cat读文件的时候,此时阻塞也就取消了,这就是简单的echo和cat两个进程之间通过fifo这个特殊文件进行通信,所以现在我们只要创建这个特殊文件,然后一个进行作为读端,一个进程作为写端,那么这两个进程之间就可以进行通信。那我们怎么用代码创建管道文件呢?

那我们直接来看代码

makefile

.PHONY:all
all:pipe_server pipe_client

pipe_server:PipeServer.cc
	g++ -o $@ $^ -std=c++11
pipe_client:PipeClient.cc
	g++ -o $@ $^ -std=c++11

.PHONY:clean
clean:
	rm -f pipe_server pipe_client

Comm.hpp

#ifndef __COMM_HPP__
#define __COMM_HPP__

#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/stat.h>

using namespace std;

#define Mode 0666

class Fifo
{
public:
    Fifo(const string& path)
        :_path(path)
    {
        umask(0);
        int n = mkfifo(_path.c_str(),Mode);
        if(n == 0)
            cout << "mkfifo success" << endl;
        else
            cout << "mkfifo failed" << endl;
    }
    ~Fifo()
    {}
private:
    string _path;//文件路径 + 文件名
};

#endif

PipeServer.cc

#include "Comm.hpp"
#include <unistd.h>

int main()
{
    Fifo fifo("./fifo");
    sleep(1);
    return 0;
}

然后我们来运行一下程序

此时就存在一个权限为666的管道文件,随后我们再运行一次

什么原因呢?我们来看一下mkfifo的返回值

此时我们就可以通过错误码来看看是什么原因

我们直接看结果:

所以此时我们必须要删除这个管道文件才能创建新的管道文件,但是这样有点麻烦,我们可以在析构函数处删除文件。

直接上手代码

此时就程序执行完之后就把管道文件删除了。

随后我们来用命名管道实现server&client通信

Comm.hpp

#ifndef __COMM_HPP__
#define __COMM_HPP__

#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>

using namespace std;

#define Mode 0666
#define Path "fifo"

class Fifo
{
public:
    Fifo(const string& path)
        :_path(path)
    {
        umask(0);
        int n = mkfifo(_path.c_str(),Mode);
        if(n == 0)
            cout << "mkfifo success" << endl;
        else
            cout << "mkfifo failed, errno: " << errno << ", errstring: " << strerror(errno) <<  endl;
    }
    ~Fifo()
    {
        int n = unlink(_path.c_str());
        if(n == 0) 
            cout << "remove fifo file " << _path << " success" << endl;
        else
            cout << "remove failed, errno: " << errno << ", errstring: " << strerror(errno) <<  endl;
    }
private:
    string _path;//文件路径 + 文件名
};

#endif

PipeServer.cc

#include "Comm.hpp"
#include <unistd.h>

int main()
{
    Fifo fifo(Path);

    int rfd = open(Path, O_RDONLY);
    if(rfd < 0)// 打开失败
    {
        cout << "open failed, errno: " << errno << ", errstring: " << strerror(errno) <<  endl;
    }
    char buffer[1024];
    while(true)
    {
        ssize_t n = read(rfd, buffer, sizeof(buffer) - 1);
        if(n > 0)//读取成功
        {
            buffer[n] = 0;
            cout << "client say: " << buffer << endl;
        }
        else if(n == 0)
        {
            cout << "client quit, me too!!" << endl;
            break; 
        }
        else
        {
           cout << "raed failed, errno: " << errno << ", errstring: " << strerror(errno) <<  endl;
        }
    }
    return 0;
}

PipeClient.cc

#include "Comm.hpp"

int main()
{
    int wfd = open(Path, O_WRONLY);//写方式打开
    if(wfd < 0)// 打开失败
    {
        cout << "open failed, errno: " << errno << ", errstring: " << strerror(errno) <<  endl;
    }
    string inbuffer;
    while(true)
    {
        cout << "Please Enter your Message# ";
        getline(cin,inbuffer);
        // 发生信息,调用write接口
        if(inbuffer == "quit") break;
        ssize_t n = write(wfd, inbuffer.c_str(), inbuffer.size());
        if(n < 0)
        {
            cout << "write failed, errno: " << errno << ", errstring: " << strerror(errno) <<  endl; 
            break;
        }
           
    }
    close(wfd);//关掉管道的写端
    return 0;
}

随后我们来看结果:

细节问题:

四、system V共享内存

我们上面所讲到的命名管道和匿名管道都是基于文件的,也就是说很多的对应内核数据结构和对于的读写方法是在操作系统源码上是不需要过多的修改的,直接把文件代码拿过来复用,所以对于管道的进程间通信不需要做过多的操作就可以使两个进程通信起来。但是随着本地通信技术的发展,为了更丰富的通信,产生了通信模板,并对其标准化化,基于本地通信的方式就叫做system V

system V --- 系统V

  1. 共享内存
  2. 消息队列
  3. 信号量

进程间通信的本质:让不同的进程看到同一份资源

共享内存,进行通信的两个进程可以没有任何关系

通过页表进行映射同一块内存,不同的进程就都能访问到这个内存,使不同进程看到同一份资源,这就是共享内存的通信。

共享内存在内核中同时可以存在很多个,来满足众多进程的通信,那么此时操作系统就管理这些共享内存,必定存在struct shmXXX的结构体包含共享内存的属性(key_t值->共享内存的为唯一值)

系统会存在很多共享内存,那么怎么保证两个或者多个不同的进程看到的是同一个共享内存呢?我们就需要给共享内存提供唯一的标识!

那我们上面的映射关系是谁做的呢?我们可以知道,此时的映射需要修改进程的页表,同时还要修改进程的地址空间,所以无论是地址空间(mm_struct)还是页表,它都是内核数据结构,此时只有操作系统能够操作,自然也就是操作系统为我们做的映射关系,可是我们又访问不了操作系统,那么此时就要使用系统调用接口,首先就需要看的是创建一个共享内存的系统调用接口

功能:用来创建共享内存
原型
 int shmget(key_t key, size_t size, int shmflg);
参数
 key:这个共享内存段名字
 size:共享内存大小
 shmflg:由九个权限标志构成,它们的用法和创建文件时使用的mode模式标志是一样的
返回值:成功返回一个非负整数,即该共享内存段的标识码;失败返回-1

这个key值怎么形成的?ftok。key值的意义是什么?key值是多少不重要,只要能够标识唯一性即可。

为什么要让用户传入呢?让OS传,做不到访问同一份共享内存

我们可以设想一下,未来进程A要创建一个共享内存,创建好了之后需要这个key来标识共享内存唯一性,问题是进程B想看到和A进程一样的共享内存,如果这个key由内核设定,那要怎么保证进程A和进程B看到的是一样的key呢?有人说进程A把共享内存创建好了之后,把key值传递给进程B不就行了,我们要想一下,此时我们都能直接把key由进程A传给进程B,我们为什么需要共享内存来传递,怎么把key由进程A传给进程B,此时通信信道都没建立起来,所以此时的key需要进程A和进程B约定式的把key设为一样,进程A和进程B不需要任何其他沟通,就可以做到访问同一份共享内存了。

我们来看看此时两个进程形成的key是否一样,直接看代码运行结果

但是这个数字有点大,我们可以转为十六进制表示,并且系统也是使用的十六进制来表示的。

直接看运行结果:

这样就形成了同一个key。

使用共享内存通信,一定是一个进程创建新的shm,另一个直接获取共享内存即可。

不多说直接看运行结果:

对于./shm_client无论我们运行多少次都能输出,因为它仅仅是获取共享内存,但是对于./shm_server我们只能运行一次,因为第一次运行就已经创建了共享内存,后面再运行就会出错!

我们来看一个细节问题

共享内存,如果进程结束,我们没有主动释放它则共享内存一直存在。共享内存的生命周期,随内核,除非重启系统,否则一直存在
⭐文件操作,一个进程打开一个文件, 进程退出的时候,这个被打开的文件就会被系统自动释放掉。文件的生命周期随进程

那么我们在程序退出的时候,怎么判断共享内存还是否存在呢?ipcs -m是用来查看系统中指定用户创建的共享内存。

那又要怎么删除一个共享内存呢?使用ipcrm -m shmid

key和shmid有什么区别呢?为什么删除我们的共享内存不能使用key,而必须要使用shmid呢?

key在内核角度,它是区分shm的唯一性!类似于struct file*。

⭐无论是指令级,还是代码级,最后对共享内存进行控制,用的都是shmid,用户层的标识符类似于文件描述符fd。

perm通常指的是共享内存的权限,perm用于指定哪些进程或用户可以对共享内存进行读取(read)、写入(write)或执行(execute)操作。

上面我们学会了指令删除一个共享内存,那代码删除呢?

功能:用于控制共享内存
原型
 int shmctl(int shmid, int cmd, struct shmid_ds *buf);
参数
 shmid:由shmget返回的共享内存标识码
 cmd:将要采取的动作(有三个可取值)
 buf:指向一个保存着共享内存的模式状态和访问权限的数据结构
返回值:成功返回0;失败返回-1

直接看运行结果:

同时我们还可以通过上面的shmctl接口查看共享内存的相关属性。

不多说直接看运行结果:

接下来我们就要让共享内存挂接到当前的进程地址空姐上。

功能:将共享内存段连接到进程地址空间
原型
 void *shmat(int shmid, const void *shmaddr, int shmflg);
参数
 shmid: 共享内存标识
 shmaddr:指定连接的地址
 shmflg:它的两个可能取值是SHM_RND和SHM_RDONLY
返回值:成功返回一个指针,指向共享内存第一个节;失败返回-1

说明:

shmaddr为NULL,核心自动选择一个地址
shmaddr不为NULL且shmflg无SHM_RND标记,则以shmaddr为连接地址。
shmaddr不为NULL且shmflg设置了SHM_RND标记,则连接的地址会自动向下调整为SHMLBA的整数倍。
公式:shmaddr - (shmaddr % SHMLBA)
shmflg=SHM_RDONLY,表示连接操作用来只读共享内存

挂载成功意味着我们可以使用addr返回值,直接访问共享内存

这里有一个小技巧:这里的返回值和malloc相同,malloc利用void*的返回值使用开辟的空间,我们也是用void*直接使用共享内存的。

直接来写我们的代码

看看运行的结果是什么呢?

我们把这个addr地址以十六进制的形式输出出来

我们上面挂接到进程地址空间完了之后,就直接将共享内存删除了,其实这样做是是不好的,我们应该先将进程地址空间与共享内存进行取消挂接,然后再删除共享内存,如果不想删,我们此时还可以继续用这个共享内存。

功能:将共享内存段与当前进程脱离
原型
 int shmdt(const void *shmaddr);
参数
 shmaddr: 由shmat所返回的指针
返回值:成功返回0;失败返回-1
注意:将共享内存段与当前进程脱离不等于删除共享内存段

看看运行结果:

此时我们终于完成了一件事,让不同的进程,看到同一份资源,现在开始通信!!!

如果我们没有运行客户端,那么此时运行服务端,我们会发生什么现象呢?

默认情况下,shm读取方(shm_server),根本就不管写入方(shm_client)有没有写入

默认情况下,管道写端如果没有写入,那么此时读端就会一直阻塞,此时管道会协同两个进程的执行

随后我们来看一下正常情况的结果

共享内存是所有进程间通信速度最快的!(优点)

        共享内存之所以被认为是所有进程间通信速度最快的方式,主要是因为它通过直接映射物理内存到进程的虚拟地址空间,从而允许多个进程直接访问和操作同一块内存区域。

        具体来说,共享内存的原理是在物理内存上申请一块空间,然后多个进程可以将这块空间映射到自己的虚拟地址空间中。这样,所有进程都可以直接访问和操作这块共享内存,就像它们各自拥有这块内存一样。当一个进程向共享内存写入数据时,这些更改会立即对所有能够访问同一块共享内存的其他进程可见。

        与其他进程间通信方式相比,共享内存避免了数据在用户态和内核态之间的多次拷贝过程。例如,管道通信方式都需要将数据从用户空间复制到内核空间,然后再从内核空间复制回用户空间,使用的是read和write系统调用接口(本质是拷贝函数),这种拷贝过程会消耗大量的时间和计算资源。而共享内存直接通过虚拟地址访问物理内存,无需进行任何数据拷贝,因此具有更高的通信速度。

共享内存不提供进程间协同的任何机制!(缺点)序用户维持进程间协调工作 - 信号量/管道

        由于多个进程可以同时访问和操作同一块内存区域,如果没有适当的同步机制(如锁、信号量或原子操作),就容易出现数据不一致和竞争条件的问题。例如,当一个进程正在写入数据时,另一个进程可能同时读取这些数据,从而得到不完整或错误的信息。这种情况在多个进程需要协同完成某项任务时尤为突出。

        举例来说,假设有两个进程A和B共享同一块内存区域,用于传递消息。进程A负责写入消息,进程B负责读取消息。如果进程A还没有完全写完消息,而进程B就开始读取,那么B读到的将是一个不完整的消息,从而导致数据不一致。更糟糕的是,如果进程A在写入过程中被打断(例如,由于优先级更高的任务抢占CPU),那么B可能读到一个中间状态的消息,这可能会导致程序出现逻辑错误或崩溃。

直接上代码:

makefile

.PHONY:all
all:shm_server shm_client

shm_server:ShmServer.cc
	g++ -o $@ $^ -std=c++11
shm_client:ShmClient.cc
	g++ -o $@ $^ -std=c++11

.PHONY:clean
clean:
	rm -f shm_server shm_client

Comm.hpp

#pragma once

#include <iostream>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <cerrno>
#include <cstring>
#include <cstdlib>
#include <string>
#include <unistd.h>

using namespace std;

const char* pathname = "/home/xyc/code";
const int proj_id = 0x66;
// 在内核中,共享内存的大小是以4KB为基本单位的
// 你只能用你申请的大小(4096),建议申请大小为n*4KB
const int defaultsize = 4096;//单位是字节

key_t GetShmKey() //创建key值
{
    key_t key = ftok(pathname, proj_id);
    if(key < 0)
    {
        cout << "ftok error, errno: " << errno << ", errno string: " << strerror(errno) << endl;
        exit(1);
    }
    return key;
}

string ToHex(key_t key) //转为十六进制
{
    char buffer[1024];
    snprintf(buffer, sizeof(buffer), "0x%x", key);
    return buffer;
}

int CreatShmOrDie(key_t key, size_t size, int flag) //创建共享内存
{
    int shmid = shmget(key, size, flag);
    if(shmid < 0)
    {
        cout << "shmget error, errno: " << errno << ", errno string: " << strerror(errno) << endl;
        exit(2);
    }
    return shmid;
}

int CraetShm(key_t key, size_t size) //创建共享内存
{
    //IPC_CREAT:不存在就创建,存在就获取
    //IPC_EXCL:没有意义
    //IPC_CREAT | IPC_EXCL:不存在就创建,存在就出错返回
    return CreatShmOrDie(key, size, IPC_CREAT | IPC_EXCL | 0666);
}
int GetShm(key_t key, size_t size) // 获取共享内存
{
    return CreatShmOrDie(key, size, IPC_CREAT);
}

void DeleteShm(int shmid) //删除共享内存
{
    int n = shmctl(shmid, IPC_RMID, nullptr);
    if(n < 0)
    {
        cout << "shmctl error, errno: " << errno << ", errno string: " << strerror(errno) << endl;
    }
    else
    {
        cout << "shmid delete shm success, shmid: " << shmid << endl;
    }
}

void ShmDebug(int shmid) // 获取共享内存的属性
{
    struct shmid_ds shmds;//用户级
    int n = shmctl(shmid, IPC_STAT, &shmds);//内核级
    if(n < 0)
    {
        cout << "shmctl error, errno: " << errno << ", errno string: " << strerror(errno) << endl;
        return;
    }
    cout << "shmds.shm_segsz: " << shmds.shm_segsz << endl;
    cout << "shmds.shm_nattch: " << shmds.shm_nattch << endl;
    cout << "shmds.shm_ctime: " << shmds.shm_ctime << endl;
    cout << "shmds.shm_perm.__key: " << ToHex(shmds.shm_perm.__key) << endl;
}

void* ShmAttach(int shmid)
{
    void* addr = shmat(shmid, nullptr, 0);
    if((long long)addr == -1) // 避免精度损失报错
    {
        cout << "shmat error, errno: " << errno << ", errno string: " << strerror(errno) << endl;
        return nullptr;
    }
    return addr;
}

void ShmDetach(void* addr)
{
    int n = shmdt(addr);
    if(n < 0)
    {
        cout << "shmdt error, errno: " << errno << ", errno string: " << strerror(errno) << endl;
        return;
    }
}

Fifo.hpp

#ifndef __COMM_HPP__
#define __COMM_HPP__

#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <assert.h>

using namespace std;

#define Mode 0666
#define Path "fifo"

class Fifo
{
public:
    Fifo(const string& path)
        :_path(path)
    {
        umask(0);
        int n = mkfifo(_path.c_str(),Mode);
        if(n == 0)
            cout << "mkfifo success" << endl;
        else
            cout << "mkfifo failed, errno: " << errno << ", errstring: " << strerror(errno) <<  endl;
    }
    ~Fifo()
    {
        int n = unlink(_path.c_str());
        if(n == 0)
            cout << "remove fifo file " << _path << " success" << endl;
        else
            cout << "remove failed, errno: " << errno << ", errstring: " << strerror(errno) <<  endl;
    }
private:
    string _path;//文件路径 + 文件名
};


class Sync
{
public:
    Sync()
        :_rfd(-1)
        ,_wfd(-1)
    {}
    void OpenRead()
    {
        _rfd = open(Path, O_RDONLY);
        if(_rfd < 0)
            exit(1);
    }
    void OpenWrite()
    {
        _wfd = open(Path, O_WRONLY);
        if(_wfd < 0)
            exit(1);
    }
    bool Wait()
    {
        bool ret = true;
        uint32_t c = 0;
        ssize_t n = read(_rfd, &c, sizeof(uint32_t));
        if(n == sizeof(uint32_t))
        {
            cout << "server wakeup, begin read shm..." << endl;
        }
        else if(n == 0)
        {
            ret = false;
            return ret;
        }
        else
        {
            ret = false;
            return ret;
        }
        return ret;

        
    }
    void WakeUp()
    {
        uint32_t c = 0;
        ssize_t n = write(_wfd,&c,sizeof(uint32_t));
        assert(n == sizeof(uint32_t));
        cout << "wakeup server..." << endl;
    }
    ~Sync(){}
private:
    int _rfd;
    int _wfd;
};
#endif

ShmServer.cc

#include "Comm.hpp"
#include "Fifo.hpp"


int main()
{
    // 1.获取key
    key_t key = GetShmKey();
    cout << "key: "<< ToHex(key) << endl;
    //sleep(2);

    // 2.创建共享内存
    int shmid = CraetShm(key, defaultsize);
    cout << "shmid: "<< shmid << endl;
    //sleep(2);

    // 3.将共享内存和进程进程挂接
    char* addr = (char*)ShmAttach(shmid);
    cout << "Attch shm success, addr: " << ToHex((uint64_t)addr) << endl;
    //sleep(2);

    // !!!先引入管道
    Fifo fifo(Path);
    Sync syn;
    syn.OpenRead();

    // 服务端
    // 此时就可以进行通信了
    for(;;)
    {   
        if(!syn.Wait()) break;//先等待
        cout << "shm content: " << addr << endl;
    }
        
    // 4.删除映射关系
    ShmDetach(addr);
    cout << "Detach shm success, addr: " << ToHex((uint64_t)addr) << endl;
    sleep(10);

    // 5.删除共享内存
    DeleteShm(shmid);
    return 0;
}

ShmClient.cc

#include "Comm.hpp"
#include "Fifo.hpp"

int main()
{
    // 1.获取key
    key_t key = GetShmKey();
    cout << "key: "<< ToHex(GetShmKey()) << endl;
    //sleep(2);

    // 2.获取共享内存
    int shmid = GetShm(key, defaultsize);
    cout << "shmid: "<< shmid << endl;
    //sleep(2);

    // 3.将共享内存和进程进程挂接
    char* addr = (char*)ShmAttach(shmid);
    cout << "Attch shm success, addr: " << ToHex((uint64_t)addr) << endl;
    //sleep(5);

    Sync syn;
    syn.OpenWrite();

    // ShmClient - 客户端
    // 此时就可以进行通信了
    memset(addr, 0, defaultsize);//清空共享内存
    // pipe,fifo,->read,write->系统调用,shm->没有使用任何系统调用
    for(char c = 'A'; c < 'Z'; c++)
    {
        addr[c-'A'] = c;//向共享内存写入消息
        sleep(1);
        syn.WakeUp();
    }

    // 4.删除映射关系
    ShmDetach(addr);
    cout << "Detach shm success, addr: " << ToHex((uint64_t)addr) << endl;
    sleep(5);


    return 0;
}

共享内存区是最快的IPC形式。一旦这样的内存映射到共享它的进程的地址空间,这些进程间数据传递不再涉及到内核,换句话说是进程不再通过执行进入内核的系统调用来传递彼此的数据。

五、system V消息队列

  • 消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法
  • 每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值
  • 特性方面:IPC资源必须删除,否则不会自动清除,除非重启,所以system V IPC资源的生命周期随内核

如何向消息队列发送数据块和读取数据块呢?

六、system V信号量

1.信号量的原理(多线程做准备)

1.对于共享资源进行保护,是一个多执行流场景下,一个比较常见和重要的话题
2.互斥&&同步
        同步:访问资源在安全的前提下,具有一定的顺序性,从而避免竞争条件和数据损坏。
        互斥:在访问一部分共享资源的时候,任何时刻只有一个线程访问, 就叫做互斥
3.被保护起来的,任何时刻只允许一个执行访问的公共资源 --- 临界资源
4.访问临界资源的代玛,我们叫做临界区 ---  非临界区--- 所谓的保护公共资源(临界资源)
        保护公共资源的本质:是程序员保护临界区
5.原子性:操作对象的时候,只有两种状态,要么还没开始,要么已经结束

对于一个多进程的场景,直接使用int能不能实现信号量的效果呢?

        在多进程场景中,直接使用int类型的变量来实现信号量的效果是不可靠的。int类型的变量本身并不能直接作为进程间的信号量,因为进程是独立的实体,它们有自己的地址空间。

每个进程在操作系统中都有独立的地址空间,这意味着一个进程无法直接访问另一个进程的变量。即使两个进程都有一个名为countint变量,这两个变量在物理内存中是分开的,一个进程对count的修改不会反映到另一个进程的count上。

        如果一个进程修改了某个整型变量的值,其他进程默认是无法看到这个修改的,无法在进程中共享,如果我们非要这样做,我们就必须使进程先看到同一份资源,也就是计数器资源!!!这个计数器资源就需要操作系统来维护,它使不属于这些进程的,所以信号量本质也属于进程间通信!!!

        同时对于count++和count--它不是原子的,这意味着在执行这些操作时,可能会被其他进程打断,从而导致数据不一致或其他同步问题。

 ⭐信号量就是操作系统提供的计数器,这个计数器可以被多个进程同时看到,而且是以原子性的方式对这个计数器进行PV操作,代表的使申请和释放资源。

2.信号量函数接口

现在我们再来学习一下信号量函数接口

那信号量PV操作接口呢?

共享内存、信号量、消息队列--- 共性--- OS特意设计的。system V 进程间通信的 --- OS注定要对IPC资源Inter- Process Communication,先描述再管理,怎么样的呢?

System V通信标准主要关注于本地进程通信,而无法实现跨平台进程通信。在Linux系统中,所有的内容,包括设备、文件、进程等,都被当作文件来处理。这种设计使得Linux具有高度的灵活性和可扩展性,同时也要求所有的通信和交互都需要遵循“一切皆文件”的原则。然而,System V的设计并不完全符合这一思想,它作为一个单独设计的内核模块,与Linux的文件系统模型存在较大的差异,这也就是System V版本的通信很少使用的原因。

相关推荐

  1. Python进程通信

    2024-04-27 22:52:05       67 阅读
  2. 20240204进程通信

    2024-04-27 22:52:05       55 阅读
  3. 进程通信方式

    2024-04-27 22:52:05       55 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-27 22:52:05       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-27 22:52:05       106 阅读
  3. 在Django里面运行非项目文件

    2024-04-27 22:52:05       87 阅读
  4. Python语言-面向对象

    2024-04-27 22:52:05       96 阅读

热门阅读

  1. centOS7.9| 无root安装 openssl 1.1.1

    2024-04-27 22:52:05       30 阅读
  2. Python中的进制转换函数详解

    2024-04-27 22:52:05       37 阅读
  3. ReactNative0.74 版本发布重大更新

    2024-04-27 22:52:05       172 阅读
  4. Chapter 1-16. Introduction to Congestion in Storage Networks

    2024-04-27 22:52:05       36 阅读
  5. Android 监听耳机按键方式

    2024-04-27 22:52:05       42 阅读
  6. vue中组件 和 插件的区别

    2024-04-27 22:52:05       66 阅读
  7. 【ARMv9 DSU-120 系列 5 -- CHI Interface】

    2024-04-27 22:52:05       40 阅读