Redis的三种消息队列实现方式

目录

前言

List实现消息队列

PubSub消息队列

Stream消息队列

三种实现方式对比


前言

为什么要使用Redis的消息队列?

成本低,对于RabbitMQ或是Kafka来说,已经是重量级的消息队列。

Redis的三种实现方式:

  • List结构:一种有序的双向链表
  • PubSub发布订阅:基于点对点的消息模型
  • Stream:在Redis5.0之后提供的,比较完善的消息队列模型

List实现消息队列

我们可以利用Redis中List的命令LPUSH与RPOP来实现消息的发送与接收,但是需要注意的是,队列中没有消息时,RPOP会返回null,不会向JVM中阻塞队列一样进行阻塞并等待消息,因此这里应该使用BRPOP来实现阻塞效果。

优点:利用Redis存储,不受限于JVM内存上限。

基于Redis的持久化机制,数据安全性有保证。

可以满足消息有序性。

缺点:无法避免消息丢失。

只支持单消费者。

PubSub消息队列

是Redis2.0版本引入的消息传递模型,顾名思义,消费者可以订阅一个或多个channel,生产者向对应的channel发送消息后,所有订阅者都能收到相关信息。

PubSub消息队列的基本命令

# 订阅一个或多个频道
SUBSCRIBE channel [channel]
# 向一个频道发送消息
PUBLISH channel msg
# 订阅与pattern格式匹配的所有频道
PSUBSCRIBE pattern [pattern]

优点:采用发布订阅模式,支持多生产者,多消费者。

缺点:不支持数据持久化。

无法避免消息丢失。

消息堆积有上限,超出时数据丢失。

Stream消息队列

Stream是Redis5.0之后引入新的数据类型,支持持久化,因此相比于PubSub更加安全,可以通过Stream实现一个功能完善的消息队列

发送消息的命令:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID filed value[filed value]

命令解释:

  • key:队列名称
  • NOMKSTREAM:如果队列不存在,是否自动创建队列,默认是自动创建
  • MAXLEN|MINID [=|~] threshold [LIMIT count]:设置消息队列的最大消息数量
  • *|ID:消息的唯一ID,*代表由Redis自动生成,格式是"时间戳-递增数字"
  • field value:发送到队列的消息名称为Entry

读取消息的第一种方法

命令如下 

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

命令解释:

  • COUNT count:每次读取消息的最大数量
  • BLOCK milliseconds:当没有消息时进行阻塞,并指定阻塞时长,如果为0则指永久阻塞
  • STREAMS key:要从哪个队列读取消息
  • ID:起始ID,只返回大于该ID的消息,0表示从第一个消息开始读取,$表示从最新消息开始

XREAD命令的特点:

  • 消息可回溯
  • 一个消息可以被多个消费者拿到
  • 可以阻塞读取
  • 有消息漏读的风险

读取消息的第二种方法

将多个消费者划分到一个组(Consumer Group)当中,监听同一个队列。特点如下

  • 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
  • 消息标识:消费者组会维护一个标示记录最后一个被处理的消息哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
  • 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。

创建消费者组命令:

XGROUP CREATE key groupName ID [MKSTREAM]

命令解释:

  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标识,$代表队列中最后一个消息,0则代表队列中第一个消息
  • MKSTREAM:队列不存在时自动创建
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname comsumername 

从消费者组中读取消息

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID[ID ...]

命令解释:

  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID。" > "表示从下一个未消费的消息开始。其它则是根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

Group类型的消息队列特点:

  • 消息可回溯
  • 可以多消费者争抢消息
  • 可以阻塞读取
  • 没有消息漏读风险
  • 有消息确认机制,保证消息至少被消费一次

三种实现方式对比

LIST

PubSub

Stream

消息持久化

支持

不支持

支持

阻塞读取

支持

支持

支持

消息堆积处理

受限于内存空间,可以利用多消费者加快处理

受限于消费者缓冲区

受限于队列长度,可以利用消费者组提高消费速度,减少堆积

消息确认机制

不支持

不支持

支持

消息回溯

不支持

不支持

支持

相关推荐

  1. springboot redis 实现消息队列

    2023-12-06 01:36:07       30 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-06 01:36:07       17 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-06 01:36:07       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-06 01:36:07       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-06 01:36:07       18 阅读

热门阅读

  1. 测试用例的设计思路

    2023-12-06 01:36:07       37 阅读
  2. ES6知识

    ES6知识

    2023-12-06 01:36:07      36 阅读
  3. 基于矢量控制的永磁同步电机调速系统

    2023-12-06 01:36:07       35 阅读
  4. BabySpartan:对non-uniform computation的Lasso-based SNARK

    2023-12-06 01:36:07       39 阅读
  5. Vue-安装及安装相应插件

    2023-12-06 01:36:07       40 阅读
  6. 【Element-ui】Element-ui是什么?如何安装

    2023-12-06 01:36:07       36 阅读
  7. 【Docker仓库】部署Docker Registry web-ui管理镜像仓库

    2023-12-06 01:36:07       36 阅读
  8. Uniapp Vue3 基础到实战 教学视频

    2023-12-06 01:36:07       34 阅读
  9. Android MTK平台配置应用可卸载

    2023-12-06 01:36:07       37 阅读
  10. ubuntu1804安装jupyter中的js环境

    2023-12-06 01:36:07       40 阅读
  11. 1076 Forwards on Weibo (链接表层序遍历)

    2023-12-06 01:36:07       37 阅读