聚合函数
例如 max(val) , argMax(arg,val)
如果在聚合函数后面加入后缀if,则是
maxIf(val,UInt8)
argMaxIf(arg,val,UInt8)
也就是当满足某个条件时候,才会对这一行数据进行函数处理。
例如:
字段:is_deleted
AggregateFunction(argMaxIf, Int32, Int64, UInt8)
该字段插入数据:
insert into table A
slect
argMaxIfState(is_deleted, event_behavior_timestamp, isNotNull(is_deleted)) AS is_deleted
from B
表引擎AggregatingMergeTree
该引擎继承自MergeTree,改变了数据部分合并的逻辑。ClickHouse将所有具有相同主键的行替换为存储聚合函数状态组合的单行。
您可以使用AggregatingMergeTree表进行增量数据聚合,包括聚合的物化视图。
使用该引擎的表中,除了主键外其余使用 AggregateFunction、SimpleAggregateFunction数据类型
AggregateFunction 是数据类型,第一个参数是使用的聚合函数名称,后面多个参数是聚合函数传入参数的类型。如果加入if后缀,则多出一个参数类型UInt8
聚合函数可以具有实现定义的中间状态,该状态可以序列化为AggregateFunction(…)数据类型,并通常通过物化视图存储在表中。生成聚合函数状态的常用方法是调用带有-State后缀的聚合函数。要在将来获得聚合的最终结果,必须使用带- mergessuffix的相同聚合函数。
CREATE TABLE test.agg_visits (
StartDate DateTime64 NOT NULL,
CounterID UInt64,
Visits AggregateFunction(sum, Nullable(Int32)),
Users AggregateFunction(uniq, Nullable(Int32))
)
ENGINE = AggregatingMergeTree() ORDER BY (StartDate, CounterID);
向表agg_visits 插入数据,根据group by进行聚合,聚合表存储聚合函数列的状态值,需要在对应的聚合函数上加入后缀State,例如sumState
insert into table test.agg_visits
SELECT
StartDate,
CounterID,
sumState(Sign) AS Visits,
uniqState(UserID) AS Users
FROM test.visits
GROUP BY StartDate, CounterID;
查看 test.agg_visits表数据,对应的聚合数据要儒后缀 Merge,例如sumMerge
SELECT
StartDate,
sumMerge(Visits) AS Visits,
uniqMerge(Users) AS Users
FROM test.agg_visits
GROUP BY StartDate
ORDER BY StartDate;
-- 因为存储的是聚合函数的中间状态,所以不加group by的时候,是全局聚合。当插入数据的时候,存储的是以StartDate, CounterID分组下聚合函数的状态值,在查询的时候使用group by StartDate,则会对聚合函数状态下的值进行按照StartDate分组进行聚合函数。
-- 如果查询时使用group by StartDate,CounterID,aa 那么查出的时候也是按照StartDate,CounterID分组的,而不是StartDate,CounterID,aa。
-- 因为聚合表在建表的时候已经决定了存储主键相同下的聚合函数的中间状态值
SELECT
sumMerge(Visits) AS Visits,
uniqMerge(Users) AS Users
FROM test.agg_visits
物化视图
创建物化视图,物化视图和普通视图区别是,物化视图是存储数据得,并且可以动态得监听底表数据变化,并将变化得数据写入物化视图中。而普通视图是不存储数据的
CREATE MATERIALIZED VIEW test.visits_mv TO test.agg_visits
AS SELECT
StartDate,
CounterID,
sumState(Sign) AS Visits,
uniqState(UserID) AS Users
FROM test.visits
GROUP BY StartDate, CounterID;
视图visits_mv 是基于表agg_visits之上建立的,并将数据插入agg_visits和视图visits_mv。
底表是visits,当底表数据变化时候,agg_visits和visits_mv 会进行自动更新数据。
当直接向agg_visits插入数据,那么当底表visits插入了一条新数据时候,只能手动去更新agg_visits表数据,不能主动识别
生产例子
CREATE MATERIALIZED VIEW app.app_eap_entity_member_de_153578 TO app.app_eap_entity_member_aggregate_de_153578
AS
SELECT
aliuid_info_final AS aliuid_info,
aliuid_info_value_timestamp,
birthday_final AS birthday,
birthday_value_timestamp,
cre_date_final AS cre_date,
cre_date_value_timestamp,
creator_final AS creator,
creator_value_timestamp,
creator_name_final AS creator_name,
creator_name_value_timestamp,
creator_store_final AS creator_store,
creator_store_value_timestamp,
data_trace_id_final AS data_trace_id,
data_trace_id_value_timestamp,
wechat_type_final AS wechat_type,
wechat_type_value_timestamp,
dt_final AS dt,
version_timestamp_final AS version_timestamp,
version,
is_deleted_final AS is_deleted
FROM
(
SELECT
argMaxIfState(event_behavior_timestamp, event_behavior_timestamp, isNotNull(aliuid_info)) AS aliuid_info_value_timestamp,
argMaxIfState(aliuid_info, event_behavior_timestamp, isNotNull(aliuid_info)) AS aliuid_info_final,
argMaxIfState(event_behavior_timestamp, event_behavior_timestamp, isNotNull(manager_name)) AS manager_name_value_timestamp,
argMaxIfState(manager_name, event_behavior_timestamp, isNotNull(manager_name)) AS manager_name_final,
argMaxIfState(event_behavior_timestamp, event_behavior_timestamp, isNotNull(member_creator)) AS member_creator_value_timestamp,
argMaxIfState(member_creator, event_behavior_timestamp, isNotNull(member_creator)) AS member_creator_final,
pguid AS pguid,
argMaxIfState(event_behavior_timestamp, event_behavior_timestamp, isNotNull(wechat_type)) AS wechat_type_value_timestamp,
argMaxIfState(wechat_type, event_behavior_timestamp, isNotNull(wechat_type)) AS wechat_type_final,
argMaxIfState(dt, event_behavior_timestamp, isNotNull(dt)) AS dt_final,
argMaxIfState(version_timestamp, event_behavior_timestamp, isNotNull(version_timestamp)) AS version_timestamp_final,
maxState(event_behavior_timestamp) AS version,
argMaxIfState(is_deleted, event_behavior_timestamp, isNotNull(is_deleted)) AS is_deleted_final
FROM app.app_eap_entity_member_log_de_153578
GROUP BY pguid
)
CREATE TABLE app.app_eap_entity_member_aggregate_de_153578
(
`aliuid_info` AggregateFunction(argMaxIf, Nullable(String), Int64, UInt8),
`aliuid_info_value_timestamp` AggregateFunction(argMaxIf, Int64, Int64, UInt8),
`birthday` AggregateFunction(argMaxIf, Nullable(String), Int64, UInt8),
`pguid` Int64,
`wechat_type` AggregateFunction(argMaxIf, Nullable(Int32), Int64, UInt8),
`wechat_type_value_timestamp` AggregateFunction(argMaxIf, Int64, Int64, UInt8),
`dt` AggregateFunction(argMaxIf, String, Int64, UInt8),
`version_timestamp` AggregateFunction(argMaxIf, Int64, Int64, UInt8),
`version` AggregateFunction(max, Int64),
`is_deleted` AggregateFunction(argMaxIf, Int32, Int64, UInt8)
)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/app/app_eap_entity_member_aggregate_de_153578', '{replica}')
PARTITION BY xxHash64(pguid) % 10
ORDER BY pguid
SETTINGS index_granularity = 8192, only_allow_select_statement = 0
CREATE TABLE app.app_eap_entity_member_log_de_153578
(
`event_data_id` String,
`aliuid_info` Nullable(String),
`birthday` Nullable(String),
`cre_date` Nullable(Int64),
`creator` Nullable(String),
`creator_name` Nullable(String),
`pguid` Int64,
`takeover_time` Nullable(Int64),
`version_timestamp` Int64,
`is_deleted` Int32 DEFAULT 0,
`dt` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/app/app_eap_entity_member_log_de_153578', '{replica}')
PARTITION BY (tenant_channel, substring(dt, 1, 4))
ORDER BY (event_behavior_id, event_data_id)
SETTINGS index_granularity = 8192, only_allow_select_statement = 0
位图
https://blog.csdn.net/weixin_39025362/article/details/110390251
Clickhouse实现数据的有限更新
select
arrayJoin(bitmapToArray(groupBitmapAndState(pguid))) as pguid,
'user_tag_huiyuan',
'L0',
'${start_date2}',
'all',
'all'
from (
select bitmapBuild(groupArray(toUInt64(pguid))) as pguid
from(
select pguid from app.app_eap_entity_member_aggregate_de
group by pguid
having argMaxIfMerge(user_status) in ( 0 )
and argMaxIfMerge(is_deleted) = 0
)
)