2. kafka消息队列

一、kafka消息队列

消息服务, 简称MQ
用于在分布式业务环境,实现不同组件、不同的功能模块的高效通信

代表性的MQ软件:
kafka, 十万并发
RocketMQ 百万并发
rabbitMQ
zeroMQ

二、消息服务的术语

在这里插入图片描述

  • producer 生产者
    产生消息的进程

  • consumer 消费者
    接收、处理消息的进程

  • broker 消息服务器

  • topics 主题
    消息的分组,根据业务不同的模块建不同的主题

  • partation 分区
    确保某一个主题的消息的有序性

三、kafka消息确认机制 ACK

producer发送消息后,leader将消息同步给follower,然后返回ack给producer,表示消息已收到,此时才可以继续发送下一条消息。

kafka提供了以下3种ack级别:
0:leader接收到消息马上返回ack,此时可能还没有写入磁盘,可能丢失数据
1:leader将消息写入磁盘后,马上返回ack,此时可能还没同步follower,同样可能丢失数据
-1(all):leader和follower都将数据写入磁盘后,返回ack。但是如果在写入磁盘后,ack尚未发送,此时leader发生故障,会导致数据写入重复

四、kafka安装部署

1、环境规划

192.168.140.10 kafka
192.168.140.11 kafka
192.168.140.12 kafka

2、使用事先部署好的zookeeper管理kafka的高可用

3、安装jdk

4、安装kafka

[root@node01 ~]# tar xf kafka_2.12-3.3.1.tgz -C /usr/local/
[root@node01 ~]# mv /usr/local/kafka_2.12-3.3.1/ /usr/local/kafka33

[root@node01 ~]# vim /etc/profile
export KAFKA_HOME=/usr/local/kafka33
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin

[root@node01 ~]# source /etc/profile

5、配置kafka

[root@node01 ~]# mkdir /usr/local/kafka33/log
[root@node01 ~]# vim /usr/local/kafka33/config/server.properties

broker.id=0

listeners=PLAINTEXT://192.168.140.10:9092
log.dirs=/usr/local/kafka33/log

num.network.threads=8
num.io.threads=16

zookeeper.connect=192.168.140.10:2181,192.168.140.11:2181,192.168.140.12:2181

另外两台消息服务器配置参考上述,注意修改broker id、监听IP

6、启动kafka

[root@node01 bin]# ./kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties 
[root@node01 bin]# 
[root@node01 bin]# netstat -tunlp | grep 9092
tcp6       0      0 192.168.140.10:9092     :::*                    LISTEN      12309/java          
[root@node01 bin]# 
[root@node01 bin]# 

在zookeeper中查看kafka注册的数据

[root@node01 bin]#  /usr/local/zookeeper/bin/zkCli.sh

[zk: localhost:2181(CONNECTED) 0] ls /brokers 
[ids, seqid, topics]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids 
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/ids/0
[]
[zk: localhost:2181(CONNECTED) 3] 
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.10:9092"],"jmx_port":-1,"features":{},"host":"192.168.140.10","timestamp":"1718782930115","port":9092,"version":5}

7、测试生产者、消费者模型

7.1 创建主题

[root@node01 ~]# kafka-topics.sh --create --topic test --replication-factor 1 --partitions 1 --bootstrap-server 192.168.140.10:9092
Created topic test.
[root@node01 ~]# 
[root@node01 ~]# kafka-topics.sh --list --bootstrap-server 192.168.140.10:9092
test
[root@node01 ~]# 

7.2 测试生产者产生数据

[root@node01 ~]# kafka-console-producer.sh --broker-list 192.168.140.10:9092 --topic test
>nginx
>httpd
>php
>mysql
>redis

7.3 测试消费者接收数据

[root@node01 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.140.10:9092 --topic test --from-beginning
nginx
httpd
php
mysql
redis

相关推荐

  1. 消息队列Kafka

    2024-06-18 16:44:03       47 阅读
  2. 消息队列Kafka

    2024-06-18 16:44:03       60 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-18 16:44:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-18 16:44:03       100 阅读
  3. 在Django里面运行非项目文件

    2024-06-18 16:44:03       82 阅读
  4. Python语言-面向对象

    2024-06-18 16:44:03       91 阅读

热门阅读

  1. 椋鸟C++笔记#5:C++内存管理

    2024-06-18 16:44:03       32 阅读
  2. 【网络协议栈】IGMP

    2024-06-18 16:44:03       21 阅读
  3. Jenkins简要说明

    2024-06-18 16:44:03       35 阅读
  4. 【Mysql】 MySQL索引的使用

    2024-06-18 16:44:03       32 阅读
  5. 安装docker+mysql的一些坑

    2024-06-18 16:44:03       29 阅读
  6. C++的标准容器及其应用

    2024-06-18 16:44:03       21 阅读
  7. WDF驱动开发-工作项

    2024-06-18 16:44:03       31 阅读
  8. 姜萍成了工具人?

    2024-06-18 16:44:03       25 阅读
  9. 2024最新四级翻译【练习2】

    2024-06-18 16:44:03       29 阅读
  10. Linux第十一章:Samba文件共享服务

    2024-06-18 16:44:03       27 阅读
  11. QSet使用详解

    2024-06-18 16:44:03       31 阅读
  12. 水土保持设计乙级资质升甲级的条件?

    2024-06-18 16:44:03       26 阅读