Redis Stream消息队列之基本语法与使用方式

前言

本文的主角是Redis Stream,它是Redis5.0版本新增加的数据结构,主要用于消息队列,提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失,功能颇为强大。其实,Redis本身是有一个Redis发布订阅来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis宕机等,消息就会被丢弃。所以,Redis的发布订阅像是Vue2的Bus或Vue3的Mitt,属于后端版的事件总线。此外,Redis本身的List和Sorted Set也可以实现,但是也有各自的缺点,如List没有消息多播功能,没有ACK机制,无法重复消费等,Sorted Set不支持阻塞式获取消息、不允许重复消费、不支持分组。相比之下,Redis Stream明显胜出。

一、消息队列相关命令

1.XADD - 添加消息到末尾

(1)语法格式:

XADD key ID field value

(2)参数:

  • key:队列名称,如果不存在就创建
  • ID:消息ID,我们使用*表示由redis生成,可以自定义,但是要自己保证递增性。
  • field value:记录。

(3)示例:

127.0.0.1:6379[15]> XADD MQ * name Vegeta sex male age 18
"1703235642574-0"
127.0.0.1:6379[15]> XADD MQ * name Bulma sex female age 18
"1703235648454-0"
127.0.0.1:6379[15]>

2.XLEN - 获取流包含的元素数量,即消息长度

(1)语法格式:

XLEN key

(2)参数:

  • key:队列名称。

(3)示例:

127.0.0.1:6379[15]> XLEN MQ
(integer) 2

3.XRANGE - 获取消息列表,会自动过滤已经删除的消息

(1)语法格式:

XRANGE key start end [COUNT count]

(2)参数:

  • key:队列名
  • start:开始值,- 表示最小值
  • end:结束值,+ 表示最大值
  • count:数量

(3)示例:

127.0.0.1:6379[15]> XRANGE MQ - +
1) 1) "1703235642574-0"
   2) 1) "name"
      2) "Vegeta"
      3) "sex"
      4) "male"
      5) "age"
      6) "18"
2) 1) "1703235648454-0"
   2) 1) "name"
      2) "Bulma"
      3) "sex"
      4) "female"
      5) "age"
      6) "18"

4.XREVRANGE - 反向获取消息列表,ID从大到小

(1)语法格式:

XREVRANGE key end start [COUNT count]

(2)参数:

  • key:队列名
  • end:结束值,+ 表示最大值
  • start:开始值,- 表示最小值
  • count:数量

(3)示例:

127.0.0.1:6379[15]> XREVRANGE MQ + - COUNT 2
1) 1) "1703235648454-0"
   2) 1) "name"
      2) "Bulma"
      3) "sex"
      4) "female"
      5) "age"
      6) "18"
2) 1) "1703235642574-0"
   2) 1) "name"
      2) "Vegeta"
      3) "sex"
      4) "male"
      5) "age"
      6) "18"

5.XDEL - 删除消息

(1)语法格式:

XDEL key ID [ID ...]

(2)参数:

  • key:队列名称
  • ID:消息 ID

(3)示例:

127.0.0.1:6379[15]> XADD MQ * name Kakarotto sex male age 18
"1703238230846-0"
127.0.0.1:6379[15]> XADD MQ * name Android18 sex female age 18
"1703238306386-0"
127.0.0.1:6379[15]> XDEL MQ 1703238230846-0
(integer) 1
127.0.0.1:6379[15]> XRANGE MQ - +
1) 1) "1703235642574-0"
   2) 1) "name"
      2) "Vegeta"
      3) "sex"
      4) "male"
      5) "age"
      6) "18"
2) 1) "1703235648454-0"
   2) 1) "name"
      2) "Bulma"
      3) "sex"
      4) "female"
      5) "age"
      6) "18"
3) 1) "1703238306386-0"
   2) 1) "name"
      2) "Android18"
      3) "sex"
      4) "female"
      5) "age"
      6) "18"

6.XTRIM - 对流进行修剪,限制长度

(1)语法格式:

XTRIM key MAXLEN [~] count

(2)参数:

  • key:队列名称
  • MAXLEN:Stream中最大消息数量(即保留的消息数量)
  • [~]:若使用了 ~ 符号,则表示限制的是消息的大小而非数量。
  • count:需要删除的消息数量

(3)示例:

# 限制MQ最多1条消息,其余删除
127.0.0.1:6379[15]> XTRIM MQ MAXLEN 1
(integer) 2
127.0.0.1:6379[15]> XRANGE MQ - +
1) 1) "1703238306386-0"
   2) 1) "name"
      2) "Android18"
      3) "sex"
      4) "female"
      5) "age"
      6) "18"

7.XREAD - 以阻塞或非阻塞方式读取一个或多个队列的消息

(1)语法格式:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]

(2)参数:

  • count:数量,默认值为10
  • milliseconds:可选,阻塞毫秒数,没有设置就是非阻塞模式
  • key:队列名
  • id:指定读取的起始位置,可以是特定的消息ID,也可以是"$"表示最新的消息,或者是"0"表示从最早的消息开始读取。

(3)示例:

127.0.0.1:6379[15]> XADD MQ * name Vegeta sex male age 18
"1703297838206-0"
127.0.0.1:6379[15]> XADD MQ * name Bulma sex female age 18
"1703297844215-0"
# 读取MQ最早的默认条消息
127.0.0.1:6379[15]> XREAD STREAMS MQ 0
1) 1) "MQ"
   2) 1) 1) "1703238306386-0"
         2) 1) "name"
            2) "Android18"
            3) "sex"
            4) "female"
            5) "age"
            6) "18"
      2) 1) "1703297838206-0"
         2) 1) "name"
            2) "Vegeta"
            3) "sex"
            4) "male"
            5) "age"
            6) "18"
      3) 1) "1703297844215-0"
         2) 1) "name"
            2) "Bulma"
            3) "sex"
            4) "female"
            5) "age"
            6) "18"

# 读取MQ第二条消息,需指定第二条消息的ID
127.0.0.1:6379[15]> XREAD STREAMS MQ 1703297838206-0
1) 1) "MQ"
   2) 1) 1) "1703297844215-0"
         2) 1) "name"
            2) "Bulma"
            3) "sex"
            4) "female"
            5) "age"
            6) "18"

# 读取MQ最新的一条消息,需开启阻塞,阻塞时长为10s。如果10s内未读取到消息则退出阻塞。
【客户端A】127.0.0.1:6379[15]> XREAD BLOCK 100000 STREAMS MQ $
1) 1) "MQ"
   2) 1) 1) "1703300894359-0"
         2) 1) "name"
            2) "Ranchi"
            3) "sex"
            4) "male"
            5) "age"
            6) "18"
(2.04s)
【客户端A】127.0.0.1:6379[15]>

# 另开一个终端向MQ队列中写入一条消息,阻塞读的终端就能接收到消息。
root@帅龍之龍:~# redis-cli -h 127.0.0.1 -p 6379 -a 123456
Warning: Using a password with '-a' or '-u' option on the command line interface may not be safe.
【客户端B】127.0.0.1:6379> select 15
OK
【客户端B】127.0.0.1:6379[15]> XADD MQ * name Ranchi sex male age 18
"1703300894359-0"
【客户端B】127.0.0.1:6379[15]>

(4)注意:XREAD存在消息漏读的风险,当正在处理一条消息时,又有多条消息到达,此时读取的是最新那条!

二、消费者组相关命令

消费者组将多个消费者划分到一个组中,监听同一个队列,具有消息分流、消息标示、消息确认的特点。
·消息分流:分流给组内的不同消费者,不会重复消费,反而加快消费
·消息标示:消费者组会记录最后一个被处理的消息,确保每一个消息都会被消费
·消息确认:消费者获取消息后,消息处于pending状态,然后将其存入pending-list列表,当处理完成后,通过XACK确认消息,将消息标记为已处理,然后从pending-list被移除

1.XGROUP CREATE - 创建消费者组

(1)语法格式:

XGROUP CREATE key group id|$

(2)参数:

  • key:队列名称,如果不存在就创建起始ID
  • group:消费者组名
  • id:起始ID,$代表队列中最后一条消息,0代表队列中第一条消息

(3)示例:

# 创建一个从队列第一条消息开始消费的消费者组
127.0.0.1:6379[15]> XGROUP CREATE MQ mqGroupA 0
OK
# 创建一个从队列最后一条消息开始消费的消费者组
127.0.0.1:6379[15]> XGROUP CREATE MQ mqGroupB $
OK

2.XGROUP CREATECONSUMER - 在指定的消费者组中添加消费者

(1)语法格式:

XGROUP CREATECONSUMER key group consumer

(2)参数:

  • key:队列名称,如果不存在就创建
  • group:消费者组名
  • consumer:消费者名

(3)示例:

127.0.0.1:6379[15]> XGROUP CREATECONSUMER MQ mqGroupA consumer1
(integer) 1
127.0.0.1:6379[15]> XGROUP CREATECONSUMER MQ mqGroupA consumer2
(integer) 1
127.0.0.1:6379[15]> XGROUP CREATECONSUMER MQ mqGroupA consumer3
(integer) 1

3.XINFO STREAM - 打印流信息

(1)语法格式:

XINFO STREAM key

(2)参数:

  • key:队列名称

(3)示例:

127.0.0.1:6379[15]> XINFO STREAM MQ
 1) "length"
 2) (integer) 5
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1703300894359-0"
 9) "max-deleted-entry-id"
10) "1703238230846-0"
11) "entries-added"
12) (integer) 8
13) "recorded-first-entry-id"
14) "1703238306386-0"
15) "groups"
16) (integer) 2
17) "first-entry"
18) 1) "1703238306386-0"
    2) 1) "name"
       2) "Android18"
       3) "sex"
       4) "female"
       5) "age"
       6) "18"
19) "last-entry"
20) 1) "1703300894359-0"
    2) 1) "name"
       2) "Ranchi1"
       3) "sex"
       4) "male"
       5) "age"
       6) "18"

4.XINFO GROUPS - 打印消费者组的信息

(1)语法格式:

XINFO GROUPS key

(2)参数:

  • key:队列名称

(3)示例:

127.0.0.1:6379[15]> XINFO GROUPS MQ
1)  1) "name"
    2) "mqGroupA"
    3) "consumers"
    4) (integer) 3
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "0-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 5
2)  1) "name"
    2) "mqGroupB"
    3) "consumers"
    4) (integer) 0
    5) "pending"
    6) (integer) 0
    7) "last-delivered-id"
    8) "1703300894359-0"
    9) "entries-read"
   10) (nil)
   11) "lag"
   12) (integer) 0

5.XGROUP DELCONSUMER - 在指定的消费者组中删除消费者

(1)语法格式:

XGROUP DELCONSUMER key group consumer

(2)参数:

  • key:队列名称,如果不存在就创建
  • group:消费者组名
  • consumer:消费者名

(3)示例:

127.0.0.1:6379[15]> XGROUP DELCONSUMER MQ mqGroupA consumer3
(integer) 0

6.XGROUP DESTROY - 删除指定的消费者组

(1)语法格式:

XGROUP DESTROY key group

(2)参数:

  • key:队列名称,如果不存在就创建
  • group:消费者组名

(3)示例:

127.0.0.1:6379[15]> XGROUP DESTROY MQ mqGroupB
(integer) 1

7.XGROUP SETID - 为消费者组设置新的最后递送消息ID

(1)语法格式:

XGROUP SETID key group id|$

(2)参数:

  • key:队列名称,如果不存在就创建
  • group:消费者组名
  • id:起始ID,$代表队列中最后一条消息,0代表队列中第一条消息

(3)示例:

127.0.0.1:6379[15]> XGROUP CREATE MQ mqGroupB $
OK
127.0.0.1:6379[15]> XGROUP SETID MQ mqGroupB $
OK

8.XREADGROUP GROUP - 读取消费者组中的消息

(1)语法格式:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]

(2)参数:

  • group:消费者组名
  • consumer:消费者名
  • count:读取数量
  • milliseconds:阻塞毫秒数
  • key:队列名
  • id:起始ID,>代表从下一条未消费的消息开始,0代表从pending-list中第一条消息开始,其它根据指定id从pending-list中获取已消费但未确认的消息开始

(3)示例:

# 指定消费者组的消费者去读取下一条未消费的消息
127.0.0.1:6379[15]> XREADGROUP GROUP mqGroupA consumer1 COUNT 1 STREAMS MQ >
1) 1) "MQ"
   2) 1) 1) "1703238306386-0"
         2) 1) "name"
            2) "Android18"
            3) "sex"
            4) "female"
            5) "age"
            6) "18"
127.0.0.1:6379[15]>
127.0.0.1:6379[15]> XREADGROUP GROUP mqGroupA consumer2 COUNT 1 STREAMS MQ >
1) 1) "MQ"
   2) 1) 1) "1703297838206-0"
         2) 1) "name"
            2) "Vegeta"
            3) "sex"
            4) "male"
            5) "age"
            6) "18"
127.0.0.1:6379[15]>

(4)注意:若某个消费者,消费了某条消息,但是并没有处理成功时(如消费者进程宕机),这条消息可能会丢失,因为组内其他消费者不能再次消费到该消息了!

9.XPENDING - 显示待处理消息的相关信息

(1)语法格式:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

(2)参数:

  • key:队列名
  • group:消费者组名
  • start:开始值,-表示最小值
  • end:结束值,+表示最大值
  • count:数量

(3)示例:

127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 2 # 已读取但未处理的消息数
2) "1703238306386-0" # 起始消息ID
3) "1703297838206-0" # 结束消息ID
4) 1) 1) "consumer1"
      2) "1"
   2) 1) "consumer2"
      2) "1"
127.0.0.1:6379[15]> XPENDING MQ mqGroupB
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)

10.XACK - 将消息标记为"已处理"

(1)语法格式:

XACK key group id [id ...]

(2)参数:

  • key:队列名
  • group:消费者组名
  • id:消息ID

(3)示例:

127.0.0.1:6379[15]> XACK MQ mqGroupA 1703238306386-0
(integer) 1
127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 1
2) "1703297838206-0"
3) "1703297838206-0"
4) 1) 1) "consumer2"
      2) "1"
127.0.0.1:6379[15]>

11.XCLAIM - 转移消息的归属权

(1)语法格式:

XCLAIM key group consumer min-idle-time id [id ...] [IDLE ms] [TIME unix-time-milliseconds] [RETRYCOUNT count] [FORCE] [JUSTID] [LASTID lastid]

(2)参数:

  • key:队列名
  • group:消费者组名
  • consumer:消费者名
  • min-idle-time:从被读取到未处理的时间
  • id:消息ID

(3)示例:

# 在指定的消费者组中,将cosumer2已读取5分钟(300秒,300000毫秒),但未处理的`1703297838206-0`消息转移给consumer1
127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 1
2) "1703297838206-0"
3) "1703297838206-0"
4) 1) 1) "consumer2"
      2) "1"
127.0.0.1:6379[15]> XCLAIM MQ mqGroupA consumer1 3600000 1703297838206-0
(empty array) # 转移不成功
127.0.0.1:6379[15]> XCLAIM MQ mqGroupA consumer1 300000 1703297838206-0
1) 1) "1703297838206-0"
   2) 1) "name"
      2) "Vegeta"
      3) "sex"
      4) "male"
      5) "age"
      6) "18"
127.0.0.1:6379[15]> XPENDING MQ mqGroupA
1) (integer) 1
2) "1703297838206-0"
3) "1703297838206-0"
4) 1) 1) "consumer1"
      2) "1"
127.0.0.1:6379[15]> XPENDING MQ mqGroupA - + 10
1) 1) "1703297838206-0"
   2) "consumer1"
   3) (integer) 1171095 # IDLE被重置了
   4) (integer) 2 # 读取次数被+1

(4)说明:某个消费者读取了消息但没有处理,这时消费者宕机或重启等就会导致该消息失踪。那么就需要该消息转移给其他的消费者处理,就是消息转移。转移除了要指定ID外,还需要指定min-idle-time最小空闲时间,该值要小于消息从被读取到未处理的时间。

三、消息队列的帮助命令

(1)语法格式:

HELP XXX

(2)参数:

  • XXX:命令关键字

(3)示例:

127.0.0.1:6379[15]> help XADD

  XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
  summary: Appends a new message to a stream. Creates the key if it doesn't exist.
  since: 5.0.0
  group: stream

127.0.0.1:6379[15]> HELP XGROUP

  XGROUP (null)
  summary: A container for consumer groups commands.
  since: 5.0.0
  group: stream

  XGROUP CREATE key group id|$ [MKSTREAM] [ENTRIESREAD entries-read]
  summary: Creates a consumer group.
  since: 5.0.0
  group: stream

  XGROUP CREATECONSUMER key group consumer
  summary: Creates a consumer in a consumer group.
  since: 6.2.0
  group: stream

  XGROUP DELCONSUMER key group consumer
  summary: Deletes a consumer from a consumer group.
  since: 5.0.0
  group: stream

  XGROUP DESTROY key group
  summary: Destroys a consumer group.
  since: 5.0.0
  group: stream

  XGROUP HELP (null)
  summary: Returns helpful text about the different subcommands.
  since: 5.0.0
  group: stream

  XGROUP SETID key group id|$ [ENTRIESREAD entries-read]
  summary: Sets the last-delivered ID of a consumer group.
  since: 5.0.0
  group: stream

127.0.0.1:6379[15]>

相关推荐

  1. Redis Stream消息队列基本语法使用方式

    2023-12-27 07:44:01       40 阅读
  2. Apache Kafka: 强大消息队列系统的介绍使用

    2023-12-27 07:44:01       54 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-27 07:44:01       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-27 07:44:01       106 阅读
  3. 在Django里面运行非项目文件

    2023-12-27 07:44:01       87 阅读
  4. Python语言-面向对象

    2023-12-27 07:44:01       97 阅读

热门阅读

  1. [oracle数据库]dblink的使用

    2023-12-27 07:44:01       63 阅读
  2. 如何将自建的ElasticSearch注册成一个服务

    2023-12-27 07:44:01       59 阅读
  3. codeforces 1676F

    2023-12-27 07:44:01       65 阅读
  4. latexshop 使用bug:xxx has a comma at the end

    2023-12-27 07:44:01       56 阅读
  5. c++ qt QtWidgetsApplication 项目 使用外部ui

    2023-12-27 07:44:01       61 阅读
  6. GO基础进阶篇 (八)、runtime包

    2023-12-27 07:44:01       67 阅读
  7. k8s解决 搭建集群的时候notReady问题

    2023-12-27 07:44:01       62 阅读
  8. 【Go语言入门:Go程序的流程控制语句】

    2023-12-27 07:44:01       50 阅读
  9. client-go使用方法

    2023-12-27 07:44:01       61 阅读
  10. Unity编辑器紫色

    2023-12-27 07:44:01       58 阅读
  11. mysql如何分析sql是否成功使用索引

    2023-12-27 07:44:01       68 阅读