项目:仿RabbitMQ实现的消息队列组件

写在前面

开源仓库和项目上线

本项目已开源到下面链接下的仓库当中

仿RabbitMQ实现消息队列

其他文档说明

针对于日志的信息,我采用了之前写的一份利用可变参数实现日志的代码,具体链接如下

C++:可变参数实现日志系统

项目:消息队列的前置知识

需求分析

对于这个项目来说,首先需要明确几个概念,项目中要包含生产者,消费者,中间人,发布,订阅

先说生产者和消费者的模型,这个并不陌生,在操作系统中有过对于这些内容的讲解:

在这里插入图片描述
如上所示是一个基本的生产消费者模型的理论,那么在这个理论中比较重要的就是中间的这个Broker Server,这个部分的核心功能就是进行消息的存储和转发

在中间件服务器Broker当中,存在下面的概念

虚拟机:类似于是一个MySQL当中的database的概念,是一个逻辑上的集合,一个BrokerServer当中会存在多个VirtualHost

交换机:这是生产者把消息先发送到Broker当中的Exchange上,再依据不同的规则,把消息转发给不同的Queue

队列:真正用来存储消息的部分,每个消费者自己进行决定从哪个Queue上进行消息的读取

绑定:这是一个Exchange和Queue之间的关联关系,Exchange和Queue可以理解为是多对多的概念,而用一个关联表就可以把信息存储起来

所以要实现的内容,其实就有下面的内容:

  1. Broker服务器,也就是所谓的消息队列服务器
  2. 消息发布客户端:生产者,把消息放到服务器上
  3. 消息订阅客户端:消费者,从服务器上订阅消息

在AMQP协议中细化了对应的规则,以虚拟机为单元,来进行交换机,队列,绑定的整体操作,下面基于这些理论进行一些核心的API功能和操作

BrokerServer

对于Broker来说,主要有下面的这些功能操作

  1. 创建交换机
  2. 销毁交换机
  3. 创建队列
  4. 销毁队列
  5. 创建绑定
  6. 解除绑定
  7. 发布消息
  8. 订阅消息
  9. 确认消息
  10. 取消订阅

交换机类型

对于RabbitMQ来说,主要支持下面的四种交换机类型

  1. Direct
  2. Fanout
  3. Topic
  4. Header

下面针对于前三种比较常见的交换机进行一个简单的概述:

Direct:生产者发送消息时,直接指定被该交换机绑定的队列名,这样就可以直接进行交换了

Fanout:生产者发送的消息会被复制到该交换机的所有队列中,就有点类似于是一个广播的效果

Topic:绑定队列到交换机上,指定一个字符串为bindingKey,发送消息指定一个字符串是routingKey,这样只有这两个Key满足一定条件的时候再进行对应的消息的投递

持久化

项目一定是需要有持久化的需求的

消息应答

对于被消费的消息,是要进行一些应答的,具体的策略有两种:

  1. 自动应答:消费者只要消费了消息,就算应答完成了,Broker直接删除这个消息
  2. 手动应答:消费者手动调用应答接口,Broker收到应答请求之后再进行删除这个消息

对于手动应答来说,比较好的一个特点是,保证了消息确实被消费者处理成功了,在一些对于数据可靠性要求比较高的场景下,是一个比较常见的特点

模块划分

对于这个项目来说,主要有下面的三个模块

  1. 服务端
  2. 发布客户端
  3. 订阅客户端

服务端模块

  1. 数据管理模块:对于交换机,队列,绑定,消息数据的管理
  2. 虚拟机数据管理模块:虚拟机本质上是把上面的这些数据进行一个封装和合并后的管理,虚拟机本质上就可以看成是交换机,队列,绑定,消息的整体逻辑单元,对于虚拟机的数据管理本质上就是把上述的这些模块进行了一个合并管理
  3. 交换路由模块:消息的发布,就是把消息发布到交换机上,然后交换机把消息再放到队列中,那么如何进行设置对应的方式?其实就需要用到交换机类型,比如有直接,广播或者主题交换等,而交换路由模块就是专门进行匹配的过程的
  4. 消费者管理模块:消费者说的是订阅了一个队列消息的客户端,一旦这个队列中有了消息,就会推送给客户端,客户端是一个被动获取的效果,在核心API中,存在有订阅消息的功能,这个订阅其实是订阅的某个队列当中的全部内容,而不是订阅某个特定的消息,只要队列中有消息就绪了,就会把消息给客户端
  5. 信道管理模块:一个连接可能会有对应的多个通信通道,那么一旦有客户端要关闭通信,就要把自己的通信通道管理,因此就要把连接的通信通道进行管理
  6. 连接管理模块:就是对于一个网络通信对应的连接

客户端模块

  1. 消费者管理模块:一个订阅客户端,而当订阅一个队列消息的时候,本质上就相当于创建了一个消费者
  2. 信道管理模块:客户端的信道和服务端的信道是一一对应的,服务端信道提供的服务,客户端都有,也就是说,相当于服务端给客户端提供服务,客户端给用户提供服务
  3. 连接管理模块:对于用户来说,所有的服务都是通过信道来完成的,信道在用户的角度就是一个通信通道,所有的请求都是借助信道来完成的
  4. 基于上述的这三个模块,就可以实现一个订阅客户端和发布客户端这两个内容,其中订阅客户端就是要订阅一个队列的消息,收到推送过来的消息进行处理,而发布客户端就是向一个交换机来发布消息

交换机数据管理模块

  1. 要管理的数据:描述了一个交换机中应该有什么数据,而在这内部当中还会存在有交换机的名称,交换机的类型,是否持久化,是否要自动删除
  2. 对交换机的管理操作:声明(创建)交换机,删除交换机,获取指定名称的交换机,获取当前交换机的数量等等

队列数据管理模块

  1. 要管理的数据:在这当中描述的是队列的名称,持久化存储标志,是否独占标志,自动删除表示等
  2. 提供对应的操作:创建队列,删除队列,获取队列信息,获取队列数量,获取所有队列名称等

绑定数据管理模块

这个模块主要是进行描述的是,哪个队列和哪个交换机绑定在了一起

  1. 要管理的数据:对于这个模块来说,要管理的数据主要有交换机的名称,队列名称,以及binding_key,这个主要是绑定密钥,用来进行描述交换机和队列的一种匹配规则
  2. 提供对应的操作:添加绑定,解除绑定,获取交换机相关的所有绑定信息,获取队列相关的所有绑定信息,获取绑定信息数量

消息数据管理模块

  1. 要管理的数据

对于这个模块来说,先看一下消息数据说的是什么:

消息信息中主要包含内容+属性:

对于属性来说包含的有,消息的ID,以及持久化表示,routing_key,这个key表示的是当前消息要发布的队列信息

  1. 管理的方式

对于管理方式当中,必定是要以队列为单元进行管理的,因为消息的所有操作都是在队列当中进行实现的,消息是要存储在队列当中的

对于管理数据来说,首先会存在一个消息链表,这当中存放的是保存所有待推送的消息,以及待确认消息hash,模仿TCP的方式,保证消息可靠传输,持久化消息hash,以及持久化的有效消息数量,持久化的总的消息数量等

  1. 提供对应的操作

对于消息管理的操作来说,要提供的方法有,向队列新增消息,获取队首消息,对消息的确认,恢复队列历史消息,垃圾回收,删除队列相关消息文件等

队列信息管理模块

对于队列的管理来说,主要包含有:

初始化队列消息,移除队列消息,向队列新增消息,对队列消息进行确认,恢复队列历史消息等

虚拟机数据管理模块

对于虚拟机来说,它内部包含了交换机,队列,绑定,消息,这些数据管理的集合

在其内部管理的数据:

  1. 交换机数据管理句柄
  2. 队列数据管理句柄
  3. 绑定信息数据管理句柄
  4. 消息数据管理句柄

要管理的操作包括:

  1. 删除交换机
  2. 删除队列
  3. 队列的绑定
  4. 队列的解除绑定
  5. 获取队列的信息
  6. 对指定队列信息的确认
  7. 获取交换机相关的所有绑定信息

路由匹配模块

对于这个模块来说,决定了一条消息是否可以发布到指定的队列,具体的,在每个队列和交换机的绑定当中会存在一个binding_key,这个是队列发布的一种匹配规则,在每条要发布的消息中,都会有一个routing_key,这是消息的发布规则

而对于交换机来说存在三种类型,直接广播和主题,这三种类型就会借助routing_key和binding_key来进行合适的划分

路由器模块的本质,其实没有管理的数据,只是提供一些匹配的操作:

  1. 判断routing_key和binding_key能否匹配成功
  2. 判断routing_key是否符合规定
  3. 判断binding_key是否符合规定

消费者管理模块

客户端有两种,分别是用来发布消息和订阅消息,只有订阅了指定队列消息的客户端才是一个消费者,而消费者数据存在的意义是,当指定队列有了消息之后,就需要把消息推送给这个消费者客户端

对于消费者的信息来说,其中需要包含内容有:

  1. 消费者的标识
  2. 定语额消息名称
  3. 自动确认标志
  4. 消费者处理回调函数指针

信道管理模块

对于信道管理Channel来说,这是一个在网络通信中的概念,表示的就是通信通道,当进行网络通信的时候,必然是借助一个网络通信连接来完成的,为了更加方便的进行资源的利用,因此要对于通信连接进行了一个更加进一步的细化,细化出了通信通道,对于用户来说,一个通信通道就是网络通信的载体,而一个真正的通信连接,是可以创建出多个通信通道

每一个信道和信道之间,在使用者的角度来看是相互独立的存在,所有的网络通信服务都是由信道提供的

信道提供的服务操作有:

  1. 删除交换机
  2. 删除队列
  3. 绑定队列和交换机
  4. 解绑队列和交换机
  5. 发布消息/订阅队列消息/取消队列订阅/队列消息确认

信道要管理的数据主要包括有,信道的ID,信道所关联的虚拟机句柄,信道关联的消费者句柄,工作线程池句柄

逻辑示意图

下面基于上述的这些理论,画出下面的逻辑示意图

示意图1

在这里插入图片描述
首先,对于交换机,它的功能主要是进行消息的转发,当有消息到来之后,交换机的作用是把消息转发到对应的队列当中,而转发的依据就需要用到路由匹配模块,路由匹配模块会根据交换机的类型和对应的binding信息,来匹配到对应的队列进行消息的转发,而整个交换机到binding模块再到路由匹配和队列这一整个模块,就被叫做是虚拟机

因此,因为有了虚拟机的存在,所以对于一个要发布的消息来说,只需要把消息传递给虚拟机就可以了,具体的分发策略是由虚拟机来完成的,做到了一个解耦合的效果,下面再来看数据库

因为有持久化的原因,所以虚拟机必定是需要伴随着数据库的,数据库当中就会有消息的持久化和数据的持久化,这个模块就是项目大体的核心模块

示意图2

下面演示的是第二个示意图:

在这里插入图片描述
如上所示的是对于BrokerServer模块,这是一个服务端的示意图,对于服务端来说内部会有一个连接管理器,而对于连接管理器来说,它内部会存在一个Channel信道,这个信道内部会有各种各样的方法,例如有创建删除交换机,创建删除队列,绑定解绑队列和交换机,发布消息,订阅队列消息,取消订阅消息,确认消息等等各种各样的接口,这些接口足以支撑整个服务器

示意图3

在这里插入图片描述
如上所示是对于客户端的示意图,客户端分为发布客户端和订阅客户端,而这两个客户端又公用同样的一组接口,这组接口中同样存在有Connection模块进行连接的管理,Connection内部又包含有各种各样的处理方法,而这些方法实际上都会和BrokerServer进行一个连接,这样就把整个服务端,客户端和主体数据库连接在一起了:

在这里插入图片描述
下面,就基于上述的这些理论,进行代码的编写

工具类编写

这个模块没有什么好说的,就是一些工具的编写,方便进行使用

sqlite3

class SqliteHelper 
{
public:
    typedef int(*SqliteCallback)(void*,int,char**,char**);
    SqliteHelper(const string &dbfile) : _dbfile(dbfile), _handler(nullptr){}
    bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX);
    bool exec(const string &sql, SqliteCallback cb, void *arg);
    void close();
private:
    string _dbfile;
    sqlite3 *_handler;
};

字符串分割

class StrHelper
{
public:
    static size_t split(const string& str, const string& sep, vector<string>& result);
};

生成随机数

class UUIDHelper 
{
public:
    static string uuid();
};

文件常用操作

class FileHelper 
{
public:
    FileHelper(const string &filename):_filename(filename){}
    bool exists();
    size_t size();
    bool read(char *body, size_t offset, size_t len);
    bool read(string &body);
    bool write(const char *body, size_t offset, size_t len);
    bool write(const string &body);
    bool rename(const string &nname);
    static string parentDirectory(const string &filename);
    static bool createFile(const string &filename);
    static bool removeFile(const string &filename);
    static bool createDirectory(const string &path);
    static bool removeDirectory(const string &path);
private:
    string _filename;
};

消息和交换机类型定义

在项目功能模块的编写前,要先把消息类型定义出来

消息本身要素

  1. 消息属性:消息ID,消息投递方式,消息的routing_key
  2. 消息有效载荷内容

消息额外存储所需要素

  1. 消息的存储位置
  2. 消息的长度
  3. 消息是否有效

下面定义交换机的属性

交换机类型

  1. direct
  2. fanout
  3. topic

消息投递模式

  1. undurable
  2. durable

声明下面的类型,之后基于这个类型生成代码

protoc --cpp_out=. message.proto
syntax = "proto3";
package MessageQueue;

enum ExchangeType 
{
    UNKNOWTYPE = 0;
    DIRECT = 1;
    FANOUT = 2;
    TOPIC = 3;
};

enum DeliveryMode 
{
    UNKNOWMODE = 0;
    UNDURABLE = 1;
    DURABLE = 2;
};

message BasicProperties
{
    string id = 1;
    DeliveryMode delivery_mode = 2;
    string routing_key = 3;
};

message Message 
{
    message Payload 
    {
        BasicProperties properties = 1;
        string body = 2;
        string valid = 3;
    };
    Payload payload = 1;
    uint32 offset = 2;
    uint32 length = 3;
};

在这里插入图片描述

交换机数据管理

下面就基于这两个内容,实现对应的交换机部分的实现

// 1. 定义交换机类
struct Exchange 
{
    Exchange() {}
    using ptr = shared_ptr<Exchange>;
    Exchange(const string& ename, 
        MessageQueue::ExchangeType etype, 
        bool edurable, 
        bool eauto_delete, 
        unordered_map<string, string> eargs) :
    name(ename), type(etype), durable(edurable), auto_delete(eauto_delete), args(eargs)
    {}
    // args存储键值对,在进行数据库存储的时候,组织字符串格式进行存储
    // 内部解析这种key=val&key=val,把数据存储到数据库中
    void SetArgs(const string& str_args);
    // 从args中把数据存储到字符串中
    string GetArgs();
    // 名称,类型,持久化,自动删除,其他参数
    string name;
    MessageQueue::ExchangeType type;
    bool durable;
    bool auto_delete;
    unordered_map<string, string> args;
};

// 2. 定义交换机持久化管理类--数据存储在sqlite数据库中
class ExchangeMapper
{
    using ExchangeMap = unordered_map<string, Exchange::ptr>;
public:
    ExchangeMapper(const string& db_file) : _sql_helper(db_file);
    // 创建数据表
    void CreateTable();
    // 移除数据表
    void RemoveTable();
    // 插入数据
    bool Insert(Exchange::ptr& exp);
    // 移除数据
    void Remove(const string& name);
    // 读取表的数据恢复
    ExchangeMap Recovery();
};


// 3. 定义交换机数据内存管理类
class ExchangeManager
{
public:
    using ptr = shared_ptr<ExchangeManager>;
    ExchangeManager(const string& db_file) : _mapper(db_file);
    // 声明交换机
    bool DeclareExchange(const string& name,
        MessageQueue::ExchangeType type, 
        bool durable, bool auto_delete,
        unordered_map<string, string>& args);
    // 删除交换机
    void DeleteExchange(const string& name);
    // 根据名称获取指定交换机对象
    Exchange::ptr selectExchange(const string& name);
    // 判断是否存在
    bool exists(const string& name);
    // 清除
    void clear();
    size_t ExchangeSize();
};

队列数据管理

// 1. 队列描述数据类
struct Queue 
{
    using ptr = shared_ptr<Queue>;
    Queue(){}
    Queue(const string& qname, bool qdurable, bool qexclusive, bool qauto_delete,
        unordered_map<string, string> qargs)
        : name(qname), durable(qdurable), exclusive(qexclusive), 
        auto_delete(qauto_delete), args(qargs)
    {}

    // args存储键值对,在进行数据库存储的时候,组织字符串格式进行存储
    // 内部解析这种key=val&key=val,把数据存储到数据库中
    void SetArgs(const string& str_args);

    // 从args中把数据存储到字符串中
    string GetArgs();
    string name;
    bool durable;
    bool exclusive;
    bool auto_delete;
    unordered_map<string, string> args;
};

// 2. 队列数据持久化类
class QueueMapper
{
public:
    QueueMapper(const string& db_file)
        : _sql_helper(db_file);
    void CreateTable();
    void RemoveTable();
    bool Insert(Queue::ptr& qe);
    void Remove(const string& name);
    using QueueMap = unordered_map<string, Queue::ptr>;
    QueueMap Recovery();
};

// 3. 队列数据管理类
class QueueManager 
{
public:
    using ptr = shared_ptr<QueueManager>;
    QueueManager(const string &dbfile):_mapper(dbfile);
    bool declareQueue(const string &qname, 
        bool qdurable, 
        bool qexclusive,
        bool qauto_delete,
        const unordered_map<string, string> &qargs);
    void deleteQueue(const string &name);
    Queue::ptr selectQueue(const string &name);
    QueueMapper::QueueMap allQueues();
    bool exists(const string &name);
    size_t size();
    void clear();
};

绑定信息管理

绑定信息,本质上描述的是交换机关联了哪些队列

// 1. 定义绑定信息类
struct Binding
{
    using ptr = shared_ptr<Binding>;
    Binding() {}
    Binding(const string& bexchange_name, const string& bqueue_name, const string& bbinding_key)
        : exchange_name(bexchange_name), queue_name(bqueue_name), binding_key(bbinding_key)
    {}
    string exchange_name;
    string queue_name;
    string binding_key;
};

// 队列与绑定信息是一一对应的,因此一个交换机可能会有多个队列的绑定信息
// 队列名与绑定信息的映射关系
using QueueBindingMap = unordered_map<string, Binding::ptr>;
// 交换机与队列的映射关系
using BindingMap = unordered_map<string, QueueBindingMap>;

// unordered_map<string, Binding::ptr>; 队列与绑定
// unordered_map<string, Binding::ptr>; 交换机与绑定

// 2. 定义绑定信息持久化
class BindingMapper
{
public:
    BindingMapper(const string& db_file)
        : _sql_helper(db_file)
    {}
    void CreateTable();
    void RemoveTable();
    bool Insert(Binding::ptr& binding);
    void Remove(const string& ename, const string& qname);
    void RemoveExchangeBindings(const string& ename);
    void RemoveQueueBindings(const string& qname);
    BindingMap Recover();
private:
    SqliteHelper _sql_helper;
};

// 3. 绑定信息管理
class BindingManager
{
public:
    BindingManager(const string& dbfile) 
        : _mapper(dbfile)
    {}
    bool bind(const string& ename, const string& qname, const string& key);
    void unbind(const string& ename, const string& qname);
    void RemoveQueueBindings(const string& qname);
    QueueBindingMap GetExchangeBindings(const string& ename);

    Binding::ptr GetBinding(const string& ename, const string& qname);
    bool exists(const string& ename, const string& qname);
    size_t size();
    void clear();

private:
    mutex _mutex;
    BindingMapper _mapper;
    BindingMap _buildings;
};

消息管理

消息信息管理

对于消息来说,首先要先描述它,消息大概包含内容和属性,对于属性来说包含的有消息ID,消息的routing_key,消息的投递模式等,而对于内容来说就是实际的消息数据内容

对于服务器上的消息管理来说,最重要的是要进行消息的持久化管理,在每一条消息上都有可能要进行持久化存储,推送给客户端再删除,那么这样的每次进行数据的存储就要进行重写一次文件,效率非常低下

  1. 因此,需要添加一个消息有效标志,每次只需要把这个有效标志位对应的数据进行修正为无效即可,消息有效标志需要随着消息的持久化内容一起进行持久化
  2. 当要删除某个消息时,需要重写这个消息在文件的对应位置,此时就把有效标志位记为无效即可,那么就需要有消息的实际存储位置,需要有文件的偏移量
  3. 当恢复历史消息的时候,需要解决粘包问题,因此就需要一个消息的长度这样的字段

消息的持久化管理

概述:

  1. 当消息文件垃圾回收的时候,需要重新加载所有的有效消息,重新生成一个新的数据文件进行写入,但是生成新的文件后,存储位置就会发生改变,因此就需要更新内存中的数据信息,这时候就需要把所有的队列数据进行加锁,然后进行更新,效率会比较低,因此,一种解决方案是,使用队列进行管理,每个队列使用自己独立的数据文件,每次只需要对操作的队列数据进行加锁即可,因此,每个队列都有自己的文件
  2. 由于要存储在文件当中,因此就会有数据格式的要求,具体格式安排是:4字节格式和长度为一组,解决粘包问题

提供的操作:

  1. 消息文件的创建和删除
  2. 消息的新增持久化和删除持久化
  3. 历史数据恢复和垃圾回收

垃圾回收的思想

  1. 垃圾回收的使用场景:每次删除数据并不是真正删除,而是直接进行标记位的标记,因此数据会越来越多,所以必然是需要回收的,因此可以定义一个场景,比如当文件的有效消息超过某些条数,并且有效消息的比例过低时,可以选择进行垃圾回收
  2. 回收思想:加载文件中所有有效消息,写到临时文件,写入完毕后删除源文件,将临时文件进行一个替换即可,返回所有的有效消息,消息中就会记录了新的存储位置,这样就可以更新内存中的数据内容

需要管理的数据

  1. 队列名
  2. 根据队列名生成数据文件名
  3. 根据队列名生成临时文件名

函数设计如下:

// 使用队列来管理消息,每个队列有自己的文件
class MessageMapper
{
public:
    MessageMapper(string& basedir, const string& qname) :
        _qname(qname);
    
    bool CreateMessageFile();

    void RemoveMessageFile();

    bool Insert(MessagePtr& msg);

    bool Remove(MessagePtr& msg);e;
    }

    // 垃圾回收
    list<MessagePtr> gc();
};

消息内存数据管理

以队列为单元进行管理

如果内存中所有的消息整体进行管理,那么在进行垃圾回收以及恢复历史消息上就会比较麻烦,因此依旧是使用队列来进行管理,每一个队列中都有消息数据的管理结构,然后最终向外提供一个消息管理类

队列消息管理

对于队列消息来说:

  1. 创建文件恢复队列历史消息数据
  2. 新增消息/确认消息
  3. 垃圾回收,当持久化数据总量和比例到达一定程度就进行垃圾回收的策略
  4. 获取队首消息
  5. 删除队列所有消息
  6. 获取待推送消息数量
  7. 获取待确认消息数量
  8. 获取持久化消息数量

需要进行管理的数据:

  1. 持久化的管理句柄
  2. 待推送消息链表:以头插尾删的思想实现队列功能
  3. 持久化消息的hashmap:垃圾回收后需要更新消息的实际存储位置
  4. 待确认消息的hashmap:一条消息被推送给客户端,有些消息需要确认
  5. 持久化文件有效消息和总体消息的数量
  6. 队列名称
class QueueMessage
{
public:
    QueueMessage(const string& basedir, const string& qname);
    bool Insert(const MessageQueue::BasicProperties *bp, const string& body);
    // 每次删除消息后,判断是否需要垃圾回收
    bool Remove(const string& msg_id);
    MessagePtr front();
    size_t PushCount();
    size_t TotalCount();
    size_t DurableCount();
    size_t WaitAckCount();
    void clear();
private:
    string _qname;
    size_t _valid_count;
    size_t _total_count;
    list<MessagePtr> _msgs;
    unordered_map<string, MessagePtr> _durable_msgs;
    unordered_map<string, MessagePtr> _wait_msgs;
};

消息对外接口类

最后要提供一个对外接口类,管理的是每一个队列的消息

管理成员

  1. 互斥锁
  2. 每个队列的消息管理句柄,队列名称和队列消息管理句柄的hash表

提供操作

  1. 初始化队列的消息管理句柄,用来创建队列用
  2. 销毁队列的消息管理句柄
  3. 队列的各项消息操作,新增,获取,确认,回复等等
// 对外提供操控消息的总接口
class MessageManager
{
public:
    using ptr = shared_ptr<MessageManager>;
    MessageManager(const string& basedir) : _basedir(basedir);

    void clear();

    // 把队列进行初始化获取消息
    void InitQueueMessage(const string& qname);

    // 销毁队列
    void DestroyQueueMessage(const string& qname);

    // 插入消息
    bool Insert(const string& qname, MessageQueue::BasicProperties* bp, const string& body, bool queue_is_durable);

    // 获取队首消息信息
    MessagePtr Front(const string& qname);

    // 获取ack应答
    void ack(const string& qname, const string& msg_id);

    size_t GetableCount(const string& qname);

    size_t TotalCount(const string& qname);

    size_t DurableCount(const string& qname);

    size_t WaitAckCount(const string& qname);
private:
    mutex _mutex;
    string _basedir;
    unordered_map<string, QueueMessage::ptr> _queue_msgs;
};

虚拟机信息管理

虚拟机是对于上述的三个数据管理模块的整合,并基于数据之间的关联关系进行联合操作,这个模块其实就是把前面的模块都整合起来了

class VirtualHost
{
public:
    VirtualHost(const string& basedir, const string& dbfile);

    bool DeclareExchange(const string& name,
        MessageQueue::ExchangeType type, 
        bool durable, bool auto_delete,
        unordered_map<string, string>& args);

    void DeleteExchange();

    bool DeclareQueue(const string &qname, 
        bool qdurable, 
        bool qexclusive,
        bool qauto_delete,
        const unordered_map<string, string> &qargs);

    void DeleteQueue();

    bool Bind(const string& ename, const string& qname, const string& key, bool durable);

    void UnBind(const string& ename, const string& qname);

    QueueBindingMap ExchangeBindings(const string& ename);

    bool BasicPublish(const string& qname, MessageQueue::BasicProperties* bp, const string& body, MessageQueue::DeliveryMode mode);

    MessagePtr BasicConsume(const string& qname);

    bool BasicAck(const string& qname, const string& msgid);

private:
    ExchangeManager::ptr _emp;
    QueueManager::ptr _qmp;
    BindingManager::ptr _bmp;
    MessageManager::ptr _mmp;
};

交换机路由管理模块

主要功能就是根据routing_key和binding_key判断是否匹配成功

class Router 
{
public:
    static bool isLegalRoutingKey(const string &routing_key);
    static bool isLegalBindingKey(const string &binding_key);
    static bool route(MessageQueue::ExchangeType type, const string &routing_key, const string &binding_key);
};

消费者和订阅者模块

消费者信息结构

  1. 消费者标识
  2. 订阅的队列名称
  3. 一个消息的处理回调函数
  4. 是否自动应答
struct Consumer 
{
    using ptr = shared_ptr<Consumer>;
    Consumer()
    {
        lg(Debug, "创建新的消费者:%p", this);
    }

    Consumer(const string& ctag, const string& queue_name, bool ack_flag, const ConsumerCallback& cb)
        : tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb))
    {
        lg(Debug, "创建新的消费者:%p", this);
    }

    ~Consumer()
    {
        lg(Debug, "移除消费者:%p", this);
    }

    string tag;
    string qname;
    bool auto_ack;
    ConsumerCallback callback;
};

消费者管理

管理要以队列为单元进行管理

操作

  1. 新增消费者
  2. 删除消费者
  3. 从队列所有消费者中取出消费者进行推送

元素

  1. 消费者管理结构,vector
  2. 轮转序号
  3. 互斥锁
  4. 队列名称
// 以队列为单元进行消费者管理
class QueueConsumer
{
public:
    using ptr = shared_ptr<QueueConsumer>;
    QueueConsumer(const string& qname);

    // 新增消费者
    Consumer::ptr Create(const string& ctag, const string& queue_name, bool ack_flag, const ConsumerCallback& cb);

    // 从队列移除消费者
    void Remove(const string& ctag);

    // 从队列中获取消费者
    Consumer::ptr Choose();

    // 判断是否为空
    bool Empty();

    // 判断指定消费者是否存在
    bool Exists(const string& ctag);

    // 清理所有消费者
    void clear();

private:
    string _qname;
    mutex _mutex;
    uint64_t _rr_seq;
    vector<Consumer::ptr> _consumers;
};

消费者统一管理结构

  1. 初始化/删除队列的消费者信息结构
  2. 向指定队列新增消费者
  3. 从指定队列移除消费者
  4. 移除指定队列所有消费者
  5. 从指定队列获取消费者
// 消费者统一管理结构
class ConsumerManager
{
public:
    using ptr = shared_ptr<ConsumerManager>;
    ConsumerManager()
    {}

    void InitQueueConsumer(const string& qname);

    void DestroyQueueConsumer(const string &qname);

    Consumer::ptr Create(const string &ctag, const string &queue_name,  bool ack_flag, const ConsumerCallback &cb);

    void remove(const string &ctag, const string &queue_name);

    Consumer::ptr choose(const string &queue_name);

    bool empty(const string &queue_name);

    bool exists(const string &ctag, const string &queue_name);

    void clear();
private:
    mutex _mutex;
    unordered_map<string, QueueConsumer::ptr> _qconsumers;
};

信道管理模块

管理信息

  1. 信道ID
  2. 信道关联的消费者
  3. 信道管理的连接
  4. 消费者管理句柄
  5. 虚拟机句柄
  6. 工作线程池句柄
  7. Protubuf协议处理句柄

管理操作

  1. 提供声明/删除交换机操作
  2. 提供声明/删除队列操作
  3. 提供声明/解绑队列操作
  4. 提供订阅/取消订阅队列消息操作
  5. 提供发布/确认消息操作
class Channel 
{
public:
    using ptr = shared_ptr<Channel>;
    Channel(const string &id, 
        const VirtualHost::ptr &host, 
        const ConsumerManager::ptr &cmp, 
        const ProtobufCodecPtr &codec, 
        const muduo::net::TcpConnectionPtr &conn,
        const threadpool::ptr &pool);

    //交换机的声明与删除
    void declareExchange(const declareExchangeRequestPtr &req);

    void deleteExchange(const deleteExchangeRequestPtr &req);

    //队列的声明与删除
    void declareQueue(const declareQueueRequestPtr &req);

    void deleteQueue(const deleteQueueRequestPtr &req);
    
    //队列的绑定与解除绑定
    void queueBind(const queueBindRequestPtr &req);

    void queueUnBind(const queueUnBindRequestPtr &req);

    //消息的发布
    void basicPublish(const basicPublishRequestPtr &req);

    //消息的确认
    void basicAck(const basicAckRequestPtr &req);

    //订阅队列消息
    void basicConsume(const basicConsumeRequestPtr &req);

    //取消订阅
    void basicCancel(const basicCancelRequestPtr &req);

private:
    void callback(const string tag, const MessageQueue::BasicProperties *bp, const string &body);
    void consume(const string &qname);
    void basicResponse(bool ok, const string &rid, const string &cid);
private:
    string _cid;
    Consumer::ptr _consumer;
    muduo::net::TcpConnectionPtr _conn;
    ProtobufCodecPtr _codec;
    ConsumerManager::ptr _cmp;
    VirtualHost::ptr _host;
    threadpool::ptr _pool;
};

class ChannelManager 
{
public:
    using ptr = shared_ptr<ChannelManager>;
    ChannelManager(){}

    bool openChannel(const string &id, 
        const VirtualHost::ptr &host, 
        const ConsumerManager::ptr &cmp, 
        const ProtobufCodecPtr &codec, 
        const muduo::net::TcpConnectionPtr &conn,
        const threadpool::ptr &pool);

    void closeChannel(const string &id);

    Channel::ptr getChannel(const string &id);
private:
    mutex _mutex;
    unordered_map<string, Channel::ptr> _channels;
};

服务器模块

对于服务器模块来说,主要是要借助于muduo库来进行实现,这里考虑到篇幅不再进行讲解,这里进行一个的总结:

首先提供了一个server发服务器,这是有一个通用的TCP服务器,而在其当中包含了一个主Reactor,这个Reactor主要是进行一个事件的读取,当有新连接到来后要如何进行读取和分发以及上面的io事件如何处理都是在这个模块来进行实现的,而对于新连接来说,会有一个dispatcher分发器,这个分发器的作用就是根据不同的消息类型进行合适的分发,装载到不同的处理函数当中,这就是muduo库一个整体的框架,而在这上面就是一些应用层的处理,例如有诸如Protubuf编解码器,进行应用层协议的封装,参数传递等等

创建MQBrokerServer模块

MQBrokerServer是对于整体服务器模块的整合,进行客户端的请求,提供对应的服务,为了方便查看,把讲解作为注释加到了代码中,这里就不进行讲解了

class Server 
{
public:
    typedef shared_ptr<google::protobuf::Message> MessagePtr;
    /*

    把服务器进行初始化,传递一个端口号和根目录,要初始化的内容有:
    1. 服务器 2. 分发器 3. Protubuf协议解析器 4. 虚拟机 5. 消费者 6. 连接管理 7. 线程池

    1. 服务器
    muduo库中的接口:对于服务器来说,要指定EventLoop,ip端口号,名字和选项
    TcpServer(EventLoop* loop, const InetAddress& listenAddr, const string& nameArg, Option option = kNoReusePort);

    2. 分发器:遇到不同情况怎么处理
    muduo库中的接口:需要指定一个defaultCb
    ProtobufDispatcher(const ProtobufMessageCallback& defaultCb);

    3. Protubuf协议解析器:进行Protubuf的协议解析
    muduo库中的接口:需要一个ProtobufMessageCallback函数回调
    ProtobufCodec(const ProtobufMessageCallback& messageCb);

    4. 虚拟机:前面自定义类型
    VirtualHost(const string &hname, const string &basedir, const string &dbfile);
    5. 消费者:前面自定义类型
    6. 连接:前面自定义类型
    7. 线程池:前面自定义类型
    */
    
    Server(int port, const string &basedir);
    
    // 启动服务器
    void start();

private:
    //打开信道
    void onOpenChannel(const muduo::net::TcpConnectionPtr& conn, const openChannelRequestPtr& message, muduo::Timestamp);

    //关闭信道
    void onCloseChannel(const muduo::net::TcpConnectionPtr& conn, const closeChannelRequestPtr& message, muduo::Timestamp);

    //声明交换机
    void onDeclareExchange(const muduo::net::TcpConnectionPtr& conn, const declareExchangeRequestPtr& message, muduo::Timestamp);

    //删除交换机
    void onDeleteExchange(const muduo::net::TcpConnectionPtr& conn, const deleteExchangeRequestPtr& message, muduo::Timestamp);

    //声明队列
    void onDeclareQueue(const muduo::net::TcpConnectionPtr& conn, const declareQueueRequestPtr& message, muduo::Timestamp);

    //删除队列
    void onDeleteQueue(const muduo::net::TcpConnectionPtr& conn, const deleteQueueRequestPtr& message, muduo::Timestamp);

    //队列绑定
    void onQueueBind(const muduo::net::TcpConnectionPtr& conn, const queueBindRequestPtr& message, muduo::Timestamp);

    //队列解绑
    void onQueueUnBind(const muduo::net::TcpConnectionPtr& conn, const queueUnBindRequestPtr& message, muduo::Timestamp);

    //消息发布
    void onBasicPublish(const muduo::net::TcpConnectionPtr& conn, const basicPublishRequestPtr& message, muduo::Timestamp);

    //消息确认
    void onBasicAck(const muduo::net::TcpConnectionPtr& conn, const basicAckRequestPtr& message, muduo::Timestamp);

    //队列消息订阅
    void onBasicConsume(const muduo::net::TcpConnectionPtr& conn, const basicConsumeRequestPtr& message, muduo::Timestamp);

    //队列消息取消订阅
    void onBasicCancel(const muduo::net::TcpConnectionPtr& conn, const basicCancelRequestPtr& message, muduo::Timestamp);

    void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp);

private:
    muduo::net::EventLoop _baseloop;
    muduo::net::TcpServer _server;//服务器对象
    ProtobufDispatcher _dispatcher;//请求分发器对象--要向其中注册请求处理函数
    ProtobufCodecPtr _codec;//protobuf协议处理器--针对收到的请求数据进行protobuf协议处理
    VirtualHost::ptr _virtual_host;
    ConsumerManager::ptr _consumer_manager;
    ConnectionManager::ptr _connection_manager;
    threadpool::ptr _threadpool;
};

客户端模块

在RabbitMQ当中,提供服务的是信道,因此在客户端的实现中,就弱化了对于客户端的概念,在这个项目当中并不会为用户展示出网络通信的概念,而是会用提供服务的方式来进行体现

其实总体来说,就是用一个接口来实现一个功能,接口内部完成向客户端请求的过程,对外不体现出客户端和服务端通信的概念,用户需要什么服务就调用什么接口即可

因此,对于客户端模块主要有以下的四大模块

  1. 订阅者模块:表示这是一个消费者
  2. 信道模块:包含一些常用接口,消息发布确认等等
  3. 连接模块:它的作用是进行打开和关闭信道
  4. 异步线程模块:客户端连接的IO事件监控,推送来的消息进行异步处理线程

基于上述的部分,就可以实现出下面的部分:

订阅者模块

包含的描述信息

  1. 订阅者标识
  2. 订阅队列名
  3. 是否自动确认
  4. 收到消息的回调函数
struct Consumer
{
    using ptr = shared_ptr<Consumer>;
    string tag;    //消费者标识
    string qname;  //消费者订阅的队列名称
    bool auto_ack;      //自动确认标志
    ConsumerCallback callback;

    Consumer()
    {
        lg(Debug, "new Consumer: %p", this);
    }

    Consumer(const string &ctag, const string &queue_name,  bool ack_flag, const ConsumerCallback &cb)
        : tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb))
    {
        lg(Debug, "new Consumer: %p", this);
    }

    ~Consumer() 
    {
        lg(Debug, "del Consumer: %p", this);
    }
};

信道管理模块

在服务端中有信道,自然在客户端中也有信道的概念,并且功能和服务端基本一致,不管是说是客户端的Channel还是服务端的Channel都是为了给用户提供服务而存在,区别为,服务端是给客户端提供服务的,而客户端是给用户提供服务的,也可以理解为是客户端的Channel来进行接口的调用,向服务端发送对应的请求,获取请求的服务

信道的描述信息

  1. 信道ID
  2. 信道关联的网络通信连接对象
  3. Protubuf协议处理对象
  4. 信道关联的消费者
  5. 请求对应的响应信息队列
  6. 互斥锁条件变量

信道的组织操作

  1. 提供创建信道操作
  2. 提供删除信道操作
  3. 提供交换机声明
  4. 提供删除交换机操作
  5. 提供创建队列操作
  6. 提供删除队列操作
  7. 提供交换机-队列绑定操作
  8. 提供交换机-队列解除绑定操作
  9. 提供添加订阅操作
  10. 提供解除订阅操作
  11. 提供发布消息操作
  12. 提供确认消息操作

信道的管理操作

  1. 创建信道
  2. 查询信道
  3. 删除信道

异步工作线程池

在客户端的角度有两个线程池需要处理:

  1. muduo库中的异步循环线程EventLoopThread
  2. 收到消息后进行异步处理的工作线程池

连接管理模块

对于客户端来说,操作的思想就是先创建连接,之后通过连接创建信道,通过信道提供服务这一系列的流程,这个模块就是针对于muduo库客户端进行的二次封装,向用户提供了一个创建Channel的接口,创建信道后,借助信道来提供指定的服务

class Connection 
{
public:
    using ptr = shared_ptr<Connection>;
    // 进行连接的创建实例
    Connection(const string &sip, int sport, const AsyncWorker::ptr &worker);
    // 打开信道
    Channel::ptr openChannel();
    // 关闭信道
    void closeChannel(const Channel::ptr &channel);

private:
    // 通用响应
    void basicResponse(const muduo::net::TcpConnectionPtr& conn, const basicCommonResponsePtr& message, muduo::Timestamp);
    // 消息的推送
    void consumeResponse(const muduo::net::TcpConnectionPtr& conn, const basicConsumeResponsePtr& message, muduo::Timestamp);
    // 未知的信息
    void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp);
    // 连接
    void onConnection(const muduo::net::TcpConnectionPtr&conn);
private:
    muduo::CountDownLatch _latch; //实现同步的
    muduo::net::TcpConnectionPtr _conn; //客户端对应的连接
    muduo::net::TcpClient _client; //客户端
    ProtobufDispatcher _dispatcher; //请求分发器
    ProtobufCodecPtr _codec; //协议处理器
    AsyncWorker::ptr _worker;
    ChannelManager::ptr _channel_manager;
};

结束

至此,项目就基本结束了,但是这个项目毕竟是组件,实际的应用场景还是要结合上层的逻辑实际处理来完成,后续会更新它的使用场景项目

相关推荐

  1. SpringCloud-实现基于RabbitMQ消息队列

    2024-06-06 19:38:02       23 阅读
  2. 消息队列 RabbitMQ python实战

    2024-06-06 19:38:02       14 阅读
  3. 项目记录:RabbitMq+Redis配置消息队列

    2024-06-06 19:38:02       30 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-06 19:38:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-06 19:38:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-06 19:38:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-06 19:38:02       18 阅读

热门阅读

  1. make 是啥

    2024-06-06 19:38:02       8 阅读
  2. Android基础-工程目录结构说明

    2024-06-06 19:38:02       7 阅读
  3. 呼叫中心系统如何融入信创国产战略?

    2024-06-06 19:38:02       7 阅读
  4. 程序员高效工作(摸鱼)的 10 个方法

    2024-06-06 19:38:02       7 阅读
  5. nn.GRU和nn.GRUCell区别

    2024-06-06 19:38:02       9 阅读
  6. 【MySQL】探索 MySQL 的 GROUP_CONCAT 函数

    2024-06-06 19:38:02       11 阅读
  7. MySQL binlog三种模式

    2024-06-06 19:38:02       8 阅读
  8. Spring Boot中如何接入jetcache缓存

    2024-06-06 19:38:02       8 阅读
  9. selenium中switch_to.window切换窗口的用法

    2024-06-06 19:38:02       10 阅读
  10. 【杂记-浅析TCP预测攻击】

    2024-06-06 19:38:02       9 阅读
  11. 政府窗口服务第三方评估主要内容

    2024-06-06 19:38:02       7 阅读
  12. Docker 安装部署(CentOS 8)

    2024-06-06 19:38:02       8 阅读