RabbitMQ入门到实战——高级篇

消息的可靠性

 生产者的可靠性(确保消息一定到达MQ)

生产者重连

这⾥除了enabled是false外,其他 initial-interval 等默认都是⼀样的值。

生产者确认 

生产者确认代码实现

application中增加配置:(publisher-returns ⼀般不⽤配置)

 2. 在RabbitTemplate中设置回调函数 ReturnCallback ,在Rabbit发送信息失败时触发(如果开了 publisher-returns)

3. 在发送信息的RabbitTemplate 中指定 ConfirmCallback

总结:

设置⽇志级别

面试题:如何保证⽣产者发送消息的可靠性

⾸先,可以在Rabbit MQ中配置适量次数的⽣产者重试,重试时间等重连机制,避免⽹络波动影响 然后,如果是其他原因导致失败,Rabbit MQ也提供了接受信息后的回执,可以设置回调函数来保 证⽣产者接受信息。基本保证可靠性。 但是,由于MQ回调需要消耗额外的资源,如果不是对消息可靠性有较⾼要求,最好不要开启。(这里在P21测试中印证了,发100w条信息,半天才发40w条。关闭后显著提升速度) 

MQ的可靠性

问题:MQ出现故障,如宕机重启,消息会丢失。内存有限,MQ会阻塞

数据持久化durable

控制台页面:

Java代码实现(默认持久化)

paged out:当内存被占满,部分消息转移到磁盘,MQ阻塞不能访问的状态

非持久化:(优先写⼊内存)

 

持久化(直接写入磁盘 - 注意:这种模式下也会预先写⼀些信息到内存中保障安全性) 

Lazy queue

创建: ①⽤Bean创建

②基于注解

③控制台

总结(RabbitMQ如何保证消息可靠性)

  • 交换机、队列、发送的信息持久化
  • 使⽤LazyQueue(⾃动将所有消息持久化)

消费者可靠性

消费者确认

失败重试机制 

 重试策略

消费者确认问题:如果是业务异常,自动返回nack,程序会不断重试,不断抛异常。浪费资源
解决:设置重试策略,设定重试参数,重试多几次 

效果:重试三次后依然失败,将消息reject丢掉。

失败消息处理策略

问题:重试次数耗尽后,直接丢掉,处理草率

使⽤MessageRecoverer接⼝处理,以下是三种实现方式:

代码实现:

1.配置开启重试机制

2.定义接收失败消息的交换机、队列及其绑定关系

3.定义RepublishMessageRecoverer实现类

 总结

投递给异常交换机,需要实现 RepublishMessageRecoverer

业务幂等性

使⽤了MQ,不可避免的会有消息重复现象。就会导致消费者重复消费。要使非幂等业务转变为幂等。

 ⽅案1:给消息设置唯⼀id。如:类似token保证表单不重复提交。

 

缺点: 1. 业务增多。保存到数据库,判断id是否重复
2. 影响性能。涉及数据库操作(写、判断) 

 还可以基于乐观锁优化,避免线程并步运⾏:

面试题:微服务中的支付服务和交易服务如何保持订单状态⼀致? 

扩展:
为什么选择异步?同步异步(同步调用的问题),聊聊优化的时间。
生产者可靠性等等具体实现(也可突出自己用雪花,而不是默认的uuid)
幂等性判断如何实现?常见幂等方案有哪些
基于之前交易服务⼀直处于被动,再用个定时任务化为主动,定期查询支付状态 

消息过期机制

可以给每条消息指定一个有效期,一段时间内未被消费者处理,就过期了。这种机制允许系统自动清理和丢弃那些长时间未被消费的消息,以避免消息队列中积累过多的过期消息,从而保持系统的效率和可靠性。

例如:消费者(库存系统)挂了,一个订单 15 分钟还没被库存系统处理,这个订单其实已经失效了,哪怕库存系统再恢复,其实也不用扣减库存。
适用场景:清理过期数据、模拟延迟队列的实现(不开会员就慢速)、专门让某个程序处理过期请求。

延迟消息

例如:会员/非会员:会员用户,我们希望立即处理其请求;而对于普通用户,我们希望让其排队等待一段时间(比如5分钟)后再进行处理。这时,可以利用延迟队列实现。消费者可以监听延迟队列,普通用户的请求由一个程序处理监听该延迟队列,而会员用户的请求则由另一个程序监听一个高优先级的队列。一旦你掌握了消息队列的知识,就可以实现这样的程序逻辑。延迟队列的实现可以借助消息过期机制。具体的实现思路是创建两个队列,第一个队列中的消息设置了过期时间,比如5分钟,然后将过期的消息转移到第二个队列中。接着,让相应的用户程序监听第二个队列,这样第二个队列就成为了延迟队列。 

死信

通过将设置消息过期时间,并令其过期达到延迟发送。

问题:死信交换机本身不是做延迟消息,需要等时间过期才能实现延迟消息

 

取消超时订单的基本思路

问题:每创建⼀个订单,队列都有⼀个30min延迟消息,MQ压⼒过大
解决:将30min切开,进行分段检查

 代码实现:

在订单业务的最后拓展代码:
1. 定义延迟消息体与延迟常量
        a. 延迟消息体

b. 延迟常量(交换机 - topic,队列,Key)

2. 发送信息

优化:每次都要new⼀个类,并配置。可以单独提取出来,new⼀个类

监听延迟信息

定义监听器:构造注⼊orderService,注解中开启延迟信息,传参为延迟消息体

5 与 6 应该为分布式事务,需要写在 OrderService 中,改动如下:

 

参考:MQ高级-15.延迟消息-监听延迟消息_哔哩哔哩_bilibili 

相关推荐

最近更新

  1. TCP协议是安全的吗?

    2024-01-12 18:04:08       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-12 18:04:08       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-12 18:04:08       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-12 18:04:08       20 阅读

热门阅读

  1. #Uniapp:uni-app加载全局组件的方法easycom

    2024-01-12 18:04:08       41 阅读
  2. springboot 集成 @Cacheable简单示例

    2024-01-12 18:04:08       39 阅读
  3. 平方朋友对C++

    2024-01-12 18:04:08       30 阅读
  4. Jenkins相关问题及答案(2024)

    2024-01-12 18:04:08       28 阅读
  5. Docker快速安装MongoDB数据库

    2024-01-12 18:04:08       36 阅读
  6. 停止css @keyframes动画

    2024-01-12 18:04:08       36 阅读
  7. ubuntu 22.04源码装ros1 noetic

    2024-01-12 18:04:08       42 阅读
  8. 2024年Ubuntu18.04执行do-release-upgrade报错的解决方案

    2024-01-12 18:04:08       34 阅读