【Flink-Sql-Kafka-To-ClickHouse】使用 FlinkSql 将 Kafka 数据写入 ClickHouse

【Flink-Sql-Kafka-To-ClickHouse】使用 FlinkSql 将 Kafka 数据写入 ClickHouse

1)需求分析

1、数据源为 Kafka,定义 Kafka-Topic 为动态临时视图表。

2、写入到 ClickHouse,自定义 Sink 表。

3、source 和 sink 都使用 Flink 集成的 Connector

2)功能实现

导入ClickHouse连接器

<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>flink-connector-clickhouse</artifactId>
    <version>1.14.0</version>
</dependency>

如果在服务器上执行,需要将 jar 放到 Flink 的 lib 目录下。

3)准备工作

3.1.Kafka

1、创建好Topic

2、准备测试数据

{
   
  "id": 1,
  "eventId": "TEST123",
  "eventStDt": "2022-11-3023:37:49",
  "bak6": "测试",
  "bak7": "https://test?user",
  "businessId": "17279811111111111111111111111111",
  "phone": "12345678910",
  "bak1": "1234",
  "bak2": "2022-12-0100:00:00",
  "bak13": "17279811111111111111111111111111",
  "bak14": "APP",
  "bak11": "TEST"
}

3.2.ClickHouse

1、创建表(此处我们使用生产环境中较为常用的 cluster 集群模式建表)

注意集群模式要创建两次表,一次为 local 本地表,一次为 cluster 集群表。

  • local
CREATE TABLE test.kafka2ck_test_local on cluster test_cluster 
(
  `id` UInt32,
  `eventId` LowCardinality(Nullable(String)),
  `eventStDt` LowCardinality(Nullable(String)),
  `bak6` LowCardinality(Nullable(String)),
  `bak7` LowCardinality(Nullable(String)),
  `businessId` LowCardinality(Nullable(String)),
  `phone` LowCardinality(Nullable(String)),
  `bak1` LowCardinality(Nullable(String)),
  `bak2` LowCardinality(Nullable(String)),
  `bak13` LowCardinality(Nullable(String)),
  `bak14` LowCardinality(Nullable(String)),
  `bak11` LowCardinality(Nullable(String))
)
ENGINE = ReplicatedMergeTree
PARTITION BY id
PRIMARY KEY id
ORDER BY id
SETTINGS index_granularity = 8192;
  • cluster
CREATE TABLE test.kafka2ck_test on cluster test_cluster 
(
  `id` UInt32,
  `eventId` LowCardinality(Nullable(String)),
  `eventStDt` LowCardinality(Nullable(String)),
  `bak6` LowCardinality(Nullable(String)),
  `bak7` LowCardinality(Nullable(String)),
  `businessId` LowCardinality(Nullable(String)),
  `phone` LowCardinality(Nullable(String)),
  `bak1` LowCardinality(Nullable(String)),
  `bak2` LowCardinality(Nullable(String)),
  `bak13` LowCardinality(Nullable(String)),
  `bak14` LowCardinality(Nullable(String)),
  `bak11` LowCardinality(Nullable(String))
)
ENGINE = Distributed('test_cluster', 'test', 'kafka2ck_test_local', rand());

4)Flink-Sql

  • source
CREATE TABLE source_kafka_test (
  id INT,
  eventId STRING,
  eventStDt STRING,
  bak6 STRING,
  bak7 STRING,
  businessId STRING,
  phone STRING,
  bak1 STRING,
  bak2 STRING,
  bak13 STRING,
  bak14 STRING,
  bak11 STRING
 ) WITH (
    'connector' = 'kafka',
    'topic' = 'test',
    'format'='json',
    'properties.bootstrap.servers' = '${kafka-bootstrap-server}',
    'properties.group.id' = 'test01',
    'scan.startup.mode' = 'earliest-offset',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name' = 'kafka'
);
  • sink
CREATE TABLE sink_ck_test (
  id INT,
  eventId STRING,
  eventStDt STRING,
  bak6 STRING,
  bak7 STRING,
  businessId STRING,
  phone STRING,
  bak1 STRING,
  bak2 STRING,
  bak13 STRING,
  bak14 STRING,
  bak11 STRING,
  PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
    'connector' = 'clickhouse',
    'url' = 'jdbc:clickhouse://123.1.1.1:9090',
    'database-name'='test',
    'table-name' = 'kafka2ck_test_local',
    'username' = 'test',
    'password' = '123456',
    'sink.batch-size' = '100',
    'sink.flush-interval' = '1000',
    'sink.max-retries' = '3'
);
  • insert
insert into sink_ck_test select * from source_kafka_test;

5)验证

在 Kafka 中写入对应 ClickHouse 格式的 Json 测试数据,观察 ClickHouse 中是否有数据写入。

{
   
  "id": 1,
  "eventId": "TEST123",
  "eventStDt": "2022-11-3023:37:49",
  "bak6": "测试",
  "bak7": "https://test?user",
  "businessId": "17279811111111111111111111111111",
  "phone": "12345678910",
  "bak1": "1234",
  "bak2": "2022-12-0100:00:00",
  "bak13": "17279811111111111111111111111111",
  "bak14": "APP",
  "bak11": "TEST"
}

最近更新

  1. TCP协议是安全的吗?

    2023-12-17 06:40:02       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-17 06:40:02       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-17 06:40:02       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-17 06:40:02       20 阅读

热门阅读

  1. C语言之数据结构(DAY31)

    2023-12-17 06:40:02       38 阅读
  2. 数据结构 数组与字符串

    2023-12-17 06:40:02       36 阅读
  3. c语言中的 *, &, ** 符合代表什么意思

    2023-12-17 06:40:02       132 阅读
  4. YOLO v8 目标检测识别翻栏

    2023-12-17 06:40:02       33 阅读
  5. 【AI算力】关于国产算力的一些调研分析

    2023-12-17 06:40:02       37 阅读
  6. c/c++中 qsort 与 bsearch 算法的使用

    2023-12-17 06:40:02       25 阅读
  7. vue制作简易日历

    2023-12-17 06:40:02       35 阅读
  8. 计算机网络

    2023-12-17 06:40:02       36 阅读
  9. 计算机网络英文总结

    2023-12-17 06:40:02       38 阅读
  10. B+树和索引

    2023-12-17 06:40:02       33 阅读