Flink流批一体计算(24):Flink SQL之mysql维表实时关联

目录

1.维表

2.数据准备

创建源数据

创建维度表

创建Sink表

3.配置任务

Flink SQL创建kafka源表

Flink SQL创建MySQL维表

Flink SQL创建MySQL结果表

编写计算任务

核验数据


1.维表

目前在实时计算的场景中,大多数都使用过MySQL、Hbase、redis作为维表引擎存储一些维度数据,然后在DataStream API中调用MySQL、Hbase、redis客户端去获取到维度数据进行维度扩充。

本案例采用MySQL创建维表,与创建MySQL sink表语法相同。

2.数据准备

创建源数据

重启kafka,创建Topic:  case_kafka_mysql

写入json格式的数据

  {"ts": "20201011","id": 8,"price_amt":211}

创建维度表

在MySQL中创建名为product_dim的表

CREATE TABLE `product_dim` (
  `id` bigint(11) NOT NULL,
  `coupon_price_amt` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

向数据表插入如下数据:

INSERT INTO `product_dim` VALUES (1, 1);
INSERT INTO `product_dim` VALUES (3, 1);
INSERT INTO `product_dim` VALUES (8, 1);
创建Sink表

在MySQL中创建名为sync_test_3的表

CREATE TABLE `sync_test_3` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `ts` varchar(64) DEFAULT NULL,
  `total_gmv` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx` (`ts`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

3.配置任务

Flink SQL创建kafka源表
create table flink_test_3 (
  id BIGINT,
  ts VARCHAR,
  price_amt BIGINT,
  proctime AS PROCTIME ()
)
 with (
    'connector' = 'kafka',
    'topic' = 'case_kafka_mysql',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.group.id' = 'flink_gp_test3',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true',
    'properties.zookeeper.connect' = '127.0.0.1:2181/kafka'
  );
Flink SQL创建MySQL维表
create table flink_test_3_dim (
  id BIGINT,
  coupon_price_amt BIGINT
)
WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
   'table-name' = 'product_dim',
   'username' = 'root',
   'password' = 'Admin',
   'lookup.max-retries' = '3',
   'lookup.cache.max-rows' = 1000
 );

WITH参数

参数

说明

类型

备注

lookup.cache.max-rows

指定缓存的最大行数。如果超过该值,则最老的行记录将会过期,会被新的记录替换掉。

Integer

默认情况下,维表Cache是未开启的。

lookup.cache.ttl

指定缓存中每行记录的最大存活时间。如果某行记录超过该时间,则该行记录将会过期。

Duration

默认情况下,维表Cache是未开启的。你可以设置lookup.cache.max-rows lookup.cache.ttl参数来启用维表Cache。启用缓存时,采用的是LRU策略缓存。

lookup.cache.caching-missing-key

是否缓存空的查询结果。

Boolean

参数取值如下:

true(默认值):缓存空的查询结果。

false:不缓存空的查询结果。

lookup.max-retries

查询数据库失败的最大重试次数。

Integer

默认值为3

Flink SQL创建MySQL结果表
CREATE TABLE sync_test_3 (
                   ts string,
                   total_gmv bigint,
                   PRIMARY KEY (ts) NOT ENFORCED
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://127.0.0.1:3306/db01?characterEncoding=UTF-8',
   'table-name' = 'sync_test_3',
   'username' = 'root',
   'password' = 'Admin'
 );
编写计算任务
INSERT INTO sync_test_3
SELECT
  ts,
  SUM(price_amt - coupon_price_amt) AS total_gmv
FROM
  (
    SELECT
      a.ts as ts,
      a.price_amt as price_amt,
      b.coupon_price_amt as coupon_price_amt
    FROM
      flink_test_3 as a
      LEFT JOIN flink_test_3_dim  FOR SYSTEM_TIME AS OF  a.proctime  as b
     ON b.id = a.id
  )
GROUP BY ts;
核验数据
SELECT id, ts, total_gmv FROM sync_test_3;

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2023-12-06 16:22:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2023-12-06 16:22:02       100 阅读
  3. 在Django里面运行非项目文件

    2023-12-06 16:22:02       82 阅读
  4. Python语言-面向对象

    2023-12-06 16:22:02       91 阅读

热门阅读

  1. Three.js的THREE.LOD如何加载gltf模型

    2023-12-06 16:22:02       59 阅读
  2. pytorch学习4-简易卷积实现

    2023-12-06 16:22:02       67 阅读
  3. BiLSTM-CRF的中文命名实体识别

    2023-12-06 16:22:02       58 阅读
  4. 关于STM32G0 FLASH 写入时出现PGSERR的一种处理办法

    2023-12-06 16:22:02       57 阅读
  5. 管理Android12系统的WLAN热点

    2023-12-06 16:22:02       50 阅读
  6. Android AIDL实现开放系统级API 提供三方app调用

    2023-12-06 16:22:02       39 阅读
  7. spark不同结构Dataset合并

    2023-12-06 16:22:02       50 阅读
  8. 云原生高级--shell自动化脚本备份

    2023-12-06 16:22:02       65 阅读
  9. SCAU:18043 找出3个数中最大的数

    2023-12-06 16:22:02       55 阅读
  10. 关于python vue influxdb的一切 笔记

    2023-12-06 16:22:02       61 阅读
  11. SD-WAN异地组网提升多元企业网络体验

    2023-12-06 16:22:02       61 阅读
  12. 数据结构--堆排序

    2023-12-06 16:22:02       41 阅读
  13. swiper/vue 获取 swiper实例方法

    2023-12-06 16:22:02       62 阅读