SpringCloud Stream笔记整理

  1. 添加kafka stream依赖

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    
  2. application.yml中添加配置

    --- #stream config
    spring:
      cloud:
        stream:
          binders:
            myKafka1:
              type: kafka
              environment:
                spring:
                  kafka:
                    bootstrap-servers: 127.0.0.1:9092
          bindings:
            helloFunc-in-0:
              destination: hello-topic
              group: hello-local-test-10
              binder: myKafka1
              consumer:
                batch-mode: true
            helloFunc-out-0:
              destination: hello-topic
              group: hello-local-test-10
              binder: myKafka1
              consumer:
                batch-mode: true
        # 注意 function 节点与stream 同级,而非子节点
        function:
          definition: helloFunc;
    
  3. 编写消费者:

    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class HelloConsumer {
        @Bean
        public Consumer<Message<List<String>>> helloFunc() {
            return message -> {
                log.info("---------------------> ");
                List<String> list = message.getPayload();
                boolean result = this.handle(list);
                if (result) {
                    Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
                    if (acknowledgment != null) {
                        acknowledgment.acknowledge();
                    }
                } else {
                    throw new RuntimeException("消费数据出错!");
                }
            };
        }
    
        private boolean handle(List<String> list){
            log.info("list size : {}", list.size());
            if (!CollectionUtils.isEmpty(list)){
                log.info("group first message : {}", list.get(0));
            }
            return true ;
        }
    }
    

相关推荐

  1. SpringCloudStream整合MQ(待完善)

    2024-03-15 11:12:05       36 阅读
  2. CSRF笔记整理

    2024-03-15 11:12:05       30 阅读
  3. SSRF笔记整理

    2024-03-15 11:12:05       28 阅读
  4. SpringCloud Stream笔记整理

    2024-03-15 11:12:05       21 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-15 11:12:05       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-15 11:12:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-15 11:12:05       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-15 11:12:05       20 阅读

热门阅读

  1. 智障版本GPT3实现

    2024-03-15 11:12:05       19 阅读
  2. 什么是单向数据流

    2024-03-15 11:12:05       20 阅读
  3. 《软件工程》复试问答题总结

    2024-03-15 11:12:05       20 阅读
  4. Github 2024-03-14 开源项目日报 Top10

    2024-03-15 11:12:05       21 阅读
  5. C++核心高级编程

    2024-03-15 11:12:05       22 阅读
  6. C++学习

    C++学习

    2024-03-15 11:12:05      21 阅读