如何保证消息的可靠传输?如果消息丢了怎么办?如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?

如何保证消息的可靠传输?如果消息丢了怎么办?
数据的丢失问题,可能出现在生产者、 MQ 、消费者中
生产者丢失:生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题
啥的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启
RabbitMQ 事务 channel.txSelect ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么
生产者会收到异常报错,此时就可以回滚事务 channel.txRollback ,然后重试发送消息;如果收到
了消息,那么可以提交事务 channel.txCommit 。吞吐量会下来,因为太耗性能。所以一般来说,如
果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm
式之后,你每次写的消息都会分配一个唯一的 id ,然后如果写入了 RabbitMQ 中, RabbitMQ 会给 你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你一个
nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每
个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。事务机制和
cnofirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是
confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收
了之后会异步回调你一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都
是用 confirm 机制的。
MQ 中丢失:就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写
入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般
数据不会丢。设置持久化有两个步骤:创建 queue 的时候将其设置为持久化,这样就可以保证
RabbitMQ 持久化 queue 的元数据,但是不会持久化 queue 里的数据。第二个是发送消息的时候
将消息的 deliveryMode 设置为 2 ,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久
化到磁盘上去。必须要同时设置这两个持久化才行, RabbitMQ 哪怕是挂了,再次重启,也会从磁
盘上重启恢复 queue ,恢复这个 queue 里的数据。持久化可以跟生产者那边的 confirm 机制配合起
来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,
RabbitMQ 挂了,数据丢了,生产者收不到 ack ,你也是可以自己重发的。注意,哪怕是你给
RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得
及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。
消费端丢失:你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,
RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制,简单来
说,就是你关闭 RabbitMQ 的自动 ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确
保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack ?那
RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处
理,消息是不会丢的。
如何解决消息队列的延时以及过期失效问题?消息队列满了以 后该怎么处理?有几百万消息持续积压几小时,说说怎么解决?
消息积压处理办法:临时紧急扩容:
先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。 新建一个
topic partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。 然后写一个临时的分发
数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀
轮询写入临时建立好的 10 倍数量的 queue 。 接着临时征用 10 倍的机器来部署 consumer ,每一
consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer
源扩大 10 倍,以正常的 10 倍速度来消费数据。 等快速消费完积压数据之后,得恢复原先部署的
架构,重新用原先的 consumer 机器来消费消息。 MQ 中消息失效:假设你用的是 RabbitMQ
RabbtiMQ 是可以设置过期时间的,也就是 TTL 。如果消息在 queue 中积压超过一定的时间就会被
RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在 mq
里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有
类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比
如大家一起喝咖啡熬夜到晚上 12 点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那
批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回 来。也只能是这样了。假设 1 万个订单积压在 mq 里面,没有处理,其中 1000 个订单都丢了,你
只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。
mq 消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满
了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数
据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上
再补数据吧。

最近更新

  1. TCP协议是安全的吗?

    2024-02-23 10:24:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-23 10:24:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-23 10:24:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-23 10:24:01       20 阅读

热门阅读

  1. 穿越虚拟与现实的边界:XR技术重塑短剧叙事

    2024-02-23 10:24:01       27 阅读
  2. redis:数据倾斜是什么?怎么应对热点数据?

    2024-02-23 10:24:01       27 阅读
  3. vue中setup语法糖的优点

    2024-02-23 10:24:01       25 阅读
  4. 记忆化搜索

    2024-02-23 10:24:01       25 阅读
  5. 如何使用 boost.gil 解析 tiff 图片并返回像素

    2024-02-23 10:24:01       28 阅读
  6. 智能门锁开发之需要具备的安全保护技术

    2024-02-23 10:24:01       29 阅读
  7. GC垃圾回收算法

    2024-02-23 10:24:01       31 阅读
  8. 参数替换之${parameter+default}和${parameter:+default}

    2024-02-23 10:24:01       18 阅读
  9. Visual Studio快捷键记录

    2024-02-23 10:24:01       27 阅读