队列的三个需求
消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性
需求一:消息保序。消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了
需求二:重复消息处理(幂等性)。消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息
需求三:消息可靠性保证。当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了
Redis如何实现队列
基于 List 的消息队列解决方案
List 本身就是按先进先出的顺序对数据进行存取的。通过LPUSH/RPOP,进行生产消费。同时为了保证CPU不会一直消耗在RPOP上,可以改用BRPOP
BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据
以上解决了消息保序的问题
为了解决消费者程序本身能对重复消息进行判断,可以给每个消息队列的每个消息提供全局唯一ID。消费者呈现要把处理过的ID记录下来,同时当有消息收到时,需要对这个消息的ID进行检查。这样就可以保证幂等性
最后就是消息的可靠性。通过BRPOPLPUSH 命令,把进行消息消费时,会启动消息备份,这样就算消息者程序消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了
基于 Streams 的消息队列解决方案
Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令
- XADD:插入消息,保证有序,可以自动生成全局唯一 ID
- XREAD:用于读取消息,可以按 ID 读取数据
- XREADGROUP:按消费组形式读取消息
- XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成
- XADD 命令可以往消息队列中插入新消息,消息的格式是键 - 值对形式。对于插入的每一条消息,Streams 可以自动为其生成一个全局唯一的 ID
XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取
- 消费者也可以在调用 XRAED 时设定 block 配置项,实现类似于 BRPOP 的阻塞读取操作
- 当消息队列中没有消息时,一旦设置了 block 配置项,XREAD 就会阻塞,阻塞的时长可以在 block 配置项进行设置
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知 Streams“消息已经处理完成”
总结
List | Streams | |
---|---|---|
消息保序 | LPUSH/RPOP | XADD/XREAD |
阻塞读取 | BRPOP | XREAD block |
重复消息读取 | 生产者自行实现唯一ID | Streams自动生成 |
消息可靠性 | BRPOPLPUSH | PENDDING List 自动留存消息,XACK确认消息 |
适用场景 | 数据量小 | 数据量大(小于qps 10w/s) |