【Spring连载】使用Spring Data访问Redis(九)----Redis流 Streams


Redis Streams以抽象的方法对日志数据结构进行建模。通常,日志是仅追加(append-only)的数据结构,并且从一开始就在随机位置或通过流式传输新消息来消费。
Redis参考文档中了解有关Redis Streams的更多信息。
Redis Streams大致可以分为两个功能领域:

  • 追加记录
  • 消费记录

尽管这种模式与Pub/Sub有相似之处,但主要区别在于消息的持久性以及消息的消费方式。
Pub/Sub依赖于瞬态消息的广播(即,如果你不听,你就会错过消息),而Redis Stream使用了一种持久的、仅追加的数据类型,它会保留消息,直到流被修剪。消费方面的另一个区别是Pub/Sub注册服务器端订阅。Redis将到达的消息推送到客户端,而Redis Streams需要活动轮询(active polling)。
org.springframework.data.redis.connection 和 org.springframework.data.redis.stream包为Redis Streams提供了核心功能。

一、追加Appending

要发送记录,你可以像使用其他操作一样,使用低级(low-level)RedisConnection或高级StreamOperations。这两个实体都提供add (xAdd)方法,该方法接受记录和目标流作为参数。RedisConnection需要原始数据(字节数组),而StreamOperations允许任意对象作为记录传入,如以下示例所示:

// append message through connection
RedisConnection con =byte[] stream =ByteRecord record = StreamRecords.rawBytes().withStreamKey(stream);
con.xAdd(record);

// append message through RedisTemplate
RedisTemplate template =StringRecord record = StreamRecords.string().withStreamKey("my-stream");
template.opsForStream().add(record);

流记录携带一个Map,键值元组,作为它们的payload。将记录附加到流中会返回可作为进一步引用的RecordId。

二、消费Consuming

在消费端,你可以消费一个或多个流。Redis Streams提供读取命令,允许从已知流的任意位置(随机访问)消费流和从流的结束消费新的流记录。
在底层,RedisConnection提供了xRead和xReadGroup方法,它们分别映射Redis命令以在消费者组中进行各自读取。请注意,可以将多个流用作参数。
Redis中的订阅命令可能会被阻塞。也就是说,在连接(connection)上调用xRead会导致当前线程在开始等待消息时阻塞。只有当读取命令超时或收到消息时,线程才会被释放。
要消费流消息,可以在应用程序代码中轮询(poll)消息,也可以通过消息监听器容器使用两个异步接收中的一个(2.2章节),命令式或反应式。每次新记录到达时,容器都会通知应用程序代码。

2.1 同步接收Synchronous reception

虽然流消费通常与异步处理相关联,但也可以同步消费消息。重载的StreamOperations.read(…)方法提供了这个功能。在同步接收期间,调用线程可能会阻塞,直到消息可用为止。属性StreamReadOptions.block指定接收者在放弃等待消息之前应该等待多长时间。

// Read message through RedisTemplate
RedisTemplate template =List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
				StreamOffset.latest("my-stream"));

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
				StreamReadOptions.empty().count(2),
				StreamOffset.create("my-stream", ReadOffset.lastConsumed()))

2.2 通过消息监听器容器进行异步接收Asynchronous reception through Message Listener Containers

由于其阻塞性,低级别轮询(low-level polling)没有吸引力,因为它需要为每个消费者进行连接和线程管理。为了缓解这个问题,SpringData提供了消息侦听器,它完成了所有繁重的工作。如果您熟悉EJB和JMS,您应该会发现这些概念很熟悉,因为它的设计尽可能接近Spring Framework及其消息驱动的POJO(MDP)中的支持。

Spring Data提供了两种针对所用编程模型量身定制的实现:

StreamMessageListenerContainer充当命令式编程模型的消息侦听器容器。它用于使用Redis流中的记录,并驱动注入其中的StreamListener实例。

StreamReceiver提供了消息侦听器的反应式变体。它用于将Redis流中的消息作为潜在的无限流使用,并通过Flux发出流消息。

StreamMessageListenerContainer和StreamReceiver负责消息接收和调度到侦听器中进行处理的所有线程。消息侦听器容器/接收器是MDP和消息传递提供者之间的中介,负责注册接收消息、资源获取和释放、异常转换等。这使您作为应用程序开发人员能够编写与接收消息(并对其作出反应)相关联的(可能复杂的)业务逻辑,并将Redis基础设施的样板问题委托给框架。

这两个容器都允许更改运行时配置,以便在应用程序运行时添加或删除订阅,而无需重新启动。此外,容器使用延迟订阅方法,仅在需要时使用RedisConnection。如果所有侦听器都被取消订阅,它会自动执行清理,线程就会被释放。

2.2.1 命令式Imperative StreamMessageListenerContainer

2.2.2 反应式Reactive StreamReceiver

2.3 确认策略Acknowledge strategies

2.4 读取偏移量策略ReadOffset strategies

三、序列化Serialization

四、对象映射Object Mapping

4.1 简单值Simple Values

4.2 复杂值Complex Values

最近更新

  1. TCP协议是安全的吗?

    2024-02-03 23:44:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-02-03 23:44:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-02-03 23:44:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-02-03 23:44:02       18 阅读

热门阅读

  1. 智慧机场物联网应用及网络安全挑战(下)

    2024-02-03 23:44:02       30 阅读
  2. 前端项目接口请求封装

    2024-02-03 23:44:02       28 阅读
  3. 【校门外的树(洛谷 P1047)】

    2024-02-03 23:44:02       28 阅读
  4. ChatGPT炸裂了

    2024-02-03 23:44:02       28 阅读
  5. kubenetes使用ConfigMap挂载ssh公钥实现pod免密

    2024-02-03 23:44:02       25 阅读
  6. 机器学习复习(8)——基本概念

    2024-02-03 23:44:02       21 阅读
  7. 力扣(leetcode)第268题丢失的数字(Python)

    2024-02-03 23:44:02       31 阅读
  8. docker- php7.4

    2024-02-03 23:44:02       29 阅读
  9. 服务器常遇的响应状态码

    2024-02-03 23:44:02       28 阅读