Data Development in Practice: Data Standards

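Contents

I. Naming Standards
1. Business System Abbreviations
2. Data Source Abbreviations
3. Data Warehouse Layers
4. Business Segment Abbreviations
5. Data Domain Abbreviations
6. Business Process Abbreviations
7. Table Names

II. Column Standards
1. Hive Column Types
2. ClickHouse (CK) Column Types
3. Column Default Values
4. Audit Columns

III. Metric Standards

IV. Modeling Standards
General rules
1. STG Layer Modeling Standards
2. ODS Layer Modeling Standards
3. DIM Layer Modeling Standards
4. DWD Layer Modeling Standards
5. DWS Layer Modeling Standards
6. ADS Layer Modeling Standards

V. Table Creation Standards
General rules
1. STG Layer Table Creation Standards
2. ODS Layer Table Creation Standards
3. DIM Layer Table Creation Standards
4. DWD Layer Table Creation Standards
5. DWS Layer Table Creation Standards
6. ADS Layer Table Creation Standards
7. DAS Layer Table Creation Standards

VI. Code Standards
General rules
1. STG Layer Code Standards
2. ODS Layer Code Standards
3. DIM Layer Code Standards
4. DWD Layer Code Standards
5. DWS Layer Code Standards
6. ADS Layer Code Standards
7. DAS Table Code Standards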


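I. Naming Standards

1. Business System Abbreviations

Naming rule: 3 to 6 characters, consisting of English letters and digits.

Examples:

| No | Abbreviation | English name | Description |
|----|--------------|--------------|-------------|
| 1 | WMS | Warehouse Management System | Warehouse management system |
| 2 | MES | Manufacturing Execution System | Production management system |
| 3 | SCM | Supply Chain Management | Supply chain management |
| 4 | CRM | Customer Relationship Management | Customer management system |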
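
The abbreviation rule above (3 to 6 letters or digits) is easy to check mechanically. The following is a minimal sketch, assuming the rule means exactly "ASCII letters and digits only"; the function name `is_valid_abbreviation` is hypothetical and not part of the original standard.

```python
import re

# Assumption: "3-6 English letters and digits" means ASCII letters/digits only.
ABBR_PATTERN = re.compile(r"[A-Za-z0-9]{3,6}")


def is_valid_abbreviation(abbr: str) -> bool:
    """Return True when the abbreviation has 3 to 6 letters or digits."""
    return ABBR_PATTERN.fullmatch(abbr) is not None
```

The same check applies to the data source, business segment, data domain, and business process abbreviations, which share this naming rule.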

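2. Data Source Abbreviations

Naming rule: 3 to 6 characters, consisting of English letters and digits.

Examples:

| No | Abbreviation | Description |
|----|--------------|-------------|
| 1 | WMSDB | Warehouse management database |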

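3. Data Warehouse Layers

| No | Abbreviation | Description |
|----|--------------|-------------|
| 1 | STG | Staging (source-aligned) layer |
| 2 | ODS | Operational data layer |
| 3 | DIM | Common dimension layer |
| 4 | DWD | Detail layer |
| 5 | DWS | Metrics layer |
| 6 | ADS | Application layer |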

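4. Business Segment Abbreviations

Naming rule: 3 to 6 characters, consisting of English letters and digits.

Examples:

| No | Abbreviation | Description |
|----|--------------|-------------|
| 1 | SAL | Sales segment |
| 2 | FMS | Finance segment |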

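5. Data Domain Abbreviations

Naming rule: 3 to 6 characters, consisting of English letters and digits.

Examples:

| No | Abbreviation | Description |
|----|--------------|-------------|
| 1 | CUST | Customer domain |
| 2 | TRAD | Trade domain |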

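6. Business Process Abbreviations

Naming rule: 3 to 6 characters, consisting of English letters and digits.

Examples:

| No | Abbreviation | Description |
|----|--------------|-------------|
| 1 | ORD | Order placement |
| 2 | PAY | Payment |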

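7. Table Names

General rules:

  1. Incremental/full flag: i = incremental, f = full
  2. Refresh period flag: ss = real time, mm = minute, hh = hour, d = day, w = week, m = month, q = quarter, y = year
  3. Statistical period range abbreviation: nd = last n days, td = history to date, sd = year start to date, cw = calendar week

| No | Layer | Naming pattern | Example |
|----|-------|----------------|---------|
| 1 | STG | STG_{source database abbreviation}_{source table name}_{refresh period flag}{incremental/full flag} | stg_ord_t_brand_di |
| 2 | ODS | ODS_{source database abbreviation}_{source table name}_{refresh period flag}{incremental/full flag} | ods_ord_t_brand_df |
| 3 | DIM | DIM_{business segment or pub}_{custom table name}_{refresh period flag}{incremental/full flag} | dim_mes_factory_df |
| 4 | DWD | DWD_{business segment}_{data domain}_{business process}_{custom table name}_{refresh period flag}{incremental/full flag} | dwd_sal_trad_ord_pos_order_df |
| 5 | DWS | DWS_{business segment}_{data domain}_{statistics date}_{custom table name}_{statistical period range}{refresh period flag} | dws_sal_trad_confirm_date_shop_retn_order_1d |
| 6 | ADS | ADS_{business segment}_{data domain}_{custom table name}_{statistical period range}{refresh period flag} | ads_sal_trad_c_order_analysis_1d |
| 7 | TMP | TMP_{table name} | tmp_dim_mes_factory_df |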
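
To show how the naming patterns compose, here is a small sketch that builds ODS and DWD table names from their parts. The helper names (`load_suffix`, `ods_table_name`, `dwd_table_name`) are hypothetical, and the sketch assumes names are always lower-cased, as in the examples.

```python
REFRESH_FLAGS = {"ss", "mm", "hh", "d", "w", "m", "q", "y"}
LOAD_FLAGS = {"i", "f"}


def load_suffix(refresh: str, load: str) -> str:
    """Combine the refresh period flag and the incremental/full flag."""
    if refresh not in REFRESH_FLAGS:
        raise ValueError(f"unknown refresh period flag: {refresh!r}")
    if load not in LOAD_FLAGS:
        raise ValueError(f"unknown incremental/full flag: {load!r}")
    return refresh + load


def ods_table_name(source_db: str, source_table: str, refresh: str, load: str) -> str:
    """ODS_{source database}_{source table}_{refresh flag}{load flag}."""
    parts = ["ods", source_db, source_table, load_suffix(refresh, load)]
    return "_".join(parts).lower()


def dwd_table_name(segment: str, domain: str, process: str, table: str,
                   refresh: str, load: str) -> str:
    """DWD_{segment}_{domain}_{process}_{table}_{refresh flag}{load flag}."""
    parts = ["dwd", segment, domain, process, table, load_suffix(refresh, load)]
    return "_".join(parts).lower()
```

For instance, `dwd_table_name("SAL", "TRAD", "ORD", "pos_order", "d", "f")` reproduces the DWD example name from the table.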

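II. Column Standards

1. Hive Column Types

| No | Type | Column type | Precision and scale |
|----|------|-------------|---------------------|
| 1 | String | string | |
| 2 | Long integer | bigint | |
| 3 | Small integer | tinyint | |
| 4 | Date | string | |
| 5 | Decimal number | decimal | (18,4), (18,8) |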

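2. ClickHouse (CK) Column Types

| No | Type | Column type | Precision and scale |
|----|------|-------------|---------------------|
| 1 | String | String | |
| 2 | Long integer | Int64 | |
| 3 | Small integer | Int8 | |
| 4 | Date (year, month, day) | Date | |
| 5 | Date and time (year, month, day, hour, minute, second) | DateTime | |
| 6 | Decimal number | Decimal | (18,4), (18,8) |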

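3. Column Default Values

| No | Value kind | Default value |
|----|------------|---------------|
| 1 | Metric value | 0 |
| 2 | Dimension value | -99 |
| 3 | Time | 1970-01-01 00:00:00 |
| 4 | Date | 1970-01-01 |
| 5 | String | None |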
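
The default values above can be applied in a load step that replaces missing values by kind. This is a sketch under the assumption that "missing" means a Python `None`; the `DEFAULTS` mapping keys and the `fill_defaults` helper are hypothetical names chosen for illustration.

```python
# Defaults taken from the table above; the kind labels are illustrative.
DEFAULTS = {
    "metric": 0,
    "dimension": -99,
    "time": "1970-01-01 00:00:00",
    "date": "1970-01-01",
    "string": "None",
}


def fill_defaults(row: dict, kinds: dict) -> dict:
    """Return a copy of row where None values get the default for their kind."""
    return {
        column: DEFAULTS[kinds[column]] if value is None else value
        for column, value in row.items()
    }
```

Note that the string default is the literal text `None`, not a null, which matches the default record inserted into zipper dimension tables later in this document.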

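4. Audit Columns

| Layer | Column | Comment | Type |
|-------|--------|---------|------|
| STG | source_db | Source database name | string |
| STG | imported_time | Program import time | string |
| STG | inserted_time | Time of first insert into the target | string |
| STG | updated_time | Time of last update in the target | string |
| ODS | source_db | Source database name | string |
| ODS | is_deleted | Whether (physically) deleted | tinyint |
| ODS | inserted_time | Time of first insert into the target | string |
| ODS | updated_time | Time of last update in the target | string |
| DIM | is_deleted | Whether (physically) deleted | tinyint |
| DIM | is_current (zipper tables) | Whether this is the current version | tinyint |
| DIM | start_date (zipper tables) | Date the version takes effect | string |
| DIM | end_date (zipper tables) | End date | string |
| DIM | inserted_time | Time of first insert into the target | string |
| DIM | updated_time | Time of last update in the target | string |
| DWD | is_deleted | Whether (physically) deleted | tinyint |
| DWD | inserted_time | Time of first insert into the target | string |
| DWD | updated_time | Time of last update in the target | string |
| DWS | inserted_time | Time of first insert into the target | string |
| DWS | updated_time | Time of last update in the target | string |
| ADS | inserted_time | Time of first insert into the target | string |
| ADS | updated_time | Time of last update in the target | string |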
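
A review script could use the audit column list to flag tables that are missing required columns. The sketch below is one possible shape; `AUDIT_COLUMNS` and `missing_audit_columns` are hypothetical names, and the zipper-only columns of the DIM layer are deliberately left out because they apply only to some tables.

```python
# Required audit columns per layer, as listed in the table above
# (zipper-only DIM columns omitted because they are conditional).
AUDIT_COLUMNS = {
    "stg": ["source_db", "imported_time", "inserted_time", "updated_time"],
    "ods": ["source_db", "is_deleted", "inserted_time", "updated_time"],
    "dim": ["is_deleted", "inserted_time", "updated_time"],
    "dwd": ["is_deleted", "inserted_time", "updated_time"],
    "dws": ["inserted_time", "updated_time"],
    "ads": ["inserted_time", "updated_time"],
}


def missing_audit_columns(layer: str, columns: list) -> list:
    """Return the required audit columns that the table does not define."""
    present = set(columns)
    return [name for name in AUDIT_COLUMNS[layer.lower()] if name not in present]
```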

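III. Metric Standards

Metrics fall into three types:

Atomic metric: a measure in a fact table. Atomic metric = business process + measure.

Derived metric: a measure processed to meet a business requirement. Derived metric = modifier + atomic metric + statistical period range + refresh period flag.

Composite metric: a measure computed from one or more derived metrics with a mathematical formula. Composite metric = derived metrics + formula.

Naming rule: {modifier}_{metric name}_{statistical period range}{refresh period flag}_{statistics time sequence letter}{mutability flag}

Statistics time sequence letter: distinguishes different statistics times, such as appointment time and order time.

Value range: a-z

Mutability flag: indicates, based on the statistics time, whether the fact is already complete or still in progress.

i = immutable; v = mutable

Example: pay_sale_amt_1d_ai

Meaning: the amount of orders paid in the last 1 day, counted by order payment time.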
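
The metric naming rule can be checked by parsing the tail of a name. The sketch below is an illustration only: it treats the period part as a single token such as `1d`, `td`, `sd`, or `cw` (an assumption, since the rule does not say how the period range and refresh flag are delimited), and it does not try to separate the modifier from the metric name. `parse_metric_name` is a hypothetical helper.

```python
import re

# Tail of a metric name: _{period}_{time sequence letter}{mutability flag}
METRIC_NAME = re.compile(
    r"^(?P<body>[a-z0-9_]+)_(?P<period>\d+[a-z]+|td|sd|cw)_(?P<seq>[a-z])(?P<flag>[iv])$"
)


def parse_metric_name(name: str) -> dict:
    """Split a metric name into body, period, time sequence, and mutability."""
    match = METRIC_NAME.match(name)
    if match is None:
        raise ValueError(f"not a valid metric name: {name!r}")
    return {
        "body": match.group("body"),        # modifier + metric name
        "period": match.group("period"),    # e.g. "1d"
        "time_seq": match.group("seq"),     # which statistics time is used
        "mutable": match.group("flag") == "v",
    }
```

Applied to `pay_sale_amt_1d_ai`, this yields the body `pay_sale_amt`, the period `1d`, the time sequence letter `a`, and an immutable fact.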

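IV. Modeling Standards

General rules:

  1. Every table and every column must have a comment.
  2. Column types must follow the standard data types.
  3. Key types:
        Primary key (PK): the table's primary key
        Foreign key (FK): the table's foreign keys
        Business key (BUSK): determines the data grain from a business perspective
        Change timestamp (CDCK): the column used for incremental updates
        Audit key (AUK): the table's audit columns
        Partition key (PTK): the table's partition column
  4. Every column except the audit and partition columns must map to a source table column.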

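1. STG Layer Modeling Standards

  1. Table storage format: textfile
  2. Column order matches the source database
  3. Add the partition column dt to the table

2. ODS Layer Modeling Standards

  1. Table storage format: orc
  2. Compression: snappy
  3. Write modes: truncate, merge
  4. Column order matches the source database
  5. Add the partition column dt to the table

3. DIM Layer Modeling Standards

  1. Table storage format: orc
  2. Compression: snappy
  3. Write modes: truncate, linked
  4. Order columns by group: primary key, foreign key, dictionary, attribute, metric, audit, partition
  5. The foreign key and dictionary groups store the related codes and names redundantly

4. DWD Layer Modeling Standards

  1. Table storage format: orc
  2. Compression: snappy
  3. Write mode: merge
  4. Order columns by group: primary key, foreign key, dictionary, attribute, metric, audit, partition
  5. The foreign key group stores only IDs to keep the data grain consistent; note the associated dimension table for each
  6. The dictionary group stores the related codes and names redundantly

5. DWS Layer Modeling Standards

  1. Table storage format: orc
  2. Compression: snappy
  3. Order columns by group: primary key, foreign key, dictionary, attribute, metric, audit, partition
  4. The foreign key group stores only IDs to keep the data grain consistent; note the associated dimension table for each
  5. The dictionary group stores the related codes and names redundantly
  6. Add the partition column dt to the table

6. ADS Layer Modeling Standards

  1. Table storage format: orc
  2. Compression: snappy
  3. Order columns according to business requirements
  4. The dictionary group stores the related codes and names redundantly
  5. Add the partition column dt to the table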
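
The column ordering rule used by the DIM, DWD, and DWS layers (primary key, foreign key, dictionary, attribute, metric, audit, partition) can be expressed as a stable sort. This is a minimal sketch; the group labels and the `order_columns` helper are hypothetical, and each column is assumed to be tagged with its group beforehand.

```python
# Group order from the modeling rules; the short labels are illustrative.
GROUP_ORDER = ["pk", "fk", "dict", "attr", "metric", "audit", "partition"]


def order_columns(columns: list) -> list:
    """Sort (name, group) pairs by group; order within a group is preserved."""
    rank = {group: index for index, group in enumerate(GROUP_ORDER)}
    return sorted(columns, key=lambda column: rank[column[1]])
```

Because Python's `sorted` is stable, columns within the same group keep the order in which the modeler listed them.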

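V. Table Creation Standards

General rules:

  1. Prefix the table name with the database name.
  2. Wrap database, table, and column names in backticks.
  3. Write database keywords in uppercase.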
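
One way to keep DDL consistent with these three rules is to generate it rather than write it by hand. The sketch below is illustrative only: `render_create_table` is a hypothetical helper, it covers just the column list and table comment, and it does not escape quotes inside comments.

```python
def render_create_table(db: str, table: str, columns: list, table_comment: str) -> str:
    """Render a CREATE TABLE statement from (name, type, comment) tuples.

    Identifiers are backticked, the table is prefixed with its database,
    and keywords are uppercase. Comments must not contain single quotes.
    """
    lines = []
    for index, (name, column_type, comment) in enumerate(columns):
        separator = " " if index == 0 else ","
        lines.append(f"   {separator}`{name}` {column_type} COMMENT '{comment}'")
    body = "\n".join(lines)
    return (
        f"CREATE TABLE IF NOT EXISTS `{db}`.`{table}`(\n"
        f"{body}\n"
        f") COMMENT '{table_comment}';"
    )
```

Storage format, partitioning, and table properties differ per layer, so they are left to the layer-specific examples.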

1. STG Layer Table Creation Standards

Example:

DROP TABLE IF EXISTS `stg`.`stg_ord_t_brand_df`;
CREATE TABLE IF NOT EXISTS `stg`.`stg_ord_t_brand_df`(
    `id`                           bigint           COMMENT 'primary key'
   ,`brand_china_name`             string           COMMENT 'brand name (Chinese)'
   ,`brand_english_name`           string           COMMENT 'brand name (English)'
   ,`brand_desc`                   string           COMMENT 'brand description'
   ,`band_logo`                    string           COMMENT 'brand logo'
   ,`status`                       tinyint          COMMENT '1=valid, 0=invalid'
   ,`create_time`                  string           COMMENT 'creation time'
   ,`update_time`                  string           COMMENT 'modification time'
   ,`create_id`                    bigint           COMMENT 'creator ID'
   ,`source_db`                    string           COMMENT 'source database name'
   ,`imported_time`                string           COMMENT 'program import time'
   ,`inserted_time`                string           COMMENT 'time of first insert into the target'
   ,`updated_time`                 string           COMMENT 'time of last update in the target'
) COMMENT 'brand table'
PARTITIONED BY (`dt` string COMMENT 'partition') -- daily partition: yyyy-MM-dd
STORED AS textfile;

2. ODS Layer Table Creation Standards

Example:

DROP TABLE IF EXISTS `ods`.`ods_ord_t_brand_df`;
CREATE TABLE IF NOT EXISTS `ods`.`ods_ord_t_brand_df`(
    `id`                           bigint           COMMENT 'primary key'
   ,`brand_china_name`             string           COMMENT 'brand name (Chinese)'
   ,`brand_english_name`           string           COMMENT 'brand name (English)'
   ,`brand_desc`                   string           COMMENT 'brand description'
   ,`band_logo`                    string           COMMENT 'brand logo'
   ,`status`                       tinyint          COMMENT '1=valid, 0=invalid'
   ,`create_time`                  string           COMMENT 'creation time'
   ,`update_time`                  string           COMMENT 'modification time'
   ,`create_id`                    bigint           COMMENT 'creator ID'
   ,`source_db`                    string           COMMENT 'source database name'
   ,`is_deleted`                   tinyint          COMMENT 'whether (physically) deleted'
   ,`inserted_time`                string           COMMENT 'time of first insert into the target'
   ,`updated_time`                 string           COMMENT 'time of last update in the target'
) COMMENT 'brand table'
PARTITIONED BY (`dt` string COMMENT 'partition') -- daily partition: yyyy-MM-dd
STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY"); -- ORC reads the codec from orc.compress

3. DIM Layer Table Creation Standards

  1. For a zipper table, insert one default record.

Example:

DROP TABLE IF EXISTS `dim`.`dim_sal_commodity_df`;
CREATE TABLE IF NOT EXISTS `dim`.`dim_sal_commodity_df`(
    `commodity_id`                 bigint           COMMENT 'primary key'
   ,`brand_id`                     bigint           COMMENT 'brand ID'
   ,`brand_cn_name`                string           COMMENT 'brand name (Chinese)'
   ,`commodity_no`                 string           COMMENT 'product code'
   ,`commodity_name`               string           COMMENT 'product name'
   ,`commodity_desc`               string           COMMENT 'product description'
   ,`commodity_color`              string           COMMENT 'color'
   ,`commodity_length`             decimal(18,4)    COMMENT 'length'
   ,`commodity_width`              decimal(18,4)    COMMENT 'width'
   ,`commodity_height`             decimal(18,4)    COMMENT 'height'
   ,`commodity_weight`             decimal(18,4)    COMMENT 'weight'
   ,`retail_price`                 decimal(18,4)    COMMENT 'retail price'
   ,`cost_price`                   decimal(18,4)    COMMENT 'cost price'
   ,`launch_date`                  string           COMMENT 'launch date'
   ,`delisting_date`               string           COMMENT 'delisting date'
   ,`origin`                       string           COMMENT 'place of origin'
   ,`create_id`                    bigint           COMMENT 'creator ID'
   ,`update_id`                    bigint           COMMENT 'updater ID'
   ,`create_time`                  string           COMMENT 'creation time'
   ,`update_time`                  string           COMMENT 'update time'
   ,`is_deleted`                   tinyint          COMMENT 'whether (physically) deleted'
   ,`is_current`                   tinyint          COMMENT 'whether this is the current version'
   ,`start_date`                   string           COMMENT 'date the version takes effect'
   ,`end_date`                     string           COMMENT 'end date'
   ,`inserted_time`                string           COMMENT 'time of first insert into the target'
   ,`updated_time`                 string           COMMENT 'time of last update in the target'
) COMMENT 'commodity dimension table'
STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY"); -- ORC reads the codec from orc.compress

INSERT INTO  `dim`.`dim_sal_commodity_df` (
     `commodity_id`
    ,`brand_id`
    ,`brand_cn_name`
    ,`commodity_no`
    ,`commodity_name`
    ,`commodity_desc`
    ,`commodity_color`
    ,`commodity_length`
    ,`commodity_width`
    ,`commodity_height`
    ,`commodity_weight`
    ,`retail_price`
    ,`cost_price`
    ,`launch_date`
    ,`delisting_date`
    ,`origin`
    ,`create_id`
    ,`update_id`
    ,`create_time`
    ,`update_time` 
    ,`is_deleted`           
    ,`is_current`           
    ,`start_date`           
    ,`end_date`             
    ,`inserted_time`        
    ,`updated_time`         
) VALUES 
(
     -99
    ,-99
    ,'None'
    ,'None'
    ,'None'
    ,'None'
    ,'None'
    ,0
    ,0
    ,0
    ,0
    ,0
    ,0
    ,'1970-01-01'
    ,'1970-01-01'
    ,'None'
    ,-99
    ,-99
    ,'1970-01-01 00:00:00'
    ,'1970-01-01 00:00:00'
    ,0
    ,1
    ,'1970-01-01'
    ,'9999-12-31'
    ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')
    ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')
);

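4. DWD Layer Table Creation Standards

Example:

DROP TABLE IF EXISTS `dwd`.`dwd_sal_trd_ord_order_df`;
CREATE TABLE IF NOT EXISTS `dwd`.`dwd_sal_trd_ord_order_df`(
    `order_id`                     bigint           COMMENT 'primary key ID'
   ,`commodity_id`                 bigint           COMMENT 'commodity ID'
   ,`user_id`                      bigint           COMMENT 'consumer (C-end) user ID'
   ,`order_status_code`            bigint           COMMENT 'order status (0=cancelled, 1=awaiting payment, 2=awaiting delivery, 3=in delivery, 4=delivered, 5=delivery failed)'
   ,`order_status_name`            string           COMMENT 'order status (0=cancelled, 1=awaiting payment, 2=awaiting delivery, 3=in delivery, 4=delivered, 5=delivery failed)'
   ,`pay_type_code`                bigint           COMMENT 'payment method: 1=Alipay, 2=WeChat, 3=card (online), 4=online payment/points payment, 5=UnionPay QuickPass'
   ,`pay_type_name`                string           COMMENT 'payment method: 1=Alipay, 2=WeChat, 3=card (online), 4=online payment/points payment, 5=UnionPay QuickPass'
   ,`pay_status_code`              bigint           COMMENT 'payment status (1=awaiting payment, 2=paid)'
   ,`pay_status_name`              string           COMMENT 'payment status (1=awaiting payment, 2=paid)'
   ,`total_amt`                    decimal(18,4)    COMMENT 'order total amount (sum after apportioning to line items)'
   ,`total_qty`                    decimal(18,4)    COMMENT 'total item quantity'
   ,`create_id`                    bigint           COMMENT 'creator'
   ,`create_time`                  string           COMMENT 'creation time'
   ,`update_id`                    bigint           COMMENT 'updater'
   ,`update_time`                  string           COMMENT 'update time'
   ,`is_deleted`                   tinyint          COMMENT 'whether (physically) deleted'
   ,`inserted_time`                string           COMMENT 'time of first insert into the target'
   ,`updated_time`                 string           COMMENT 'time of last update in the target'
) COMMENT 'order table'
STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY"); -- ORC reads the codec from orc.compress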

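5. DWS Layer Table Creation Standards

Example:

DROP TABLE IF EXISTS `px_dws`.`dws_sal_trd_complete_date_xj_retn_order_1d`;
CREATE TABLE IF NOT EXISTS `px_dws`.`dws_sal_trd_complete_date_xj_retn_order_1d`(
    `shop_id`                         bigint           COMMENT 'store ID'
   ,`member_id`                       bigint           COMMENT 'member ID'
   ,`commodity_id`                    bigint           COMMENT 'commodity ID'
   ,`source_type_code`                tinyint          COMMENT 'order source (1=online, 2=offline)'
   ,`source_type_name`                string           COMMENT 'order source: 1=APP, 2=POS, 3=mini program, 4=Ele.me, 5=fresh food APP, 6=group buying, 7=JD Daojia'
   ,`order_id`                        bigint           COMMENT 'original order ID'
   ,`return_order_id`                 bigint           COMMENT 'return order ID'
   ,`return_order_no`                 string           COMMENT 'return order number'
   ,`delivery_retn_amt_1d_ai`         decimal(18,4)    COMMENT 'total return amount for successfully delivered orders in the last 1 day'
   ,`delivery_retn_cmd_cnt_1d_ai`     decimal(18,4)    COMMENT 'quantity of returned items for successfully delivered orders in the last 1 day'
   ,`origin_sale_amt_1d_ai`           decimal(18,4)    COMMENT 'amount at original price in the last 1 day'
   ,`trade_amt_1d_ai`                 decimal(18,4)    COMMENT 'total amount actually received in the last 1 day'
   ,`real_delivery_amt_1d_ai`         decimal(18,4)    COMMENT 'total amount actually shipped in the last 1 day'
   ,`inserted_time`                   string           COMMENT 'time of first insert into the target'
   ,`updated_time`                    string           COMMENT 'time of last update in the target'
) COMMENT 'summary of online return orders in the last 1 day by return completion date'
PARTITIONED BY (`dt` string COMMENT 'partition') -- daily partition: yyyy-MM-dd
STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY"); -- ORC reads the codec from orc.compress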

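6. ADS Layer Table Creation Standards

Example:

DROP TABLE IF EXISTS `ads`.`ads_sal_trd_pay_date_order_sum_1d`;
CREATE TABLE IF NOT EXISTS `ads`.`ads_sal_trd_pay_date_order_sum_1d`(
    `report_date`                  string           COMMENT 'statistics date'
   ,`commodity_id`                 bigint           COMMENT 'commodity ID'
   ,`user_id`                      bigint           COMMENT 'consumer (C-end) user ID'
   ,`order_status_code`            tinyint          COMMENT 'order status (0=cancelled, 1=awaiting payment, 2=awaiting delivery, 3=in delivery, 4=delivered, 5=delivery failed)'
   ,`order_status_name`            string           COMMENT 'order status (0=cancelled, 1=awaiting payment, 2=awaiting delivery, 3=in delivery, 4=delivered, 5=delivery failed)'
   ,`pay_type_code`                tinyint          COMMENT 'payment method: 1=Alipay, 2=WeChat, 3=card (online), 4=online payment/points payment, 5=UnionPay QuickPass'
   ,`pay_type_name`                string           COMMENT 'payment method: 1=Alipay, 2=WeChat, 3=card (online), 4=online payment/points payment, 5=UnionPay QuickPass'
   ,`pay_status_code`              tinyint          COMMENT 'payment status (1=awaiting payment, 2=paid)'
   ,`pay_status_name`              string           COMMENT 'payment status (1=awaiting payment, 2=paid)'
   ,`total_amt_1d_ai`              decimal(18,4)    COMMENT 'order total amount (sum after apportioning to line items)'
   ,`total_qty_1d_ai`              decimal(18,4)    COMMENT 'total item quantity'
   ,`inserted_time`                string           COMMENT 'time of first insert into the target'
   ,`updated_time`                 string           COMMENT 'time of last update in the target'
) COMMENT 'order summary for the last 1 day by payment date'
PARTITIONED BY (`dt` string COMMENT 'partition') -- daily partition: yyyy-MM-dd
STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY"); -- ORC reads the codec from orc.compress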

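7. DAS Layer Table Creation Standards

  1. Table names match the ADS layer table names.
  2. Column order matches the ADS layer.
  3. Set the sort key according to the business key.
  4. Set the data lifecycle (TTL) according to business requirements.

Example:

DROP TABLE IF EXISTS `das`.`ads_sal_trd_pay_date_order_sum_1d`;
CREATE TABLE IF NOT EXISTS  `das`.`ads_sal_trd_pay_date_order_sum_1d`(
    `report_date`                  Date             COMMENT 'statistics date'
   ,`commodity_id`                 Int64            COMMENT 'commodity ID'
   ,`user_id`                      Int64            COMMENT 'consumer (C-end) user ID'
   ,`order_status_code`            Int8             COMMENT 'order status (0=cancelled, 1=awaiting payment, 2=awaiting delivery, 3=in delivery, 4=delivered, 5=delivery failed)'
   ,`order_status_name`            String           COMMENT 'order status (0=cancelled, 1=awaiting payment, 2=awaiting delivery, 3=in delivery, 4=delivered, 5=delivery failed)'
   ,`pay_type_code`                Int8             COMMENT 'payment method: 1=Alipay, 2=WeChat, 3=card (online), 4=online payment/points payment, 5=UnionPay QuickPass'
   ,`pay_type_name`                String           COMMENT 'payment method: 1=Alipay, 2=WeChat, 3=card (online), 4=online payment/points payment, 5=UnionPay QuickPass'
   ,`pay_status_code`              Int8             COMMENT 'payment status (1=awaiting payment, 2=paid)'
   ,`pay_status_name`              String           COMMENT 'payment status (1=awaiting payment, 2=paid)'
   ,`total_amt_1d_ai`              Decimal(18,4)    COMMENT 'order total amount (sum after apportioning to line items)'
   ,`total_qty_1d_ai`              Decimal(18,4)    COMMENT 'total item quantity'
   ,`inserted_time`                DateTime         COMMENT 'time of first insert into the target'
   ,`updated_time`                 DateTime         COMMENT 'time of last update in the target'
)
ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(`report_date`)
ORDER BY (`report_date`,`commodity_id`,`user_id`)
TTL `report_date` + INTERVAL 765 DAY;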

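VI. Code Standards

General rules:

  1. Code must be safe to re-run without changing the result.
  2. Each layer may read only from tables in the layer directly beneath it; queries must not skip layers.
  3. Prefix the table name with the database name.
  4. Wrap database, table, and column names in backticks.
  5. Write database keywords in uppercase.
  6. Pass partition names in as parameters.
  7. Specify the queue name.
  8. Include these comments: author, creation date, description, target table, source table, and change log (change date, changed by, change description).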
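
Rule 6 (partition names as parameters) is usually handled by the scheduler, which substitutes placeholders before the SQL is submitted. As a rough illustration of that substitution step, the sketch below uses Python's standard `string.Template`; the `render_sql` helper and the `PARTITION` parameter name are assumptions for this example, not the scheduler's actual mechanism.

```python
from string import Template


def render_sql(template_text: str, **params: str) -> str:
    """Substitute ${NAME} placeholders; raises KeyError if one is missing."""
    return Template(template_text).substitute(**params)


# The same script can be re-run for any partition by changing one parameter.
sql = render_sql(
    "SELECT COUNT(*) FROM `stg`.`stg_ord_t_brand_df` WHERE `dt`='${PARTITION}';",
    PARTITION="2024-06-11",
)
```

Failing loudly on a missing parameter is a deliberate choice here: a silently empty partition value would make a re-run write to the wrong place.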

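1. STG Layer Code Standards

  1. Pass the source and target databases in as parameters.
  2. The STG layer uses two synchronization modes:

        Incremental sync: for tables that are large and have an update timestamp
        Full sync: for tables that are small or lack an update timestamp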
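
The choice between the two modes can be written down as a small decision function. The sketch below is only an illustration: the source does not define what counts as a "large" table, so the `large_table_rows` threshold of one million rows is an assumed placeholder, and `choose_sync_mode` is a hypothetical name.

```python
def choose_sync_mode(row_count: int, has_update_timestamp: bool,
                     large_table_rows: int = 1_000_000) -> str:
    """Pick the STG sync mode for a source table.

    Incremental sync needs both a large table and an update timestamp;
    everything else falls back to a full sync.
    """
    if has_update_timestamp and row_count >= large_table_rows:
        return "incremental"
    return "full"
```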

Example:

Incremental script:

{
    "job": {
     "setting": {
            "speed": {
                "channel":4
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                     "username": "$USER_NAME", 
                     "password": "$PASSWORD", 
                        "connection": [
                            {
                                "jdbcUrl": ["$JDBCURL"], 
                                "querySql": [
                                    "
                                    SELECT
                                         `id`
                                        ,`commodity_code`
                                        ,`commodity_name`
                                        ,`commodity_describe`
                                        ,`brand_id`
                                        ,`retail_price`
                                        ,`cost_price`
                                        ,`launch_date`
                                        ,`delisting_date`
                                        ,`commodity_color`
                                        ,`create_time`
                                        ,`update_time`
                                        ,`origin`
                                        ,`length`
                                        ,`width`
                                        ,`height`
                                        ,`weight`
                                        ,`create_id`
                                        ,`update_id`
                                        ,`status`
                                        ,'order'
                                        ,now()
                                        ,now()
                                        ,now()
                                    FROM t_commodity
                                    WHERE update_time BETWEEN
                                        str_to_date( '$START_TIME', '%Y-%m-%d %H:%i:%s' ) AND str_to_date( '$END_TIME', '%Y-%m-%d %H:%i:%s' )
                                     "
                                ]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                                    {
                                            "name": "id",
                                            "type": "bigint"
                                    },
                                    {
                                            "name": "commodity_code",
                                            "type": "string"
                                    },
                                    {
                                            "name": "commodity_name",
                                            "type": "string"
                                    },
                                    {
                                            "name": "commodity_describe",
                                            "type": "string"
                                    },
                                    {
                                            "name": "brand_id",
                                            "type": "bigint"
                                    },
                                    {
                                            "name": "retail_price",
                                            "type": "double"
                                    },
                                    {
                                            "name": "cost_price",
                                            "type": "double"
                                    },
                                    {
                                            "name": "launch_date",
                                            "type": "string"
                                    },
                                    {
                                            "name": "delisting_date",
                                            "type": "string"
                                    },
                                    {
                                            "name": "commodity_color",
                                            "type": "string"
                                    },
                                    {
                                            "name": "create_time",
                                            "type": "string"
                                    },
                                    {
                                            "name": "update_time",
                                            "type": "string"
                                    },
                                    {
                                            "name": "origin",
                                            "type": "string"
                                    },
                                    {
                                            "name": "length",
                                            "type": "double"
                                    },
                                    {
                                            "name": "width",
                                            "type": "double"
                                    },
                                    {
                                            "name": "height",
                                            "type": "double"
                                    },
                                    {
                                            "name": "weight",
                                            "type": "double"
                                    },
                                    {
                                            "name": "create_id",
                                            "type": "bigint"
                                    },
                                    {
                                            "name": "update_id",
                                            "type": "bigint"
                                    },
                                    {
                                            "name": "status",
                                            "type": "tinyint"
                                    },
                                    {
                                            "name": "source_db",
                                            "type": "string"
                                    },
                                    {
                                            "name": "imported_time",
                                            "type": "string"
                                    },
                                    {
                                            "name": "inserted_time",
                                            "type": "string"
                                    },
                                    {
                                            "name": "updated_time",
                                            "type": "string"
                                    }
                        	], 
                        "hadoopConfig":{
                            "dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                            "dfs.ha.namenodes.mycluster": "nn1,nn2",
                            "dfs.namenode.rpc-address.mycluster.nn1": "node101:8020",
                            "dfs.namenode.rpc-address.mycluster.nn2": "node102:8020",
                            "dfs.nameservices": "mycluster"
                           },                        	
                        "compress": "", 
                        "defaultFS": "$HDFS_URL", 
                        "fieldDelimiter": "\u0001", 
                        "fileName": "$TRGT_TABLE_NAME", 
                        "fileType": "text", 
                        "path": "${HIVE_DB_DIR}stg.db/$TRGT_TABLE_NAME/dt=$PARTITION", 
                        "writeMode": "truncate"
                    }
                }
            }
        ]
    }
}
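
The incremental script expects `$START_TIME` and `$END_TIME` in `yyyy-MM-dd HH:mm:ss` form. A wrapper could derive both from the partition date, as in the sketch below. It assumes that one partition covers one calendar day and that the window is inclusive at both ends (matching the `BETWEEN` in the query); `incremental_window` is a hypothetical helper.

```python
from datetime import date, datetime, time
from typing import Tuple


def incremental_window(partition: str) -> Tuple[str, str]:
    """Return (START_TIME, END_TIME) covering the partition's calendar day."""
    day = date.fromisoformat(partition)            # expects yyyy-MM-dd
    fmt = "%Y-%m-%d %H:%M:%S"
    start = datetime.combine(day, time.min)        # 00:00:00
    end = datetime.combine(day, time(23, 59, 59))  # last whole second of the day
    return start.strftime(fmt), end.strftime(fmt)
```

If the source column stores sub-second precision, a half-open window (start of day up to, but not including, the next day) would be safer, but that would also require changing the `BETWEEN` predicate.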

Full-load script:

{
    "job": {
     "setting": {
            "speed": {
                "channel":4
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                     "username": "$USER_NAME", 
                     "password": "$PASSWORD", 
                     "splitPk": "id",
                        "column": [
                           "id" 
                          ,"brand_china_name" 
                          ,"brand_english_name" 
                          ,"brand_desc" 
                          ,"band_logo" 
                          ,"status" 
                          ,"create_time" 
                          ,"update_time" 
                          ,"create_id"        
                          ,"'order'"
                          ,"now()"
                          ,"now()"
                          ,"now()"
                        	], 
                        "connection": [
                            {
                                "jdbcUrl": ["$JDBCURL"], 
                                "table": ["t_brand"]
                            }
                        ]
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                                    {
                                            "name": "id",
                                            "type": "bigint"
                                    },
                                    {
                                            "name": "brand_china_name",
                                            "type": "string"
                                    },
                                    {
                                            "name": "brand_english_name",
                                            "type": "string"
                                    },
                                    {
                                            "name": "brand_desc",
                                            "type": "string"
                                    },
                                    {
                                            "name": "band_logo",
                                            "type": "string"
                                    },
                                    {
                                            "name": "status",
                                            "type": "tinyint"
                                    },
                                    {
                                            "name": "create_time",
                                            "type": "string"
                                    },
                                    {
                                            "name": "update_time",
                                            "type": "string"
                                    },
                                    {
                                            "name": "create_id",
                                            "type": "bigint"
                                    },
                                    {
                                            "name": "source_db",
                                            "type": "string"
                                    },
                                    {
                                            "name": "imported_time",
                                            "type": "string"
                                    },
                                    {
                                            "name": "inserted_time",
                                            "type": "string"
                                    },
                                    {
                                            "name": "updated_time",
                                            "type": "string"
                                    }
                        	], 
                        "hadoopConfig":{
                            "dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                            "dfs.ha.namenodes.mycluster": "nn1,nn2",
                            "dfs.namenode.rpc-address.mycluster.nn1": "node101:8020",
                            "dfs.namenode.rpc-address.mycluster.nn2": "node102:8020",
                            "dfs.nameservices": "mycluster"
                           },
                        "defaultFS": "$HDFS_URL", 
                        "fieldDelimiter": "\u0001", 
                        "fileName": "$TRGT_TABLE_NAME", 
                        "fileType": "text", 
                        "path": "${HIVE_DB_DIR}stg.db/$TRGT_TABLE_NAME/dt=$PARTITION", 
                        "writeMode": "truncate"
                    }
                }
            }
        ]
    }
}

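2. ODS Layer Code Standards

The ODS layer uses two synchronization modes:

  • Full overwrite: use when rows are deleted frequently at the source and only the current data is needed; merging in that case would accumulate a large amount of duplicate, useless data
  • Data merge: the default in all other cases

Example:

Full overwrite: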

-- ================================================
-- Author:        Administrator
-- Created:       2024-06-11
-- Description:   sync the ODS layer brand table
-- Target table:  ods_ord_t_brand_df
-- Source table:  stg_ord_t_brand_df
-- Change log:
-- Change date:         Changed by:     Change:
-- ================================================
set mapreduce.job.queuename=hive;
INSERT OVERWRITE TABLE `ods`.`ods_ord_t_brand_df` PARTITION (`dt`='${PARTITITON}')
SELECT 	 
		  t1.`id`                                                                                           AS `id` 
		 ,t1.`brand_china_name`                                                                             AS `brand_china_name` 
		 ,t1.`brand_english_name`                                                                           AS `brand_english_name` 
		 ,t1.`brand_desc`                                                                                   AS `brand_desc` 
		 ,t1.`band_logo`                                                                                    AS `band_logo` 
		 ,t1.`status`                                                                                       AS `status` 
		 ,t1.`create_time`                                                                                  AS `create_time` 
		 ,t1.`update_time`                                                                                  AS `update_time` 
		 ,t1.`create_id`                                                                                    AS `create_id`  
		 ,t1.`source_db`                                                                                    AS `source_db`
		 ,0                                                                                                 AS `is_deleted`
		 ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                             AS `inserted_time`
		 ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                             AS `updated_time`
FROM `stg`.`stg_ord_t_brand_df` t1 WHERE `dt`='${PARTITITON}';

Data merge:

-- ================================================
-- Author:        Administrator
-- Created:       2024-06-11
-- Description:   sync the ODS layer commodity table
-- Target table:  ods_ord_t_commodity_df
-- Source table:  stg_ord_t_commodity_di
-- Change log:
-- Change date:         Changed by:     Change:
-- ================================================
set mapreduce.job.queuename=hive;
INSERT OVERWRITE TABLE `ods`.`ods_ord_t_commodity_df` PARTITION (`dt`='${PARTITITON}')
SELECT 
		  COALESCE(t2.`id`,t1.`id`)                                                                         AS `id` 
		 ,COALESCE(t2.`commodity_code`,t1.`commodity_code`)                                                 AS `commodity_code` 
		 ,COALESCE(t2.`commodity_name`,t1.`commodity_name`)                                                 AS `commodity_name` 
		 ,COALESCE(t2.`commodity_describe`,t1.`commodity_describe`)                                         AS `commodity_describe` 
		 ,COALESCE(t2.`brand_id`,t1.`brand_id`)                                                             AS `brand_id` 
		 ,COALESCE(t2.`retail_price`,t1.`retail_price`)                                                     AS `retail_price` 
		 ,COALESCE(t2.`cost_price`,t1.`cost_price`)                                                         AS `cost_price` 
		 ,COALESCE(t2.`launch_date`,t1.`launch_date`)                                                       AS `launch_date` 
		 ,COALESCE(t2.`delisting_date`,t1.`delisting_date`)                                                 AS `delisting_date` 
		 ,COALESCE(t2.`commodity_color`,t1.`commodity_color`)                                               AS `commodity_color` 
		 ,COALESCE(t2.`create_time`,t1.`create_time`)                                                       AS `create_time` 
		 ,COALESCE(t2.`update_time`,t1.`update_time`)                                                       AS `update_time` 
		 ,COALESCE(t2.`origin`,t1.`origin`)                                                                 AS `origin` 
		 ,COALESCE(t2.`length`,t1.`length`)                                                                 AS `length` 
		 ,COALESCE(t2.`width`,t1.`width`)                                                                   AS `width` 
		 ,COALESCE(t2.`height`,t1.`height`)                                                                 AS `height` 
		 ,COALESCE(t2.`weight`,t1.`weight`)                                                                 AS `weight` 
		 ,COALESCE(t2.`create_id`,t1.`create_id`)                                                           AS `create_id` 
		 ,COALESCE(t2.`update_id`,t1.`update_id`)                                                           AS `update_id` 
		 ,COALESCE(t2.`status`,t1.`status`)                                                                 AS `status`
		 ,COALESCE(t2.`source_db`,t1.`source_db`)                                                           AS `source_db`         
		 ,COALESCE(t1.`is_deleted`,  0)                                                                     AS `is_deleted`
		 ,COALESCE(t1.`inserted_time`,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss'))                AS `inserted_time`
		 ,CASE  WHEN t2.`update_time` >t1.`update_time` 
		        THEN DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') 
		        ELSE COALESCE(t1.`updated_time`,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss'))  
		 END                                                                                                AS `updated_time`
FROM 
  (SELECT * FROM  `ods`.`ods_ord_t_commodity_df` WHERE `dt`='${PRE_PARTITITON}')   t1  
FULL JOIN (SELECT * FROM `stg`.`stg_ord_t_commodity_di` WHERE `dt`='${PARTITITON}') t2
ON t1.`id`=t2.`id`;
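
The merge statement above can be read as a keyed merge of yesterday's full snapshot with today's increment. The following is a minimal Python sketch of that logic, not the author's implementation: the function names `coalesce` and `merge_snapshot` and the sample rows are hypothetical, and timestamps are assumed to be `yyyy-MM-dd HH:mm:ss` strings so that string comparison matches time order.

```python
def coalesce(*values):
    """Return the first value that is not None, like SQL COALESCE."""
    return next((v for v in values if v is not None), None)


def merge_snapshot(previous, increment, now):
    """Merge the previous full snapshot with the new increment, keyed by id."""
    prev_by_id = {row["id"]: row for row in previous}
    inc_by_id = {row["id"]: row for row in increment}
    audit_cols = {"is_deleted", "inserted_time", "updated_time"}
    merged = []
    # Iterating over the union of keys plays the role of the FULL JOIN on id.
    for key in sorted(set(prev_by_id) | set(inc_by_id)):
        old = prev_by_id.get(key, {})
        new = inc_by_id.get(key, {})
        business_cols = (set(old) | set(new)) - audit_cols
        # Business columns: the incoming value wins when it is present.
        row = {col: coalesce(new.get(col), old.get(col)) for col in business_cols}
        row["is_deleted"] = coalesce(old.get("is_deleted"), 0)
        # inserted_time is kept from the first load of the row.
        row["inserted_time"] = coalesce(old.get("inserted_time"), now)
        newer = (
            new.get("update_time") is not None
            and old.get("update_time") is not None
            and new["update_time"] > old["update_time"]
        )
        row["updated_time"] = now if newer else coalesce(old.get("updated_time"), now)
        merged.append(row)
    return merged


# Hypothetical sample data.
previous = [
    {"id": 1, "commodity_name": "A", "update_time": "2024-06-10 08:00:00",
     "is_deleted": 0, "inserted_time": "2024-06-10 23:00:00",
     "updated_time": "2024-06-10 23:00:00"},
    {"id": 2, "commodity_name": "B", "update_time": "2024-06-09 08:00:00",
     "is_deleted": 0, "inserted_time": "2024-06-10 23:00:00",
     "updated_time": "2024-06-10 23:00:00"},
]
increment = [
    {"id": 1, "commodity_name": "A2", "update_time": "2024-06-11 08:00:00"},
    {"id": 3, "commodity_name": "C", "update_time": "2024-06-11 09:00:00"},
]
merged = merge_snapshot(previous, increment, now="2024-06-11 23:00:00")
```

Row 1 takes the new name and a fresh `updated_time`, row 2 is carried over unchanged, and row 3 is inserted with both audit timestamps set to the load time.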

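3.DIM Layer Code Standards

  1. Ensure that no field contains NULL; fill any NULL with the default value
  2. The DIM layer uses two synchronization methods:
  • Full overwrite: use when only the current data is needed and history is irrelevant
  • Zipper table: the usual choice, because it keeps every historical version of a row

Example:
Full overwrite: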

-- ================================================
-- Author:        Administrator
-- Created:       2024-06-11
-- Description:   Sync the DIM-layer brand dimension table
-- Target table:  dim_sal_brand_df
-- Source table:  ods_ord_t_brand_df
-- Change log:
-- Date:         Modified by:     Change:
-- ================================================
set mapreduce.job.queuename=hive;
INSERT OVERWRITE TABLE `dim`.`dim_sal_brand_df`
SELECT  
		  defaultv(t1.`id`,-99)                                                                             AS `brand_id` 
		 ,defaultv(t1.`brand_china_name`,'None')                                                            AS `brand_cn_name` 
		 ,defaultv(t1.`brand_english_name`,'None')                                                          AS `brand_en_name` 
		 ,defaultv(t1.`brand_desc`,'None')                                                                  AS `brand_desc` 
		 ,defaultv(t1.`band_logo`,'None')                                                                   AS `band_logo` 
		 ,defaultv(t1.`status`,-99)                                                                         AS `band_status` 
		 ,defaultv(t1.`create_time`,'1970-01-01 00:00:00')                                                  AS `create_time` 
		 ,defaultv(t1.`update_time`,'1970-01-01 00:00:00')                                                  AS `update_time` 
		 ,defaultv(t1.`create_id`,-99)                                                                      AS `create_id`      
		 ,0                                                                                                 AS `is_deleted`
		 ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                             AS `inserted_time`
		 ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                             AS `updated_time`
FROM  `ods`.`ods_ord_t_brand_df` t1
WHERE t1.`dt` = '${PARTITITON}';
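
The `defaultv` calls above are not a built-in Hive function, so they are presumably a custom UDF. The sketch below shows the behavior the examples seem to rely on, assuming `defaultv(value, default)` replaces only NULL and leaves every other value (including 0 and empty strings) untouched. The helper `fill_defaults` and the `BRAND_DEFAULTS` mapping are hypothetical names; the default values themselves are copied from the SQL.

```python
def defaultv(value, default):
    """Return `default` when `value` is None (SQL NULL), otherwise `value`."""
    return default if value is None else value


# Defaults as used in the SQL above: -99 for ids and codes,
# 'None' for text, '1970-01-01 00:00:00' for timestamps.
BRAND_DEFAULTS = {
    "id": -99,
    "brand_china_name": "None",
    "status": -99,
    "create_time": "1970-01-01 00:00:00",
}


def fill_defaults(row, column_defaults):
    """Apply the per-column default to every column listed in column_defaults."""
    return {col: defaultv(row.get(col), d) for col, d in column_defaults.items()}


# A source row with a NULL name and no create_time at all.
filled = fill_defaults({"id": 7, "brand_china_name": None, "status": 0}, BRAND_DEFAULTS)
```

Keeping the defaults in one mapping per table makes it easy to check that every column of a DIM table has a documented fallback.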

Zipper table:

-- ================================================
-- Author:        Administrator
-- Created:       2024-06-11
-- Description:   Sync the DIM-layer commodity dimension table
-- Target table:  dim_sal_commodity_df
-- Source table:  ods_ord_t_commodity_df,ods_ord_t_brand_df
-- Change log:
-- Date:         Modified by:     Change:
-- ================================================
set mapreduce.job.queuename=hive;
WITH `tmp_dim_sal_commodity_df` AS 
  (SELECT 
		  COALESCE(t2.`commodity_id`,t1.`commodity_id`)                                                     AS `commodity_id` 
		 ,COALESCE(t2.`brand_id`,t1.`brand_id`)                                                             AS `brand_id` 
		 ,COALESCE(t2.`brand_cn_name`,t1.`brand_cn_name`)                                                   AS `brand_cn_name` 
		 ,COALESCE(t2.`commodity_no`,t1.`commodity_no`)                                                     AS `commodity_no` 
		 ,COALESCE(t2.`commodity_name`,t1.`commodity_name`)                                                 AS `commodity_name` 
		 ,COALESCE(t2.`commodity_desc`,t1.`commodity_desc`)                                                 AS `commodity_desc` 
		 ,COALESCE(t2.`commodity_color`,t1.`commodity_color`)                                               AS `commodity_color` 
		 ,COALESCE(t2.`commodity_length`,t1.`commodity_length`)                                             AS `commodity_length` 
		 ,COALESCE(t2.`commodity_width`,t1.`commodity_width`)                                               AS `commodity_width` 
		 ,COALESCE(t2.`commodity_height`,t1.`commodity_height`)                                             AS `commodity_height` 
		 ,COALESCE(t2.`commodity_weight`,t1.`commodity_weight`)                                             AS `commodity_weight` 
		 ,COALESCE(t2.`retail_price`,t1.`retail_price`)                                                     AS `retail_price` 
		 ,COALESCE(t2.`cost_price`,t1.`cost_price`)                                                         AS `cost_price` 
		 ,COALESCE(t2.`launch_date`,t1.`launch_date`)                                                       AS `launch_date` 
		 ,COALESCE(t2.`delisting_date`,t1.`delisting_date`)                                                 AS `delisting_date` 
		 ,COALESCE(t2.`origin`,t1.`origin`)                                                                 AS `origin` 
		 ,COALESCE(t2.`create_id`,t1.`create_id`)                                                           AS `create_id` 
		 ,COALESCE(t2.`update_id`,t1.`update_id`)                                                           AS `update_id` 
		 ,COALESCE(t2.`create_time`,t1.`create_time`)                                                       AS `create_time` 
		 ,COALESCE(t2.`update_time`,t1.`update_time`)                                                       AS `update_time`  
		 ,COALESCE(t2.`is_deleted`,  t1.`is_deleted`)                                                       AS `is_deleted`                                                                                                 
		 ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                             AS `inserted_time`
		 ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                             AS `updated_time`
		 ,CASE  
		  WHEN  t1.`commodity_id` IS NULL                 THEN 'insert'     
		  ELSE 'update'
		  END                                                                                               AS `flag`
 FROM 
  (SELECT * FROM `dim`.`dim_sal_commodity_df` WHERE `is_current` =1)   t1  
 FULL JOIN (SELECT          
		           defaultv(t1.`id`,-99)                                                                    AS `commodity_id` 
		          ,defaultv(t1.`brand_id`,-99)                                                              AS `brand_id` 
		          ,defaultv(t2.`brand_china_name`,'None')                                                   AS `brand_cn_name` 
		          ,defaultv(t1.`commodity_code`,'None')                                                     AS `commodity_no` 
		          ,defaultv(t1.`commodity_name`,'None')                                                     AS `commodity_name` 
		          ,defaultv(t1.`commodity_describe`,'None')                                                 AS `commodity_desc` 
		          ,defaultv(t1.`commodity_color`,'None')                                                    AS `commodity_color` 
		          ,defaultv(t1.`length`,0)                                                                  AS `commodity_length` 
		          ,defaultv(t1.`width`,0)                                                                   AS `commodity_width` 
		          ,defaultv(t1.`height`,0)                                                                  AS `commodity_height` 
		          ,defaultv(t1.`weight`,0)                                                                  AS `commodity_weight` 
		          ,defaultv(t1.`retail_price`,0)                                                            AS `retail_price` 
		          ,defaultv(t1.`cost_price`,0)                                                              AS `cost_price` 
		          ,defaultv(t1.`launch_date`,'1970-01-01')                                                  AS `launch_date` 
		          ,defaultv(t1.`delisting_date`,'1970-01-01')                                               AS `delisting_date` 
		          ,defaultv(t1.`origin`,'None')                                                             AS `origin` 
		          ,defaultv(t1.`create_id`,-99)                                                             AS `create_id` 
		          ,defaultv(t1.`update_id`,-99)                                                             AS `update_id` 
		          ,defaultv(t1.`create_time`,'1970-01-01 00:00:00')                                         AS `create_time` 
		          ,defaultv(t1.`update_time`,'1970-01-01 00:00:00')                                         AS `update_time`          
              ,t1.`is_deleted` 
              ,t1.`inserted_time`                             
              ,t1.`updated_time` 
            FROM `ods`.`ods_ord_t_commodity_df` t1 
            LEFT JOIN `ods`.`ods_ord_t_brand_df` t2 ON   t1.`brand_id`=t2.`id`  AND t2.`dt` ='${PARTITITON}'  
            WHERE t1.`dt`='${PARTITITON}'
	 ) t2
 ON t1.`commodity_id`=t2.`commodity_id` 
 WHERE MD5(CONCAT(                                 
       t2.`brand_id` 
      ,t2.`brand_cn_name` 
      ,t2.`commodity_no` 
      ,t2.`commodity_name` 
      ,t2.`commodity_desc` 
      ,t2.`commodity_color` 
      ,t2.`commodity_length` 
      ,t2.`commodity_width` 
      ,t2.`commodity_height` 
      ,t2.`commodity_weight` 
      ,t2.`retail_price` 
      ,t2.`cost_price` 
      ,t2.`launch_date` 
      ,t2.`delisting_date` 
      ,t2.`origin` 
      ,t2.`create_id` 
      ,t2.`update_id` 
      ,t2.`create_time` 
      ,t2.`update_time`                                                      
      ,t2.`is_deleted`
)) <> MD5(CONCAT(                                
       t1.`brand_id` 
      ,t1.`brand_cn_name` 
      ,t1.`commodity_no` 
      ,t1.`commodity_name` 
      ,t1.`commodity_desc` 
      ,t1.`commodity_color` 
      ,t1.`commodity_length` 
      ,t1.`commodity_width` 
      ,t1.`commodity_height` 
      ,t1.`commodity_weight` 
      ,t1.`retail_price` 
      ,t1.`cost_price` 
      ,t1.`launch_date` 
      ,t1.`delisting_date` 
      ,t1.`origin` 
      ,t1.`create_id` 
      ,t1.`update_id` 
      ,t1.`create_time` 
      ,t1.`update_time`                                                      
      ,t1.`is_deleted`
)) OR t1.`commodity_id` IS NULL)

INSERT OVERWRITE TABLE `dim`.`dim_sal_commodity_df`
SELECT * FROM
(
    SELECT 
         t1.`commodity_id` 
        ,t1.`brand_id` 
        ,t1.`brand_cn_name` 
        ,t1.`commodity_no` 
        ,t1.`commodity_name` 
        ,t1.`commodity_desc` 
        ,t1.`commodity_color` 
        ,t1.`commodity_length` 
        ,t1.`commodity_width` 
        ,t1.`commodity_height` 
        ,t1.`commodity_weight` 
        ,t1.`retail_price` 
        ,t1.`cost_price` 
        ,t1.`launch_date` 
        ,t1.`delisting_date` 
        ,t1.`origin` 
        ,t1.`create_id` 
        ,t1.`update_id` 
        ,t1.`create_time` 
        ,t1.`update_time`                                                      
        ,t1.`is_deleted`     
        ,CASE
                WHEN t1.`end_date` = '9999-12-31' AND t2.`commodity_id` IS NOT NULL THEN 0
                ELSE t1.`is_current`
         END AS `is_current`
        ,t1.`start_date` 
        ,CASE
                WHEN t1.`end_date` = '9999-12-31' AND t2.`commodity_id` IS NOT NULL   THEN DATE_FORMAT(DATE_ADD(CURRENT_TIMESTAMP, -1), 'yyyy-MM-dd') 
                ELSE t1.`end_date`
         END AS `end_date`
        ,t1.`inserted_time`                             
        ,t1.`updated_time`
    FROM `dim`.`dim_sal_commodity_df`   t1
    LEFT  JOIN (SELECT `commodity_id` FROM `tmp_dim_sal_commodity_df` WHERE `flag`='update')    t2
    ON t1.`commodity_id` = t2.`commodity_id`
		UNION ALL 
    SELECT  
         t1.`commodity_id` 
        ,t1.`brand_id` 
        ,t1.`brand_cn_name` 
        ,t1.`commodity_no` 
        ,t1.`commodity_name` 
        ,t1.`commodity_desc` 
        ,t1.`commodity_color` 
        ,t1.`commodity_length` 
        ,t1.`commodity_width` 
        ,t1.`commodity_height` 
        ,t1.`commodity_weight` 
        ,t1.`retail_price` 
        ,t1.`cost_price` 
        ,t1.`launch_date` 
        ,t1.`delisting_date` 
        ,t1.`origin` 
        ,t1.`create_id` 
        ,t1.`update_id` 
        ,t1.`create_time` 
        ,t1.`update_time`                                                      
        ,0 `is_deleted`				      
        ,1 `is_current`
        ,CASE WHEN `flag`='insert' THEN  '1970-01-01' ELSE DATE_FORMAT(DATE_ADD(CURRENT_TIMESTAMP, -1), 'yyyy-MM-dd')  END  AS `start_date`   
        ,'9999-12-31' AS `end_date`
        ,`inserted_time`                             
        ,`updated_time` 
    FROM `tmp_dim_sal_commodity_df` t1 
)   a;
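
The zipper statement above detects changed rows by comparing an MD5 over the tracked columns, closes the current version of each changed row, and appends a new open version. The Python sketch below illustrates that flow under stated assumptions and is not the author's code: `row_hash` and `apply_zipper` are hypothetical names, and the hash deliberately differs from the SQL by joining values with a separator and an explicit NULL marker. In Hive, `CONCAT` returns NULL when any argument is NULL, and concatenation without a separator cannot distinguish `'ab' + 'c'` from `'a' + 'bc'`.

```python
import hashlib

OPEN_END = "9999-12-31"  # end_date of the open (current) version, as in the SQL


def row_hash(row, tracked_cols):
    """MD5 over the tracked columns, with a separator and a NULL marker."""
    parts = ["<NULL>" if row.get(c) is None else str(row[c]) for c in tracked_cols]
    return hashlib.md5("\x01".join(parts).encode("utf-8")).hexdigest()


def apply_zipper(history, snapshot, key, tracked_cols, change_date):
    """Return the new history after applying today's snapshot."""
    current = {r[key]: r for r in history if r["is_current"] == 1}
    new_versions = []
    for new in snapshot:
        old = current.get(new[key])
        if old is None:
            new_versions.append((new, "insert"))
        elif row_hash(old, tracked_cols) != row_hash(new, tracked_cols):
            new_versions.append((new, "update"))
    updated_keys = {n[key] for n, flag in new_versions if flag == "update"}

    result = []
    for r in history:
        r = dict(r)
        # Close the current version of every row that changed.
        if r["is_current"] == 1 and r[key] in updated_keys:
            r["is_current"] = 0
            r["end_date"] = change_date
        result.append(r)
    for new, flag in new_versions:
        version = dict(new)
        version["is_current"] = 1
        # First versions start at the epoch default, as in the SQL.
        version["start_date"] = "1970-01-01" if flag == "insert" else change_date
        version["end_date"] = OPEN_END
        result.append(version)
    return result


# Hypothetical sample data.
history = [
    {"commodity_id": 1, "commodity_name": "A", "retail_price": 10,
     "is_current": 1, "start_date": "1970-01-01", "end_date": OPEN_END},
    {"commodity_id": 2, "commodity_name": "B", "retail_price": 5,
     "is_current": 1, "start_date": "1970-01-01", "end_date": OPEN_END},
]
snapshot = [
    {"commodity_id": 1, "commodity_name": "A", "retail_price": 12},
    {"commodity_id": 2, "commodity_name": "B", "retail_price": 5},
    {"commodity_id": 3, "commodity_name": "C", "retail_price": 7},
]
result = apply_zipper(history, snapshot, "commodity_id",
                      ["commodity_name", "retail_price"], change_date="2024-06-10")
```

Like the SQL, the sketch uses the same date for the closed version's `end_date` and the new version's `start_date`, so lookups by date should treat the range as half-open.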

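4.DWD Layer Code Standards

  1. Configure the update time range through parameters
  2. Ensure that no field contains NULL; fill any NULL with the default value
  3. When joining a large table, add a time-range condition to avoid a full table scan

Example: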

-- ================================================
-- Author:        Administrator
-- Created:       2024-06-11
-- Description:   Sync the DWD-layer order table
-- Target table:  dwd_sal_trd_ord_order_df
-- Source table:  ods_ord_t_order_df,dim_pub_dictionary
-- Change log:
-- Date:         Modified by:     Change:
-- ================================================
set mapreduce.job.queuename=hive;
INSERT OVERWRITE TABLE `dwd`.`dwd_sal_trd_ord_order_df` 
SELECT 
		  COALESCE(t2.`order_id`,t1.`order_id`)                                                             AS `order_id` 
		 ,COALESCE(t2.`commodity_id`,t1.`commodity_id`)                                                     AS `commodity_id` 
		 ,COALESCE(t2.`user_id`,t1.`user_id`)                                                               AS `user_id` 
		 ,COALESCE(t2.`order_status_code`,t1.`order_status_code`)                                           AS `order_status_code` 
		 ,COALESCE(t2.`order_status_name`,t1.`order_status_name`)                                           AS `order_status_name` 
		 ,COALESCE(t2.`pay_type_code`,t1.`pay_type_code`)                                                   AS `pay_type_code` 
		 ,COALESCE(t2.`pay_type_name`,t1.`pay_type_name`)                                                   AS `pay_type_name` 
		 ,COALESCE(t2.`pay_status_code`,t1.`pay_status_code`)                                               AS `pay_status_code` 
		 ,COALESCE(t2.`pay_status_name`,t1.`pay_status_name`)                                               AS `pay_status_name` 
		 ,COALESCE(t2.`total_amt`,t1.`total_amt`)                                                           AS `total_amt` 
		 ,COALESCE(t2.`total_qty`,t1.`total_qty`)                                                           AS `total_qty` 
		 ,COALESCE(t2.`create_id`,t1.`create_id`)                                                           AS `create_id` 
		 ,COALESCE(t2.`create_time`,t1.`create_time`)                                                       AS `create_time` 
		 ,COALESCE(t2.`update_id`,t1.`update_id`)                                                           AS `update_id` 
		 ,COALESCE(t2.`update_time`,t1.`update_time`)                                                       AS `update_time`
		 ,COALESCE(t2.`is_deleted`, t1.`is_deleted`)                                                        AS `is_deleted`
		 ,COALESCE(t1.`inserted_time`, t2.`inserted_time`)                                                  AS `inserted_time`
		 ,COALESCE(t2.`updated_time`, t1.`updated_time`)                                                    AS `updated_time`
FROM 
  (SELECT * FROM  `dwd`.`dwd_sal_trd_ord_order_df`)   t1  
FULL JOIN (SELECT          
		           defaultv(t1.`id`,-99)                                                                    AS `order_id` 
		          ,defaultv(t1.`commodity_id`,-99)                                                          AS `commodity_id` 
		          ,defaultv(t1.`user_id`,-99)                                                               AS `user_id` 
		          ,defaultv(t1.`order_status`,-99)                                                          AS `order_status_code` 
		          ,defaultv(t2.`dic_name`,'None')                                                           AS `order_status_name` 
		          ,defaultv(t1.`pay_type`,-99)                                                              AS `pay_type_code` 
		          ,defaultv(t2.`dic_name`,'None')                                                           AS `pay_type_name` 
		          ,defaultv(t1.`pay_status`,-99)                                                            AS `pay_status_code` 
		          ,defaultv(t1.`pay_status`,'None')                                                         AS `pay_status_name` 
		          ,defaultv(t1.`total_amount`,0)                                                            AS `total_amt` 
		          ,defaultv(t1.`total_quantity`,0)                                                          AS `total_qty` 
		          ,defaultv(t1.`create_id`,-99)                                                             AS `create_id` 
		          ,defaultv(t1.`create_time`,'1970-01-01 00:00:00')                                         AS `create_time` 
		          ,defaultv(t1.`update_id`,-99)                                                             AS `update_id` 
		          ,defaultv(t1.`update_time`,'1970-01-01 00:00:00')                                         AS `update_time`          
              ,t1.`is_deleted` 
              ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                    AS `inserted_time`
              ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                    AS `updated_time`
            FROM `ods`.`ods_ord_t_order_df` t1 
            LEFT JOIN `dim`.`dim_pub_dictionary` t2 ON     AND t2.`dt` ='${PARTITITON}'  
            WHERE t1.`dt`='${PARTITITON}'
            AND t1.`update_time` >=  DATE_FORMAT(${START_TIME}, 'yyyy-MM-dd')  AND  t1.`update_time` < DATE_FORMAT(${END_TIME}, 'yyyy-MM-dd')

	 ) t2
ON t1.`order_id`=t2.`order_id`;
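
The `${START_TIME}` and `${END_TIME}` placeholders above are filled in by whatever scheduler runs the job; the article does not name it. As a rough illustration only, the sketch below renders the same filter with Python's `string.Template`, which happens to use the same `${NAME}` syntax. The helper `daily_window` is hypothetical, and it assumes the parameter values are passed as quoted date literals and that the window is half-open, `[START_TIME, END_TIME)`, as the `>=` and `<` comparisons suggest.

```python
from datetime import date, timedelta
from string import Template

# The time-range filter from the statement above, kept as a template.
WINDOW_FILTER = Template(
    "t1.`update_time` >= DATE_FORMAT(${START_TIME}, 'yyyy-MM-dd') "
    "AND t1.`update_time` < DATE_FORMAT(${END_TIME}, 'yyyy-MM-dd')"
)


def daily_window(run_date, days_back=1):
    """Half-open window covering the `days_back` days before run_date."""
    start = run_date - timedelta(days=days_back)
    return {
        "START_TIME": f"'{start.isoformat()}'",
        "END_TIME": f"'{run_date.isoformat()}'",
    }


# substitute() raises KeyError if a placeholder has no value,
# so a missing parameter fails before any SQL is submitted.
rendered = WINDOW_FILTER.substitute(daily_window(date(2024, 6, 12)))
```

Widening `days_back` re-processes a longer range without touching the SQL, which is the point of keeping the range in parameters.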

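5.DWS Layer Code Standards

  1. Configure the update time range through parameters
  2. Ensure that no field contains NULL; fill any NULL with the default value
  3. Do not put mutable and immutable metrics in the same table

Example: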

-- ================================================
-- Author:        Administrator
-- Created:       2024-03-02
-- Description:   Last-1-day summary of online return orders by return completion date
-- Target table:  dws_sal_trd_complete_date_xj_retn_order_1d
-- Source table:  dwd_sal_trd_retn_ord_xj_order_df
-- Change log:
-- Date:         Modified by:     Change:
-- ================================================
set mapreduce.job.queuename=hive;
INSERT OVERWRITE TABLE `px_dws`.`dws_sal_trd_complete_date_xj_retn_order_1d`
SELECT       
             t1.`shop_id`  
            ,t1.`member_id`
            ,t1.`commodity_id`  
            ,t1.`source_type_code`    
            ,t1.`source_type_name`   
            ,t1.`return_time`                                                                                                                                    AS `return_order_date`
            ,t1.`create_time`                                                                                                                                    AS `return_order_time`
            ,t1.`update_time`                                                                                                                                    AS `return_complete_date`
            ,t1.`order_id`
            ,t1.`return_order_id`
            ,t1.`return_order_no`
            ,SUM(t1.`commodity_price` * t1.`commodity_quantity`)                                                                                                 AS `origin_retn_sale_amt_1d_ai`
            ,SUM(t1.`commodity_sale_amt`)                                                                                                                        AS `trade_retn_amt_1d_ai`        
            ,SUM(t1.`commodity_real_amt`)                                                                                                                        AS `delivery_retn_amt_1d_ai` 
            ,SUM(t1.`commodity_real_quantity` )                                                                                                                  AS `delivery_retn_cmd_cnt_1d_ai` 
            ,SUM(CASE WHEN t1.`order_status_code` = 6 THEN t1.`commodity_real_amt` ELSE 0 END)                                                                   AS `commodity_real_amt_1d_ai`
            ,SUM(CASE WHEN t1.`order_status_code` = 6 THEN t1.`commodity_real_quantity` ELSE 0 END)                                                              AS `commodity_real_quantity_1d_ai`
            ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                                                                               AS `inserted_time`
            ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                                                                               AS `updated_time`
            ,DATE_FORMAT(t1.`update_time` , 'yyyy-MM-dd')                                                                                                        AS `dt`
FROM  `px_dwd`.`dwd_sal_trd_retn_ord_xj_order_df` t1 
WHERE 
  t1.`dt` ='9999-12-31' 
  AND t1.`is_deleted`=0 
  AND t1.`return_order_status_code`=7
  AND t1.`update_time` >= DATE_FORMAT(${START_TIME}, 'yyyy-MM-dd')  
  AND t1.`update_time`< DATE_FORMAT(${END_TIME}, 'yyyy-MM-dd')
GROUP BY 
             t1.`shop_id`  
            ,t1.`member_id`
            ,t1.`commodity_id` 
            ,t1.`return_time` 
            ,t1.`create_time` 
            ,t1.`update_time`  
            ,t1.`source_type_code`    
            ,t1.`source_type_name`
            ,t1.`order_id`
            ,t1.`return_order_id`
            ,t1.`return_order_no`
            ,DATE_FORMAT(t1.`update_time` , 'yyyy-MM-dd');
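
The statement above groups detail rows by day and computes both plain sums and conditional sums (`SUM(CASE WHEN ... END)`). A small Python sketch of the same pattern follows. It is illustrative only: the function name and sample rows are hypothetical, only two of the metrics are shown, and the meaning of status code 6 is not stated in the source.

```python
from collections import defaultdict


def aggregate_returns(rows):
    """Sum amounts per (shop, commodity, day); one metric only counts status 6."""
    totals = defaultdict(lambda: {"delivery_retn_amt_1d_ai": 0,
                                  "commodity_real_amt_1d_ai": 0})
    for r in rows:
        # The first 10 characters of 'yyyy-MM-dd HH:mm:ss' are the day (dt).
        key = (r["shop_id"], r["commodity_id"], r["update_time"][:10])
        bucket = totals[key]
        bucket["delivery_retn_amt_1d_ai"] += r["commodity_real_amt"]
        if r["order_status_code"] == 6:  # conditional sum
            bucket["commodity_real_amt_1d_ai"] += r["commodity_real_amt"]
    return dict(totals)


# Hypothetical sample rows.
rows = [
    {"shop_id": 1, "commodity_id": 10, "update_time": "2024-06-11 08:00:00",
     "order_status_code": 6, "commodity_real_amt": 100},
    {"shop_id": 1, "commodity_id": 10, "update_time": "2024-06-11 21:30:00",
     "order_status_code": 5, "commodity_real_amt": 40},
    {"shop_id": 1, "commodity_id": 10, "update_time": "2024-06-12 09:00:00",
     "order_status_code": 6, "commodity_real_amt": 30},
]
daily = aggregate_returns(rows)
```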

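6.ADS Layer Code Standards

  1. Configure the update time range through parameters
  2. Ensure that no field contains NULL; fill any NULL with the default value
  3. Take dimension table fields as snapshot values or current values, depending on the requirement

Example: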

-- ================================================
-- Author:        Administrator
-- Created:       2024-04-16
-- Description:   Last-1-day sales analysis of online store commodities
-- Target table:  ads_sal_trd_pay_retn_order_shop_item_1d
-- Source table:  dws_sal_trd_complete_date_xj_retn_order_1d,dim_sal_shop,dim_pub_commodity
-- Change log:
-- Date:         Modified by:     Change:
-- ================================================

set mapreduce.job.queuename=hive;
INSERT OVERWRITE TABLE `px_ads`.`ads_sal_trd_pay_retn_order_shop_item_1d`
SELECT 
     t1.`dt`                                                                                    AS `report_date`
    ,t1.`shop_id`
    ,t2.`shop_no`
    ,t2.`shop_name`
    ,t1.`commodity_id`
    ,t3.`commodity_no`
    ,t3.`commodity_name`
    ,t1.`source_type_code`
    ,t1.`source_type_name`
    ,SUM(t1.`trade_amt_1d_ai`)                                                                  AS `trade_amt_1d_ai`
    ,SUM(t1.`gross_cmd_cnt_1d_ai`)                                                              AS `gross_cmd_cnt_1d_ai`
    ,SUM(t1.`pay_sale_amt_1d_ai`)                                                               AS `pay_sale_amt_1d_ai`
    ,SUM(t1.`pay_cmd_cnt_1d_ai`)                                                                AS `pay_cmd_cnt_1d_ai`
    ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                      AS `inserted_time`
    ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')                                      AS `updated_time`
    ,t1.`dt`
FROM `px_dws`.`dws_sal_trd_complete_date_xj_retn_order_1d` t1
LEFT JOIN `px_dim`.`dim_sal_shop` t2 ON t1.`shop_id` = t2.`shop_id` AND t1.`dt`>= DATE_FORMAT(t2.`start_time`, 'yyyy-MM-dd' ) AND t1.`dt`< DATE_FORMAT(t2.`end_time`, 'yyyy-MM-dd' )
LEFT JOIN `px_dim`.`dim_pub_commodity` t3 ON t1.`commodity_id` = t3.`commodity_id` AND t3.`is_current` =1 
WHERE t1.`dt` >= DATE_FORMAT(${START_TIME}, 'yyyy-MM-dd') AND t1.`dt` < DATE_FORMAT(${END_TIME}, 'yyyy-MM-dd')
GROUP BY 
         t1.`shop_id`
        ,t2.`shop_no`
        ,t2.`shop_name`
        ,t1.`commodity_id`
        ,t3.`commodity_no`
        ,t3.`commodity_name`
        ,t1.`source_type_code`
        ,t1.`source_type_name`
        ,t1.`dt`;
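
The two joins above show both options from rule 3: the shop dimension is joined on its validity range (snapshot value as of the report date), while the commodity dimension is joined on `is_current = 1` (current value). The sketch below contrasts the two lookups in Python. The function names and sample data are hypothetical, and dates are assumed to be `yyyy-MM-dd` strings so that string comparison matches date order.

```python
def current_version(versions, key_col, key):
    """Current value: the version flagged is_current = 1."""
    return next((v for v in versions
                 if v[key_col] == key and v["is_current"] == 1), None)


def version_as_of(versions, key_col, key, day):
    """Snapshot value: the version whose half-open range [start, end) contains day."""
    return next((v for v in versions
                 if v[key_col] == key and v["start_date"] <= day < v["end_date"]), None)


# Hypothetical zipper-table rows for one shop that was renamed on 2024-06-10.
shop_versions = [
    {"shop_id": 1, "shop_name": "Old Name", "start_date": "1970-01-01",
     "end_date": "2024-06-10", "is_current": 0},
    {"shop_id": 1, "shop_name": "New Name", "start_date": "2024-06-10",
     "end_date": "9999-12-31", "is_current": 1},
]

snapshot_name = version_as_of(shop_versions, "shop_id", 1, "2024-06-05")["shop_name"]
current_name = current_version(shop_versions, "shop_id", 1)["shop_name"]
```

A report for 2024-06-05 shows the old name with the snapshot lookup and the new name with the current lookup, so the choice changes what historical reports display.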

7.DAS Table Code Standards

  1. Configure the source and target databases through parameters

Example:

{
    "job": {
     "setting": {
            "speed": {
                "channel":4
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader", 
                    "parameter": {
                     "path": "${HIVE_DB_DIR}ads.db/$TRGT_TABLE_NAME/dt=$PARTITION",
                     "hadoopConfig":{
                            "dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
                            "dfs.ha.namenodes.mycluster": "nn1,nn2",
                            "dfs.namenode.rpc-address.mycluster.nn1": "node101:8020",
                            "dfs.namenode.rpc-address.mycluster.nn2": "node102:8020",
                            "dfs.nameservices": "mycluster"
                           },
                     "defaultFS": "$HDFS_URL",
                        "column": [
                                    {
                                            "index": "0",
                                            "type": "string"
                                    },
                                    {
                                            "index": "1",
                                            "type": "long"
                                    },
                                    {
                                            "index": "2",
                                            "type": "long"
                                    },
                                    {
                                            "index": "3",
                                            "type": "long"
                                    },
                                    {
                                            "index": "4",
                                            "type": "string"
                                    },
                                    {
                                            "index": "5",
                                            "type": "long"
                                    },
                                    {
                                            "index": "6",
                                            "type": "string"
                                    },
                                    {
                                            "index": "7",
                                            "type": "long"
                                    },
                                    {
                                            "index": "8",
                                            "type": "string"
                                    },
                                    {
                                            "index": "9",
                                            "type": "double"
                                    },
                                    {
                                            "index": "10",
                                            "type": "double"
                                    },
                                    {
                                            "index": "11",
                                            "type": "string"
                                    },
                                    {
                                            "index": "12",
                                            "type": "string"
                                    }
						          ],
                         "fileType": "orc",      
                         "encoding": "UTF-8",    
                         "fieldDelimiter": "\u0001"   

                    }
                }, 
                "writer": {
                    "name": "clickhousewriter", 
                    "parameter": {
                    	"username": "$USER_NAME",
                      "password": "$PASSWORD",
                        "column": [
                           "report_date" 
                          ,"commodity_id" 
                          ,"user_id" 
                          ,"order_status_code" 
                          ,"order_status_name" 
                          ,"pay_type_code" 
                          ,"pay_type_name" 
                          ,"pay_status_code" 
                          ,"pay_status_name" 
                          ,"total_amt" 
                          ,"total_qty" 
                          ,"inserted_time" 
                          ,"updated_time"	 										
                        	], 
                        "preSql": ["ALTER TABLE $TRGT_TABLE_NAME DROP PARTITION '$CK_PARTITION'"],
                        "connection": [
                            {
                                "jdbcUrl":"$JDBCURL", 
                                "table": ["$TRGT_TABLE_NAME"]
                            }
                        ]
                    }
                }
            }
        ]
    }
}
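
Because the job file above is a template, it is only valid JSON after the placeholders are replaced. The sketch below shows one way to render such a template and fail early on a missing parameter or malformed JSON. It is an assumption-laden illustration: how the real scheduler substitutes `$NAME` values is not described in the article, `render_job` is a hypothetical helper, the template is a shortened stand-in for the full job, and every parameter value is made up.

```python
import json
from string import Template

# A shortened stand-in for the job file, using the same placeholders.
JOB_TEMPLATE = """{
  "reader": {"path": "${HIVE_DB_DIR}ads.db/$TRGT_TABLE_NAME/dt=$PARTITION"},
  "writer": {"username": "$USER_NAME", "table": ["$TRGT_TABLE_NAME"]}
}"""

# Hypothetical parameter values.
PARAMS = {
    "HIVE_DB_DIR": "/user/hive/warehouse/",
    "TRGT_TABLE_NAME": "ads_demo_1d",
    "PARTITION": "2024-06-11",
    "USER_NAME": "etl_user",
}


def render_job(template_text, params):
    """Substitute parameters, then parse the result to prove it is valid JSON."""
    # substitute() raises KeyError when a placeholder has no value.
    rendered = Template(template_text).substitute(params)
    # json.loads() raises json.JSONDecodeError on problems such as a trailing comma.
    return json.loads(rendered)


job = render_job(JOB_TEMPLATE, PARAMS)
```

Parameter values that contain quotes or backslashes would need JSON escaping before substitution; the sketch does not handle that case.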

 
