RocketMQ 消息的顺序和重复

世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!—— 沈询

一、消息顺序

分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:

  1. 消息的顺序问题
  2. 消息的重复问题

有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决它们身上,实际上是浪费的,效率低下的。从这个角度来看消息的顺序问题,我们可以得出两个结论:

1. 不关注乱序的应用实际大量存在

2. 队列无序并不意味着消息无序

最后我们从源码角度分析RocketMQ怎么实现发送顺序消息。

一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。下面的示例中,OrderId相同的消息,会发送到同一个队列:

// RocketMQ默认提供了两种MessageQueueSelector实现:随机/HashSendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Overridepublic MessageQueue select(List mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); }}, orderId);

在获取到路由信息以后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列。

private SendResult send() { // 获取topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; // 根据我们的算法,选择一个发送队列// 这里的arg = orderId mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); if (mq != null) { returnthis.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); } }}

二、消息重复

上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢?还是“恰好”不解决。

造成消息的重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是不解决,转而绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

1. 消费端处理消息的业务逻辑保持幂等性

2. 保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现

第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。

我们可以看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率不一定大,且由消息系统实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。

RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。

三、事务消息

RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。

大事务 = 小事务 + 异步

将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到与单机一致。

那问题是:我们是先扣款还是先发送消息呢?

可能大家会有很多的方法来解决这个问题,比如:直接将发消息放到Bob扣款的事务中去,如果发送失败,抛出异常,事务回滚。这样的处理方式也符合“恰好”不需要解决的原则。

四、Producer

// 构造ProducerDefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");// 初始化Producer,整个应用生命周期内,只需要初始化1次

producer.start();// 构造

MessageMessage msg = new Message("TopicTest1",// topic"TagA",// tag:给消息打标签,用于区分一类消息,可为null"OrderID188",// key:自定义Key,可以用于去重,可为null ("Hello MetaQ").getBytes());// body:消息内容// 发送消息并返回结果SendResult sendResult = producer.send(msg);// 清理资源,关闭网络连接,注销自己producer.shutdown();

在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:

1. 如果没有指定namesrv地址,将会自动寻址

2. 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳…

3. 启动负载均衡的服务

五、消息存储

RocketMQ的消息存储是由consume queue和commit log配合完成的。

1、Consume Queue

consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log上的位置。

我们可以在配置中指定consumequeue与commitlog存储的目录

每个topic下的每个queue都有一个对应的consumequeue文件,比如:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

  1. 根据topic和queueId来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。
  2. 按照消费端的GroupName来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的%RETRY%ConsumerGroupA。
    1. 按照消费端的GroupName来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的%DLQ%ConsumerGroupA。

死信队列(Dead Letter Queue)一般用于存放由于某种原因无法传递的消息,比如处理失败或者已经过期的消息。

Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,如下图所示:

CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量

Size存储中消息的大小

Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)

2、Commit Log

CommitLog:消息存放的物理文件,每台broker上的commitlog被本机所有的queue共享,不做任何区分。

文件的默认位置如下,仍然可通过配置文件修改:

${user.home} \store\${commitlog}\${fileName}

六、消息订阅

RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

RocketMQ最佳实践

一、Producer最佳实践

1、一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。

2、每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。

3、消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。

4、对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。

5、某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

二、Consumer最佳实践

  1. 消费过程要做到幂等(即消费端去重)
  2. 尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量
  3. 优化每条消息消费过程

三、其他配置

线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。

RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。

RocketMQ 版提供三种方式来发送普通消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。

一、RocketMQ中的专业术语

  • Topic

topic表示消息的第一级类型,比如一个电商系统的消息可以分为:交易消息、物流消息…… 一条消息必须有一个Topic。

  • Tag

Tag表示消息的第二级类型,比如交易消息又可以分为:交易创建消息,交易完成消息….. 一条消息可以没有Tag。RocketMQ提供2级消息分类,方便大家灵活控制。

  • Queue

一个topic下,我们可以设置多个queue(消息队列)。当我们发送消息时,需要要指定该消息的topic。RocketMQ会轮询该topic下的所有队列,将消息发送出去。

  • Producer 与 Producer Group

Producer表示消息队列的生产者。消息队列的本质就是实现了publish-subscribe模式,生产者生产消息,消费者消费消息。所以这里的Producer就是用来生产和发送消息的,一般指业务系统。

Producer Group是一类Producer的集合名称,这类Producer通常发送一类消息,且发送逻辑一致。

  • Consumer 与 Consumer Group

消息消费者,一般由后台系统异步消费消息。

  • Push Consumer

Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。

  • Pull Consumer

Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。

Consumer Group是一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。

  • Broker

消息的中转者,负责存储和转发消息。可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker是RocketMQ的核心,它不不能挂的,所以需要保证broker的高可用。

  • 广播消费

一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一次。在广播消费中的Consumer Group概念可以认为在消息划分方面无意义。

  • 集群消费

一个Consumer Group中的Consumer实例平均分摊消费消息。例如某个Topic有 9 条消息,其中一个Consumer Group有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。

  • NameServer

NameServer即名称服务,两个功能:

接收broker的请求,注册broker的路由信息

接口client的请求,根据某个topic获取其到broker的路由信息

NameServer没有状态,可以横向扩展。每个broker在启动的时候会到NameServer注册;Producer在发送消息前会根据topic到NameServer获取路由(到broker)信息;Consumer也会定时获取topic路由信息。

二、RocketMQ Overview

Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列;如果做集群消费,则多个Consumer实例平均消费这个Topic对应的队列集合。

再看下RocketMQ物理部署结构图:

RocketMQ网络部署特点:

Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId=0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer 完全无状态,可集群部署。

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic 路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

参考:MQ如何解决消息的顺序&重复两大硬伤?-CSDN博客

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-22 17:56:03       52 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-22 17:56:03       54 阅读
  3. 在Django里面运行非项目文件

    2024-07-22 17:56:03       45 阅读
  4. Python语言-面向对象

    2024-07-22 17:56:03       55 阅读

热门阅读

  1. 基于vite + pnpm monorepo 实现一个UI组件库

    2024-07-22 17:56:03       18 阅读
  2. sqlserver15(2019)修改参数READ_COMMITTED_SNAPSHOT

    2024-07-22 17:56:03       16 阅读
  3. 黑龙江网络安全等级保护测评策略概述

    2024-07-22 17:56:03       14 阅读
  4. Hi3751V351常用命令

    2024-07-22 17:56:03       15 阅读
  5. 鸿蒙笔记--存储

    2024-07-22 17:56:03       15 阅读
  6. Github订阅地址

    2024-07-22 17:56:03       15 阅读
  7. Qt:愚蠢的qmake

    2024-07-22 17:56:03       20 阅读
  8. 《设计模式之美》读书笔记2

    2024-07-22 17:56:03       16 阅读
  9. Seata 面试题及答案整理,最新面试题

    2024-07-22 17:56:03       19 阅读