华为fusioninsight集群kafka连接配置

1. 在 application.properties 中新增如下配置：

# Kafka broker address list; the KAFKA_URL property (e.g. an environment variable) overrides the default.
# NOTE(review): 9200 is not a typical Kafka listener port - confirm it matches the cluster's SASL_PLAINTEXT listener.
fusioninsight.kafka.bootstrap-servers= ${KAFKA_URL:10.7.212.111:9200}
# Kerberos (SASL/GSSAPI) authentication over an unencrypted connection.
fusioninsight.kafka.security.protocol= SASL_PLAINTEXT
fusioninsight.kafka.kerberos.domain.name= hadoop.hadoop.com
fusioninsight.kafka.sasl.kerberos.service.name= kafka
# Locations of the JAAS login configuration and the krb5 configuration file.
# Forward slashes are used on purpose: in a .properties file a single backslash is an
# escape character and is silently dropped, so "E:\demo\huawei\\jaas.conf" would be
# loaded as "E:demohuawei\jaas.conf". Java accepts forward slashes on Windows.
kerberos.jaas=${JAAS_PATH:E:/demo/huawei/jaas.conf}
kerberos.krb5=${KRB5_PATH:E:/demo/huawei/krb5.conf}

2. 新增 Kafka 配置类

/**
 * Spring configuration that wires the Kafka admin client, consumer factory, listener
 * container factory and {@link KafkaTemplate} for a Kerberos-secured Kafka cluster.
 *
 * <p>Connection settings are injected from the {@code fusioninsight.kafka.*} and
 * {@code kerberos.*} properties. The JAAS and krb5 file locations are published as JVM
 * system properties before any client configuration is built, so the admin, consumer and
 * producer clients do not depend on the order in which the beans are created.
 */
@Configuration
@Slf4j
public class PlatformConsumerConfig {

    // NOTE: the injected fields below keep their original names and visibility
    // (including the "boostrap" spelling) so that existing references keep compiling.

    @Value("${fusioninsight.kafka.bootstrap-servers}")
    public String boostrapServers;

    @Value("${fusioninsight.kafka.security.protocol}")
    public String securityProtocol;

    @Value("${fusioninsight.kafka.kerberos.domain.name}")
    public String kerberosDomainName;

    @Value("${fusioninsight.kafka.sasl.kerberos.service.name}")
    public String kerberosServiceName;

    @Value("${kerberos.krb5}")
    private String kerberoskrb5;

    @Value("${kerberos.jaas}")
    private String kerberosJaas;

    /**
     * Creates the listener container factory registered as {@code authKafkaContainerFactory}.
     *
     * <p>The factory uses {@link #consumerFactory()}, a concurrency of 10, batch listeners,
     * a poll timeout of 1500 and does not treat missing topics as fatal. Auto-startup is
     * disabled, so containers built by this factory are not started automatically.
     *
     * @return the configured listener container factory
     */
    @Bean(name = "authKafkaContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        ContainerProperties properties = factory.getContainerProperties();
        properties.setMissingTopicsFatal(false);
        properties.setPollTimeout(1500);
        // Prevent the Kafka listener containers from starting automatically.
        factory.setAutoStartup(false);
        factory.setBatchListener(true);
        return factory;
    }

    /**
     * Registers the {@link PlatformListener} as a bean.
     *
     * @return a new listener instance
     */
    @Bean
    public PlatformListener platformListener() {
        return new PlatformListener();
    }

    /**
     * Builds a producer factory from {@link #producerConfigs()}.
     *
     * <p>This method is intentionally not a bean (it was not one originally); every call
     * returns a new factory.
     *
     * @return a producer factory for String keys and values
     */
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Registers a JSON message converter.
     *
     * @return a {@link StringJsonMessageConverter}
     */
    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    /**
     * Creates the Kafka admin bean using the shared security settings.
     *
     * @return the configured {@link KafkaAdmin}
     */
    @Bean
    public KafkaAdmin kafkaAdmin() {
        return new KafkaAdmin(securedClientConfigs());
    }

    /**
     * Creates the consumer factory: shared security settings plus String deserializers
     * for both key and value.
     *
     * @return the configured consumer factory
     */
    @Bean
    public ConsumerFactory<Object, Object> consumerFactory() {
        Map<String, Object> configs = securedClientConfigs();
        configs.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new DefaultKafkaConsumerFactory<>(configs);
    }

    /**
     * Creates the {@link KafkaTemplate} used to publish String messages and logs the
     * JAAS / krb5 locations that are in effect.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        log.info("---kerberos on kafka use default--jaas:{} krb5:{}",
                System.getProperty("java.security.auth.login.config"),
                System.getProperty("java.security.krb5.conf"));
        return new KafkaTemplate<>(factory);
    }

    /**
     * Producer settings: shared security settings plus String serializers for key and value.
     * Previously {@link #producerFactory()} referenced this method without it being defined.
     */
    private Map<String, Object> producerConfigs() {
        Map<String, Object> configs = securedClientConfigs();
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return configs;
    }

    /**
     * Returns a fresh, mutable map with the settings every client (admin, consumer,
     * producer) shares, after making sure the Kerberos system properties are set.
     */
    private Map<String, Object> securedClientConfigs() {
        applyKerberosSystemProperties();
        Map<String, Object> configs = new HashMap<>();
        // These AdminClientConfig constants are the same keys the consumer and producer
        // previously set as the literals "bootstrap.servers" and "security.protocol".
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, boostrapServers);
        configs.put(AdminClientConfig.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        configs.put("sasl.kerberos.service.name", kerberosServiceName);
        // NOTE(review): "kerberos.domain.name" appears to be specific to the vendor's
        // Kafka client build - confirm it is understood by the client library in use.
        configs.put("kerberos.domain.name", kerberosDomainName);
        return configs;
    }

    /**
     * Points the JVM at the configured JAAS and krb5 files. Authentication uses these
     * self-configured file paths. Setting the same values repeatedly is harmless, so
     * this is invoked every time a client configuration is built.
     */
    private void applyKerberosSystemProperties() {
        System.setProperty("java.security.auth.login.config", kerberosJaas);
        System.setProperty("java.security.krb5.conf", kerberoskrb5);
    }
}

3. 使用示例（在业务类中注入 KafkaTemplate 并发送消息）

// Usage excerpt: the field below belongs to the calling Spring component; the statements
// after it are taken from a sending method whose signature is not shown here.
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// Populate the message envelope and tag it with the type "account".
dataMessageVO.setAccountList(accountList);
messageSend2KafkaVo.setData(dataMessageVO);
messageSend2KafkaVo.setType("account");
log.info("--message to kafka for account:-->" + JSON.toJSONString(messageSend2KafkaVo));
// The envelope is serialized to JSON again and sent, without a key, to the "asset_rep" topic.
ProducerRecord<String, String> record = new ProducerRecord<String, String>("asset_rep", JSON.toJSONString(messageSend2KafkaVo));
// get() waits for the send to complete, making this a synchronous send.
// NOTE(review): get() declares checked exceptions - presumably the enclosing method
// handles or declares them; confirm against the full source.
Object o = kafkaTemplate.send(record).get();
log.info("--message to kafka for account result2:--->" + o.toString());

相关推荐

  1. 华为fusioninsightkafka连接配置

    2023-12-08 08:02:01       37 阅读
  2. Kafka安装与配置

    2023-12-08 08:02:01       20 阅读
  3. 连接ssl加密的kafka

    2023-12-08 08:02:01       8 阅读
  4. Kafka的安装与配置(二)

    2023-12-08 08:02:01       30 阅读
  5. Kafka部署

    2023-12-08 08:02:01       34 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-08 08:02:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-08 08:02:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-08 08:02:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-08 08:02:01       20 阅读

热门阅读

  1. android 13.0 第三方输入法app设置系统默认输入法

    2023-12-08 08:02:01       37 阅读
  2. macos安装metal 加速版 pytorch

    2023-12-08 08:02:01       40 阅读
  3. 你知道模拟养成游戏如何开发吗?

    2023-12-08 08:02:01       41 阅读
  4. MLX:苹果发布新的机器学习框架

    2023-12-08 08:02:01       39 阅读
  5. OSPF/Open Shortest Path First

    2023-12-08 08:02:01       40 阅读