重生之 SpringBoot3 入门保姆级学习(24、场景整合 kafka 消息发送服务)

重生之 SpringBoot3 入门保姆级学习(24、场景整合 kafka 消息发送服务)

6.4 消息发送服务


  • 访问 kafka-ui (注意这里需要换成你自己的服务器或者虚拟机的 IP 地址,虚拟机可以用局域网 192.168.xxx.xxx 的地址)
http://192.168.1.4:8080/ui
  • 创建主题

image-20240615215130002

  • 创建成功

image-20240615215153137

  • application.properties 配置文件(注意需要改成你的 kafka 地址,也就是浏览器访问的主机 IP )
spring.kafka.bootstrap-servers=192.168.1.4:9092
  • 测试代码
package com.zhong.message;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.StopWatch;

import java.util.concurrent.CompletableFuture;

@SpringBootTest
class Boot309MessageApplicationTests {

    @Autowired
    KafkaTemplate kafkaTemplate;

    @Test
    void contextLoads() {
        // 秒表功能
        StopWatch stopWatch = new StopWatch();

        // 异步处理等待处理完
        CompletableFuture[] futures = new CompletableFuture[10000];

        // 开始计时
        stopWatch.start();
        for (int i = 0; i < 10000; i++) {
            // 异步的
            CompletableFuture future = kafkaTemplate.send("newshh", "name", "小钟");
            futures[i] = future;
        }
        CompletableFuture.allOf(futures).join();
        // 停止计时
        stopWatch.stop();
        // 统计计时从开始到结束用了多少毫秒
        long millis = stopWatch.getTotalTimeMillis();
        System.out.println("10000个消息发送完成!ms时间:" + millis);
    }

}
  • 运行测试

image-20240616103426476

  • 发送成功

image-20240616103312638

image-20240616103331430

  • 新建 Person 类
package com.zhong.message.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

/**
 * @ClassName : Person
 * @Description :
 * @Author : zhx
 * @Date: 2024-06-14 16:11
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Person implements Serializable {
    private Long id;
    private String name;
    private Integer age;
    private String email;
}
  • 新建测试代码
package com.zhong.message;

import com.zhong.message.entity.Person;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.StopWatch;

import java.util.concurrent.CompletableFuture;

@SpringBootTest
class Boot309MessageApplicationTests {

    @Autowired
    KafkaTemplate kafkaTemplate;

    @Test
    void send() {
        CompletableFuture future = kafkaTemplate.send("newsPerson", "person", new Person(1L, "小王", 21, "testemal"));
        future.join();
        System.out.println("消息发送成功!");
    }

}
  • 如果发送出现问题请配置值的序列化规则
# 值的序列化规则
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

image-20240616104315342

  • 测试成功

image-20240616104500960

image-20240616104651323

最近更新

  1. TCP协议是安全的吗?

    2024-06-17 07:04:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-17 07:04:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-17 07:04:03       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-17 07:04:03       20 阅读

热门阅读

  1. 22.2 正则表达式-数据验证、数据变换

    2024-06-17 07:04:03       6 阅读
  2. golang实现循环队列

    2024-06-17 07:04:03       8 阅读
  3. github基础使用

    2024-06-17 07:04:03       6 阅读
  4. QSharedMemory使用详解

    2024-06-17 07:04:03       7 阅读
  5. Qt 实战(4)信号与槽 | 4.3、信号连接信号

    2024-06-17 07:04:03       6 阅读
  6. 跨域资源共享(CORS)问题与解决方案

    2024-06-17 07:04:03       7 阅读
  7. wxml与标准的html的异同?

    2024-06-17 07:04:03       6 阅读
  8. 3.1. 马氏链-马氏链的定义和示例

    2024-06-17 07:04:03       6 阅读
  9. Android基础-JNI

    2024-06-17 07:04:03       6 阅读
  10. 一个简单的UDP客户端和服务端的完整C++示例

    2024-06-17 07:04:03       7 阅读
  11. 学习vite的核心原理

    2024-06-17 07:04:03       6 阅读
  12. Flutter学习(一)

    2024-06-17 07:04:03       7 阅读
  13. 【websocket】怎么终止websocket断开重连

    2024-06-17 07:04:03       9 阅读