flink消费kafka时获取元数据信息

当flink消费kafka时,只需要简单配置就能使用并正常运行

 val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")


    val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
    val stream1 = env.addSource(consumer1)
    stream1.print()

    env.execute("KafkaSourceStreaming")

但是,这里用的是最简单的SimpleStringSchema,所以接收到的数据只是我们所理解的一条消息里的值,其包含的时间戳、offset、topic、partition等元信息都不能正常获取,当需要该部分信息时,可以利用KafkaDeserializationSchema 接口来实现自定义的反序列化逻辑。

object KafkaSourceStreaming {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.30:9092")
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink_test")


    val consumer1 = new FlinkKafkaConsumer[String]("mytest", new SimpleStringSchema(), props)
    val stream1 = env.addSource(consumer1)
    stream1.print()

    val consumer = new FlinkKafkaConsumer("mytest",new CustomKafkaDeserializationSchema(), props)
    val stream = env.addSource(consumer)
    stream.print()

    env.execute("KafkaSourceStreaming")
  }


  /**
    * 获取kafka元数据信息
    */
  class CustomKafkaDeserializationSchema extends KafkaDeserializationSchema[ConsumerRecord[String, String]] {
    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): ConsumerRecord[String, String] = {
      val key = if (record.key() == null) null else new String(record.key())
      val value = new String(record.value())
      new ConsumerRecord[String, String](
        record.topic(),
        record.partition(),
        record.offset(),
        record.timestamp(),
        record.timestampType(),
        record.checksum(),
        record.serializedKeySize(),
        record.serializedValueSize(),
        key,
        value,
        record.headers(),
        record.leaderEpoch()
      )
    }

    override def isEndOfStream(nextElement: ConsumerRecord[String, String]): Boolean = false

    override def getProducedType: TypeInformation[ConsumerRecord[String, String]] = {
      TypeInformation.of(new TypeHint[ConsumerRecord[String, String]]() {})
    }
  }
}

相关推荐

  1. flink消费kafka获取数据信息

    2024-06-14 08:30:04       32 阅读
  2. Flink对接Kafka的topic数据消费offset设置参数

    2024-06-14 08:30:04       61 阅读
  3. kafka无法消费数据

    2024-06-14 08:30:04       49 阅读
  4. springboot集成kafka消费数据

    2024-06-14 08:30:04       55 阅读
  5. Flink 数据类型 & TypeInformation信息

    2024-06-14 08:30:04       54 阅读
  6. FLink消费Kafka之FlinkConsumer到KafkaSource的转变】

    2024-06-14 08:30:04       67 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-14 08:30:04       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-14 08:30:04       100 阅读
  3. 在Django里面运行非项目文件

    2024-06-14 08:30:04       82 阅读
  4. Python语言-面向对象

    2024-06-14 08:30:04       91 阅读

热门阅读

  1. 保存csv到mysql的通用脚本

    2024-06-14 08:30:04       23 阅读
  2. Shell 输入/输出重定向

    2024-06-14 08:30:04       27 阅读
  3. 人生结果等于思维方式乘以热情乘以能力

    2024-06-14 08:30:04       35 阅读
  4. Spring事务相关

    2024-06-14 08:30:04       34 阅读
  5. 深入理解MyBatis XML配置文件

    2024-06-14 08:30:04       31 阅读
  6. 深入解析Web通信 HTTP、HTTPS 和 WebSocket

    2024-06-14 08:30:04       26 阅读
  7. 阿里云aliyun cli的作用以及安装步骤

    2024-06-14 08:30:04       35 阅读
  8. ffmpeg把视频文件转码为MP4格式

    2024-06-14 08:30:04       34 阅读
  9. 「C系列」C 函数指针/回调函数

    2024-06-14 08:30:04       37 阅读
  10. Linux 如何查看磁盘空间占用

    2024-06-14 08:30:04       31 阅读