RocketMQ

1.RocketMQ的架构是怎么样的?

RocketMQ主要由Producer、Broker和Consumer三部分组成,如下图所示:

  • Producer:消息生产者,负责将消息发送到Broker。

  • Broker:消息中转服务器,负责存储和转发消息。RocketMQ支持多个Broker构成集群,每个Broker都拥有独立的存储空间和消息队列。

  • Topic:消息主题,是消息的逻辑分类单位。Producer将消息发送到特定的Topic中,Consumer从指定的Topic中消费消息。

  • Message Queue:消息队列,是Topic的物理实现。一个Topic可以有多个Queue,每个Queue都是独立的存储单元。Producer发送的消息会被存储到对应的Queue中,Consumer从指定的Queue中消费消息。

  • Consumer:消息消费者,负责从Broker消费消息。

  • NameServer:名称服务,维护了Broker地址、Topic和Queue等元数据信息及它们之间的映射关系。Producer和Consumer在启动时需要连接到NameServer获取Broker的地址信息。

2.RocketMQ的事务消息是如何实现的?

RocketMQ的事务消息是通过TransactionListener接口来实现的。

在发送事务消息时,首先向RocketMQ Broker发送一条“half消息”(即半消息),半消息将被存储在Broker端的事务消息日志中,但是这个消息还不能被消费者消费。

接下来,在半消息发送成功后,应用程序通过执行本地事务来确定是否要提交该事务消息。

如果本地事务执行成功,就会通知RocketMQ Broker提交该事务消息,使得该消息可以被消费者消费;否则,就会通知 RocketMQ Broker 回滚该事务消息,该消息将被删除,从而保证消息不会被消费者消费。

拆解下来的话,主要有以下4个步骤:

  • 发送半消息:应用程序向RocketMQ Broker发送一条半消息,该消息在Broker端的事务消息日志中被标记为"prepared"状态。

  • 执行本地事务:RocketMQ会通知应用程序执行本地事务。如果本地事务执行成功,应用程序通知RocketMQ Broker 提交该事务消息。

  • 提交事务消息:RocketMQ Broker 收到提交消息以后,会在事务消息日志中将该消息的状态从"prepared"改为"committed",消息最终会进入消息队列,使得该消息可以被消费者消费。

  • 回滚事务消息:如果本地事务执行失败,应用程序通知 RocketMQ Broker 回滚该事务消息,RocketMQ 将该消息的状态从"prepared"改为"rollback",并将该消息从事务消息日志中删除,从而保证该消息不会被消费者消费。

  • 未知事务状态:如果Broker收到的消息不能确认本地事务执行成功还是失败,Broker每隔一段时间会对未提交的半事务消息进行回查,来获取事务状态。

3.RocketMQ如何保证消息的有序性?

RocketMQ中提供了基于队列的顺序消费。即同一个队列内的消息可以做到有序。需要发送顺序消息时,

在生产者一方,需要在send方法中,传入一个MessageQueueSelector,并且,MessageQueueSelector中需要实现一个select方法,这个方法就是用来定义要把消息发送到哪个MessageQueue的,通常可以使用取模法进行路由:

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    //mqs:该Topic下所有可选的MessageQueue  
    //msg:待发送的消息 
    //arg:发送消息时传递的参数
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;   
        //根据参数,计算出一个要接收消息的MessageQueue的下标    
        int index = id % mqs.size();
        //返回这个MessageQueue
        return mqs.get(index);
    }
}, orderId);

在消费者一方,需要使用MessageListener回调函数的有序消费模式MessageListenerOrderly接收消息:

consumer.registerMessageListener(new MessageListenerOrderly() {
    Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs ,ConsumeOrderlyContext context) {
        System.out.printf("Receive order msg:" + new String(msgs.get(0).getBody()));
        return ConsumeOrderlyStatus.SUCCESS; 
    }
});

消费者还需要通过三次加锁来保证顺序消费,这个加锁过程是自动的:

1. 分布式锁申请:

  • 消费者初始化时,ConsumeMessageOrderlyService 会尝试向 Broker 申请分布式锁。如果获取成功,后续该消费者组的消息将只分发给当前消费者。

2. MessageQueue加锁:

  • 在开始实际处理消息之前,需要申请 MessageQueue 锁。申请成功后,可以确保在同一时间内,同一个队列中只有一个消费者线程可以处理消息。这样可以保证消息在同一个队列中被顺序处理。

3. ProcessQueue加锁:

  • 在消息拉取的过程中,消费者会一次性拉取多条消息,并将其放入本地的 ProcessQueue 中。
  • 在处理 ProcessQueue 中的消息时,为了确保在重平衡(例如消费者加入或退出)期间不会出现消息重复消费,需要对 ProcessQueue 进行加锁。

4.RocketMQ如何保证消息不丢失?

RocketMQ的消息想要确保不丢失,需要生产者、消费者以及Broker的共同努力,缺一不可。

在生产者端,消息的发送分为同步发送和异步发送。同步发送是丢失几率最小的一种发送方式,消息的发送会同步阻塞等待Broker返回结果,在Broker确认收到消息之后,生产者才会拿到SendResult。如果这个过程中发生异常,那么就说明消息发送可能失败了,就需要生产者进行重新发送消息。

在Broker端,Broker接收到消息后会把消息先存储到内存中,存储成功之后,就返回确认结果给生产者。Broker自己会通过异步刷盘的方式将内存中的数据刷新到磁盘上。但是这个过程中,如果机器挂了,那么就可能会导致数据丢失。如果想要保证消息不丢失,可以将消息保存机制修改为同步刷盘,这样,Broker会在同步请求中把数据保存在磁盘上,确保保存成功后再返回确认结果给生产者。

#默认情况为ASYNC_FLUSH
flushDiskType = SYNC_FLUSH

为了进一步保证消息不丢失,Broker还可以通过集群的方式进行部署,通常采用一主多从的部署方式,并且采用主从同步的方式做数据复制。当主Broker宕机时,从Broker会接管主Broker的工作,保证消息不丢失。同时,RocketMQ的Broker还可以配置多个实例,消息会在多个Broker之间进行冗余备份,从而保证数据的可靠性。

默认方式下,Broker在接收消息后,写入 master 成功,就可以返回确认响应给生产者了,接着消息将会异步复制到 slave 节点。但是如果这个过程中,Master的磁盘损坏了。那就会导致数据丢失了。

如果想要解决这个问题,可以配置同步复制的方式,即Master在将数据同步到Slave节点后,再返回给生产者确认结果。

#默认为ASYNC_MASTER
brokerRole=SYNC_MASTER

在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 当然,也可以先把数据保存在数据库中,就返回,然后自己再慢慢处理。

但是,需要注意的是RocketMQ只能最大限度的保证消息不丢失,没有办法做到100%保证不丢失。

5.RocketMQ有几种集群方式?

3种,分别是单Master模式、多Master模式以及多Master多Slave模式。

单Master集群:这是一种最简单的集群方式,只包含一个Master节点和若干个Slave节点。所有的写入操作都由Master节点负责处理,Slave节点主要用于提供读取服务。当Master节点宕机时,集群将无法继续工作。

多Master集群:这种集群方式包含多个Master节点,不部署Slave节点。这种方式的优点是配置简单,单个Master宕机或重启维护对应用无影响。在磁盘配置为RAID10时,即使机器宕机不可恢复的情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;缺点是单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

多Master多Slave集群:这种集群方式包含多个Master节点和多个Slave节点。每个Master节点都可以处理写入操作,并且有自己的一组Slave节点。当其中一个Master节点宕机时,消费者仍然可以从Slave消费。优点是数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;缺点是性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

6.RocketMQ消息堆积了怎么解决?

RocketMQ的消息堆积,一般都是因为客户端本地消费过程中,由于消费耗时过长或消费并发度较小等原因,导致客户端消费能力不足,出现消息堆积的问题。

当线上出现消息堆积的问题时,一般有以下几种方式来解决:

  • 增加消费者数量:消息堆积了,消费不过来了,那就把消费者的数量增加一下,让更多的实例来消费这些消息。

  • 提升消费者消费速度:消费者消费的慢可能是消息堆积的主要原因,想办法提升消费速度,比如引入线程池、本地消息存储后即返回成功后续再慢慢消费等。

  • 降低生产者的生产速度:如果生产者可控的话,可以让生产者生成消息的速度慢一点。

  • 增加 Topic 队列数:如果一个 Topic 的队列数比较少,那么就容易出现消息堆积的情况。可以通过增加队列数来提高消息的处理并发度,从而减少消息堆积。

  • 清理过期消息:有一些过期消息、或者一直无法成功的消息,在业务做评估之后,如果无影响或者影响不大,其实是可以清理的。

  • 调整 RocketMQ 的配置参数:RocketMQ 提供了很多可配置的参数,例如消息消费模式、消息拉取间隔时间等,可以根据实际情况来调整这些参数,从而优化消息消费的效率。

7.介绍一下RocketMQ的工作流程?

1、启动NameServer,他会等待Broker、Producer以及Consumer的链接。

2、启动Broker,会和NameServer建立连接,定时发送心跳包。心跳包中包含当前Broker信息(ip、port等)、Topic信息以及Borker与Topic的映射关系。

3、启动Producer,启动时先随机和NameServer集群中的一台建立长连接,并从NameServer中获取当前发送的Topic所在的所有Broker的地址;然后从队列列表中轮询选择一个队列,与队列所在的Broker建立长连接,进行消息的发送。

4、Broker接收Producer发送的消息,当配置为同步复制时,master需要先将消息复制到slave节点,然后再返回“写成功状态”响应给生产者;当配置为同步刷盘时,则还需要将消息写入磁盘中,再返回“写成功状态”;要是配置的是异步刷盘和异步复制,则消息只要发送到master节点,就直接返回“写成功”状态。

5、启动Consumer,过程和Producer类似,先随机和一台NameServer建立连接,获取订阅信息,然后在和需要订阅的Broker建立连接,获取消息。

8.RocketMQ怎么实现消息分发的?

RocketMQ 支持两种消息模式:广播消费( Broadcasting )和集群消费( Clustering )。

广播消费:当使用广播消费模式时,RocketMQ 会将每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

广播模式下,RocketMQ 保证消息至少被客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。并且,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过。

集群消费:当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。

集群模式下,每一条消息都只会被分发到一台机器上处理。但是不保证每一次失败重投的消息路由到同一台机器上。一般来说,用集群消费的更多一些。

通过设置MessageModel可以调整消费方式:

// MessageModel设置为CLUSTERING(不设置的情况下,默认为集群订阅方式)。
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// MessageModel设置为BROADCASTING。
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

9.RocketMQ的消息是推还是拉?

RocketMQ的消费模式分为两种,一种是推(Push),一种是拉(Pull),开发者可以自行选择。

推是服务端主动推送消息给客户端,拉是客户端需要主动到服务端轮询获取数据。

推的优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

拉优点是客户端可以依据自己的消费能力进行消费,但是频繁拉取会给服务端造成压力,并且可能会导致消息消费不及时。

需要注意的是,RocketMQ的push模式其实底层的实现还是基于pull实现的,只不过他把pull给封装的比较好,让你以为是在push。

public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer 
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer

10.用了RocketMQ一定能实现削峰的效果吗?

在高并发场景下,系统可能会面临短时间内大量请求涌入,导致系统负载急剧上升,甚至超过系统的处理能力,造成服务瘫痪。使用MQ来缓冲这些请求,可以将大量并发请求暂存到队列中,然后按照系统能够处理的速度逐渐消费这些请求。这样可以避免系统因为瞬间的高并发而崩溃,实现系统流量的平滑处理。这就是削峰填谷。

但是,并不是说用了MQ了就一定实现了削峰填谷了。这要看MQ的消费方式。

MQ的消息有推和拉两种模式,如果是那种推的模式,会在上游发送消息之后,就立即推给消费者,那这种情况对于接收消息的人来说,还是承担了很大的请求量。而如果消费者的消费能力不行,那么消息也会堆积到消费者端。

并且一旦消息量太大,也会导致消费失败,那么消息就会重投,这就会导致更多的请求量过来。

所以,如果想要起到很好地削峰填谷的作用,需要使用拉模式来消费消息,这样自己就可以控制速度了。消息就可以在MQ的队列中堆积,而不是在客户端堆积,通过队列的缓冲来起到削峰填谷的作用!

相关推荐

  1. <span style='color:red;'>RocketMQ</span>

    RocketMQ

    2024-07-15 06:24:02      44 阅读
  2. <span style='color:red;'>RocketMQ</span>

    RocketMQ

    2024-07-15 06:24:02      24 阅读
  3. <span style='color:red;'>RocketMQ</span>

    RocketMQ

    2024-07-15 06:24:02      18 阅读
  4. RocketMq总结

    2024-07-15 06:24:02       36 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-15 06:24:02       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-15 06:24:02       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-15 06:24:02       58 阅读
  4. Python语言-面向对象

    2024-07-15 06:24:02       69 阅读

热门阅读

  1. SpringBoot实战:定时任务

    2024-07-15 06:24:02       20 阅读
  2. .NET 开源库技术栈汇总

    2024-07-15 06:24:02       19 阅读
  3. UDP 报文结构与注意事项全解析

    2024-07-15 06:24:02       27 阅读
  4. 深入理解Symfony框架中的数据验证机制

    2024-07-15 06:24:02       20 阅读
  5. OpenCV——实现视频图像的来回摆动的效果

    2024-07-15 06:24:02       16 阅读
  6. 【c++】VSstudio win32 应用开发

    2024-07-15 06:24:02       23 阅读
  7. 深入理解Scikit-learn:决策树与随机森林算法详解

    2024-07-15 06:24:02       22 阅读