【RocketMQ系列九】SpringCloudStream整合RocketMQ

您好,我是码农飞哥(wei158556),感谢您阅读本文,欢迎一键三连哦
💪🏻 1. Python基础专栏,基础知识一网打尽,9.9元买不了吃亏,买不了上当。 Python从入门到精通
😁 2. 毕业设计专栏,毕业季咱们不慌忙,几百款毕业设计等你选。
❤️ 3. Python爬虫专栏,系统性的学习爬虫的知识点。9.9元买不了吃亏,买不了上当 。python爬虫入门进阶
❤️ 4. Ceph实战,从原理到实战应有尽有。 Ceph实战
❤️ 5. Java高并发编程入门,打卡学习Java高并发。 Java高并发编程入门

1. Spring Cloud Stream是什么?

Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。

官网:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/

官网概述:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-notable-deprecations

该框架提供了一个灵活的编程模型,该模型基于已经建立和熟悉的Spring习惯用法和最佳实践,包括对持久pub/sub 语义、消费者组和有状态分区的支持。

简单的理解就是Spring Cloud Stream 通过在上层定义统一消息的编程模型,屏蔽了底层消息中间件的差异,降低了使用成本。下图展示了Spring Cloud Stream的处理架构

带粘合剂的 SCSt

image-20231006092654679

Spring Cloud Stream的核心构建块(编程模型)是:

  1. Destination Binders: 负责提供与外部消息传递系统集成的组件。Binders 可以生成Bindings。
  2. **Bindings: ** 外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。即用来绑定消息生产者和消息消费者。它有两种类型,INPUT和OUTPUT,INPUT对应消费者,OUTPUT对应生产者。
  3. Message: 生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范的数据结构。

2. Spring Cloud Stream的执行流程

SpringCloudStream

3. 注解代码实现

首先创建一个生产者项目 my-springcloud-rocketmq-producer 和一个消费者项目 my-springcloud-rocketmq-consumer。

本demo使用的 版本号是 cloud 2021.0.5.0 +springboot 2.6.13

在 my-springcloud-rocketmq-producer 上的操作

3.1. 引入依赖

  <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

3.2 . 属性文件配置

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: my-springcloud-stream-topic
      rocketmq:
        binder:
          name-server: 172.31.184.89:9876

3.3. 定义生产者

在MySpringcloudRocketmqProducerApplication 添加 @EnableBinding(Source.class) 注解。然后创建生产者。

@Component
public class MyProducer {
   
	@Resource
	private Source source;
	
	public void sendMessage(String msg) {
   
		// 封装消息头
		Map<String, Object> headers = new HashMap<>();
		headers.put(MessageConst.PROPERTY_TAGS, "tagA");
		// 创建消息对象
		Message<String> message = MessageBuilder.createMessage(msg, new MessageHeaders(headers));
		// 发送消息
		source.output().send(message);
	}
}

在 my-springcloud-rocketmq-consumer上的操作

3.4. 引入依赖同生产者

3.5. 配置文件修改

spring.cloud.stream.rocketmq.binder.name-server=172.31.184.89:9876
spring.cloud.stream.bindings.input.destination=my-springcloud-stream-topic
spring.cloud.stream.bindings.input.group=my-springcloud-stream-consume-group

3.6. 定义消费者

在MySpringcloudRocketmqConsumerApplication 类上添加 @EnableBinding(Sink.class)注解。

@Component
public class MyConsumer {
   
   @StreamListener(Sink.INPUT)
   public void processMessage(String message) {
   
      System.out.println("收到的消息=" + message);
   }
}

相关推荐

  1. RocketMQ笔记()SpringBoot整合RocketMQ消息过滤

    2023-12-29 02:14:03       17 阅读
  2. SpringBoot整合RocketMQ

    2023-12-29 02:14:03       31 阅读
  3. Spring Boot整合RocketMQ

    2023-12-29 02:14:03       39 阅读
  4. <span style='color:red;'>RocketMQ</span>

    RocketMQ

    2023-12-29 02:14:03      34 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-29 02:14:03       19 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-29 02:14:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-29 02:14:03       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-29 02:14:03       20 阅读

热门阅读

  1. WPF DataGrid

    2023-12-29 02:14:03       35 阅读
  2. 文件管理练习

    2023-12-29 02:14:03       38 阅读
  3. 基于Antlr4实现自定义语法规则

    2023-12-29 02:14:03       39 阅读
  4. 如何利用 NAS 搭建网站服务器?

    2023-12-29 02:14:03       31 阅读
  5. The connection to the server localhost:8080

    2023-12-29 02:14:03       35 阅读
  6. Vue3.0-watch&&watchEffect函数

    2023-12-29 02:14:03       34 阅读
  7. vue的插槽解析

    2023-12-29 02:14:03       37 阅读