大数据之kafka应用

2024启kafka

kafka常见命令

  1. 指定jmx端口启动kafka lsof -i :9999

    JMX_PORT=9999 /opt/kafka_2.12-3.1.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties
    
  2. 新建topic:副本不能大于kafka_server数

    /opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --partitions 3 --replication-factor 1 --create --topic knowScript
    
  3. 查看topic的详情

    /opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --describe --topic knowScript
    
  4. 罗列所有的topic

    /opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --list
    
  5. 给topic拓展分区:只增大不减小

    /opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --alter --topic knowScript --partitions 4
    
  6. 查看topic的磁盘大小,单位为Byte

    /opt/kafka_2.12-3.1.0/bin/kafka-log-dirs.sh --bootstrap-server only:9092 --topic-list knowScript --describe |  grep -oP '(?<=size":)\d+' |  awk '{ sum += $1 } END { print sum }'
    
  7. 删除topic

    /opt/kafka_2.12-3.1.0/bin/kafka-topics.sh --bootstrap-server only:9092 --topic knowScript --delete
    
  8. 查看消费者组列表

    /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --list
    
  9. 查看特定消费者组的消费情况

    /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId
    
  10. 新建一个消费者组:消费一个topic即可

    /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group makeGroup
    
  11. 删除一个消费者组

    /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --delete --group makeGroup
    
  12. 往特定topic生产message,不指定key

    /opt/kafka_2.12-3.1.0/bin/kafka-console-producer.sh --bootstrap-server only:9092 --topic knowScript
    
  13. 往特定topic生产message,指定key

    /opt/kafka_2.12-3.1.0/bin/kafka-console-producer.sh --bootstrap-server only:9092 --topic knowScript --property parse.key=true --property key.separator=,
    
  14. 消费指定topic的message:–from-beginning,默认为最近的消息消费

    /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --from-beginning
    
  15. 消费topic的message,限制条数

    /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset earliest --partition 0 --max-messages 3
    
  16. 消费指定topic的message:指定分区的offset

    /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset latest --partition 0
    
    /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset earliest --partition 0
    
    /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic knowScript --offset 10 --partition 0
    
  17. 查看kafka的log文件明文内容

    /opt/kafka_2.12-3.1.0/bin/kafka-dump-log.sh --files /opt/kafka_2.12-3.1.0/data/kafka_logs/knowScript-0/00000000000000000000.log  -deep-iteration --print-data-log
    
  18. 查看topic的message总数

    1. 每个分区的最小offset

      /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server only:9092 -topic knowScript  --time -2
      
    2. 每个分区的最大offset

      /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --bootstrap-server only:9092 -topic knowScript  --time -1
      
  19. 查看指定消费者组的消费情况offset和lag

    /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId
    
  20. 重设指定消费者组消费指定的offset(每个分区)

    1. 打印结果,没有执行(生产使用这个验证)

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset  --dry-run
      
    2. 直接执行

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset  --execute
      
    3. 重设指定消费者组指定分区的指定offset(topic后使用 :$partitionNum来设定指定的partition)

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 3 --topic consumerTopicByOffset:1  --execute
      
  21. 指定消费者组从最新的offset进行消费(没有指定group,则全部都消费,指定了会从消费者组最新的offset开始消费)

    /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group consumeByOffsetId --from-beginning
    
  22. 生产message基准测试; num-records生产的message条数,through为-1表示不限制吞吐量,record-size表示每条record的大小为1024K, producer-props后面跟着kv的producer属性配置

    /opt/kafka_2.12-3.1.0/bin/kafka-producer-perf-test.sh --topic test_producer_perf --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=only:9092 acks=-1 linger.ms=2000 compression.type=lz4
    
  23. 消费message基准测试

    /opt/kafka_2.12-3.1.0/bin/kafka-consumer-perf-test.sh --broker-list only:9092 --messages 10000000 --topic test_producer_perf
    

生产上重放信息

  1. 查看指定消费者组的消费情况offset

    /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --describe --group consumeByOffsetId
    
  2. 重设指定消费者组消费指定的offset(每个分区)

    /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset  --execute
    
    • 打印结果,没有执行

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 4 --topic consumerTopicByOffset  --dry-run
      
    • other:重设指定消费者组指定分区的指定offset(topic后使用 :partitionNum来设定指定的partition)

      /opt/kafka_2.12-3.1.0/bin/kafka-consumer-groups.sh --bootstrap-server only:9092 --group consumeByOffsetId --reset-offsets --to-offset 3 --topic consumerTopicByOffset:1  --dry-run
      
  3. 指定消费者组从最新的offset进行消费(没有指定group,则全部都消费,指定了会从消费者组最新的offset开始消费)

    /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --group consumeByOffsetId --from-beginning
    
  • 指定分区指定offset进行开始消费(查看某消费者组在特定的分区丢失了哪些消息)

    /opt/kafka_2.12-3.1.0/bin/kafka-console-consumer.sh --bootstrap-server only:9092 --topic consumerTopicByOffset --offset 10 --partition 0
    

jmx

jmx的配置和开启

  1. 配置jmx

    JMX_PORT=9999 /opt/kafka_2.12-3.1.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties
    
    • 配置文件配置更稳定

jmx的使用

jmx例子一
  1. 参考url的MBEAN

    https://docs.confluent.io/platform/current/kafka/monitoring.html#kafka-monitoring-metrics-broker

    1. 找到category类别下面的MBEAN:

      kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic={topicName}
      
    2. 获取到对应的MBEAN的指标值

      /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
      
      • 输出列有

        "time","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:Count","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:EventType","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:FifteenMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:FiveMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:MeanRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:OneMinuteRate","kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec:RateUnit"
        
      • 寻找上面的name的值,如 FifteenMinuteRate

        /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --date-format "YYYY-MM-dd HH:mm:ss" --attributes FifteenMinuteRate --reporting-interval 5000
        
        • –date-format格式化时间 、 --reporting-interval指定更新时间间隔

jmx例子二

  1. 找到category类别下面的MBEAN

    kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs
    
  2. 获取到对应的MBEAN的指标值

    /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
    
    1. 输出列有

      "time","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:50thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:75thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:95thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:98thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:999thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:99thPercentile","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Count","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Max","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Mean","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:Min","kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs:StdDev"
      
      • 寻找上面的name的值,如 75thPercentile、 95thPercentile

        /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes 75thPercentile
        
        /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes 95thPercentile
        

jmx例子三

  1. 找到category类别下面的MBEAN

    kafka.network:type=RequestChannel,name=ResponseQueueSize
    
  2. 获取到对应的MBEAN的指标值

    /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.network:type=RequestChannel,name=ResponseQueueSize --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
    
    • 输出列有

      • "time","kafka.network:type=RequestChannel,name=ResponseQueueSize:Value"
        
    • 寻找上面的name的值,如 Value

      • /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.network:type=RequestChannel,name=ResponseQueueSize --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi --attributes Value
        

jmx例子四(special)

  1. 并不是所有的MBEAN都有对应的jmx指标值;例如:kafka.log:type=Log,name=LogEndOffset

  2. 下面的命令会报错

    /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.log:type=Log,name=LogEndOffset --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
    
  3. 但是从jconsole localhost:9999的弹窗MBEAN可以找到;上面更深层的信息

    /opt/kafka_2.12-3.1.0/bin/kafka-run-class.sh kafka.tools.JmxTool --object-name kafka.log:type=Log,name=LogEndOffset,topic=flink2kafka,partition=0 --jmx-url service:jmx:rmi:///jndi/rmi://only:9999/jmxrmi
    

相关推荐

  1. 数据kafka应用

    2024-04-12 05:50:01       17 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-04-12 05:50:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-04-12 05:50:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-04-12 05:50:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-04-12 05:50:01       20 阅读

热门阅读

  1. (最新)itext7 freemarker动态模板转pdf

    2024-04-12 05:50:01       13 阅读
  2. jodconverter+openOffice word文档pdf转换

    2024-04-12 05:50:01       14 阅读
  3. Spring Cloud启动类上的注解详解

    2024-04-12 05:50:01       15 阅读
  4. SpringMVC原理分析(十二)--异常处理流程

    2024-04-12 05:50:01       13 阅读
  5. 浅谈:从医疗元宇宙向更多实业领域的拓展

    2024-04-12 05:50:01       15 阅读
  6. Flask、Django和Tornado怎么选

    2024-04-12 05:50:01       11 阅读
  7. 【C#】C#匹配两个相似的字符串莱文斯坦距离

    2024-04-12 05:50:01       13 阅读
  8. 定期与设定域名地址交互工具

    2024-04-12 05:50:01       14 阅读
  9. 探索量子计算:打开未来技术的大门

    2024-04-12 05:50:01       14 阅读
  10. android 判断是否联网

    2024-04-12 05:50:01       12 阅读
  11. C#面:如何创建一个自定义异常?

    2024-04-12 05:50:01       16 阅读
  12. js常用数据处理方法

    2024-04-12 05:50:01       17 阅读