PiflowX组件-ReadFromKafka

ReadFromKafka组件

组件说明

从kafka中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称 展示名称 默认值 允许值 是否必填 描述 例子
kafka_host KAFKA_HOST “” 逗号分隔的Kafka broker列表。 127.0.0.1:9092
topic TOPIC “” 读取数据的topic名。亦支持用分号间隔的topic列表,如 ‘topic-1;topic-2’。" "注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。 topic-1
topic_pattern TOPIC_PATTERN “” 匹配读取topic名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的topic都将被Kafka consumer订阅。注意,‘topic’ 和 ‘topic-pattern’ 两个选项只能使用其中一个。 topic1_*
startup_mode STARTUP_MODE “” Set(“earliest-offset”, “latest-offset”, “group-offsets”, “timestamp”, “specific-offsets”) Kafka consumer 的启动模式。 earliest-offset
schema SCHEMA “” Kafka消息的schema信息。 id:int,name:string,age:int
format FORMAT “” Set(“json”, “csv”, “avro”, “parquet”, “orc”, “raw”, “protobuf”,“debezium-json”, “canal-json”, “maxwell-json”, “ogg-json”) 用来反序列化Kafka消息的格式。注意:该配置项和 ‘value.format’ 二者必需其一。 json
group GROUP “” Kafka source的消费组id。如果未指定消费组ID,则会使用自动生成的"KafkaSource-{tableIdentifier}"作为消费组ID。 group_1
properties PROPERTIES “” Kafka source连接器其他配置

ReadFromKafka示例配置

{
   
  "flow": {
   
    "name": "DataGenTest",
    "uuid": "1234",
    "stops": [
      {
   
        "uuid": "0000",
        "name": "DataGen1",
        "bundle": "cn.piflow.bundle.flink.common.DataGen",
        "properties": {
   
          "schema": "[{\"filedName\":\"id\",\"filedType\":\"INT\",\"kind\":\"sequence\",\"start\":1,\"end\":10000},{\"filedName\":\"name\",\"filedType\":\"STRING\",\"kind\":\"random\",\"length\":15},{\"filedName\":\"age\",\"filedType\":\"INT\",\"kind\":\"random\",\"max\":100,\"min\":1}]",
          "count": "100",
          "ratio": "5"
        }
      },
      {
   
        "uuid": "1111",
        "name": "WriteToKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka",
        "properties": {
   
          "kafka_host": "hadoop01:9092",
          "topic": "test",
          "schema": "",
          "format": "json",
          "properties": "{}"
        }
      },
      {
   
        "uuid": "2222",
        "name": "ReadFromKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka",
        "properties": {
   
          "kafka_host": "hadoop01:9092",
          "topic": "test",
          "group": "test",
          "startup_mode": "earliest-offset",
          "schema": "id:int,name:string,age:int",
          "format": "json",
          "properties": "{}"
        }
      },
      {
   
        "uuid": "3333",
        "name": "ShowData1",
        "bundle": "cn.piflow.bundle.flink.common.ShowData",
        "properties": {
   
          "showNumber": "5000"
        }
      }
    ],
    "paths": [
      {
   
        "from": "DataGen1",
        "outport": "",
        "inport": "",
        "to": "WriteToKafka1"
      },
      {
   
        "from": "WriteToKafka1",
        "outport": "",
        "inport": "",
        "to": "ReadFromKafka1"
      },
      {
   
        "from": "ReadFromKafka1",
        "outport": "",
        "inport": "",
        "to": "ShowData1"
      }
    ]
  }
}
示例说明

本示例演示了通过DataGen组件生成id,name,age3个字段100条数据,每秒生成5条数据,通过WriteToKafka组件将数据写入到kafka的test topic中,然后通过ReadFromKafka组件从test topic中读取数据,最后使用ShowData组件将数据打印在控制台。

字段描述
[
    {
          
        "filedName": "id",
        "filedType": "INT",
        "kind": "sequence",
        "start": 1,
        "end": 10000
    },
        {
          
        "filedName": "name",
        "filedType": "STRING",
        "kind": "random",
        "length": 15
    },
        {
          
        "filedName": "age",
        "filedType": "INT",
        "kind": "random",
        "max": 100,
        "min": 1
    } 
]

1.id字段

id字段类型为INT,使用sequence生成器,序列生成器的起始值为1,结束值为10000.

2.name字段

name字段类型为STRING,使用random生成器,生成字符长度为15。

3.age字段

age字段类型为INT,使用random生成器,随机生成器的最小值为1,最大值为100。
在这里插入图片描述

相关推荐

  1. PiflowX-OracleCdc

    2024-01-03 15:44:04       33 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-03 15:44:04       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-03 15:44:04       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-03 15:44:04       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-03 15:44:04       20 阅读

热门阅读

  1. vue3-13

    vue3-13

    2024-01-03 15:44:04      33 阅读
  2. openCV处理音视频的常用API及一般流程

    2024-01-03 15:44:04       40 阅读
  3. 详解 docker 镜像制作的两种方式

    2024-01-03 15:44:04       33 阅读
  4. 腾讯云2核2G轻量应用服务器300GB月流量够用吗?

    2024-01-03 15:44:04       40 阅读
  5. 基于遗传算法的药品配送,遗传算法原理

    2024-01-03 15:44:04       36 阅读
  6. 搞懂RestTemplate一篇就够了

    2024-01-03 15:44:04       36 阅读
  7. flutter项目初始化

    2024-01-03 15:44:04       42 阅读
  8. 搭建个人深度学习工作站(捡垃圾)

    2024-01-03 15:44:04       42 阅读
  9. QtConcurrent记录

    2024-01-03 15:44:04       38 阅读
  10. 力扣42. 接雨水

    2024-01-03 15:44:04       35 阅读
  11. 算法:动态规划

    2024-01-03 15:44:04       40 阅读
  12. setFirstResult ,setMaxResults

    2024-01-03 15:44:04       33 阅读
  13. pip安装报错SSL

    2024-01-03 15:44:04       45 阅读
  14. 简易版前端项目离线方案-接口及页面离线缓存

    2024-01-03 15:44:04       37 阅读
  15. C++ gRPC helloworld 示例代码

    2024-01-03 15:44:04       39 阅读