- 1. ReplacingMergeTree
- 1.1 使用场景
- 1.2 执行SQL
- 1.3 说明
- 2. SummingMergeTree
- 2.1 使用场景
- 2.2 执行SQL
- 2.3 说明
- 2.4 修改order by字段
- 3. AggregatingMergeTree
- 3.1 使用场景
- 3.2 执行SQL
- 3.3 说明
- 4. CollapsingMergeTree
- 4.1 使用场景
- 4.2 执行SQL
- 4.3 说明
- 4.4 sign的1和-1的抵消规则
- 4.4.1 sign = 1和sign = -1的条数相同, 且最后一条为sign = 1
- 4.4.2 sign = 1和sign = -1的条数相同, 且最后一条为sign = -1
- 4.4.3 sign = 1比sign = -1数据多n条(没有sign = -1也可)
- 4.4.4 sign = -1比sign = 1数据多n条(没有sign = 1也可)
- 4.4.5 第一条数据为sign = -1
- 4.5 不用optimize的查询方式
- 5. VersionedCollapsingMergeTree
- 5.1 使用场景
- 5.2 执行SQL
- 5.3 说明
- 5.4 sign的1和-1的抵消规则
- 5.4.1 sign = 1和sign = -1的条数相同
- 5.4.2 sign = 1比sign = -1数据条数不一样(没有sign = 1或sign = -1也可)
- 5.5 不用optimize的查询方式
- 6. 各种MergeTree的关系总结
MergeTree允许存在相同primary key的数据,为了避免数据重复,使用ReplacingMergeTree对插入的数据进行去重
1.2 执行SQL
- 建表
clickhouse1 :)
clickhouse1 :) create table replace_merge_local on cluster sharding_ha(
:-] id UInt64,
:-] name String,
:-] create_time DateTime
:-] ) engine = ReplicatedReplacingMergeTree('/clickhouse/tables/replace_merge/{shard}', '{replica}', create_time)
:-] partition by toYYYYMM(create_time)
:-] order by (id, name)
:-] primary key id;
CREATE TABLE replace_merge_local ON CLUSTER sharding_ha
(
`id` UInt64,
`name` String,
`create_time` DateTime
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/replace_merge/{shard}', '{replica}', create_time)
PARTITION BY toYYYYMM(create_time)
PRIMARY KEY id
ORDER BY (id, name)
Query id: dff2f0df-3830-41fa-9c75-57b7ca5b5f0e
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 1 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 1 │
│ clickhouse4 │ 9000 │ 0 │ │ 1 │ 1 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.201 sec.
clickhouse1 :)
clickhouse1 :) create table replace_merge_all on cluster sharding_ha(
:-] id UInt64,
:-] name String,
:-] create_time DateTime
:-] ) engine = Distributed(sharding_ha, default, replace_merge_local, id);
CREATE TABLE replace_merge_all ON CLUSTER sharding_ha
(
`id` UInt64,
`name` String,
`create_time` DateTime
)
ENGINE = Distributed(sharding_ha, default, replace_merge_local, id)
Query id: a886349a-cd22-4979-81a2-992c2a3595ec
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.128 sec.
clickhouse1 :)
clickhouse1 :) insert into replace_merge_all(id, name, create_time) values(1, 'A', '2021-07-10 17:00:00'), (1, 'A', '2021-07-11 17:00:00'), (1, 'B', '2021-07-12 17:00:00'), (1, 'A', '2021-08-12 17:00:00'), (2, 'B', '2021-07-13 17:00:00'), (3, 'C', '2021-07-14 17:00:00');
INSERT INTO replace_merge_all (id, name, create_time) VALUES
Query id: 8c49d417-a466-4ec7-b77d-3812564d9b85
Ok.
6 rows in set. Elapsed: 0.023 sec.
clickhouse1 :)
clickhouse1 :) optimize table replace_merge_local on cluster sharding_ha final;
OPTIMIZE TABLE replace_merge_local ON CLUSTER sharding_ha FINAL
Query id: 4b5b1158-2822-4a4d-9a22-748716864355
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.295 sec.
clickhouse1 :)
clickhouse1 :) select * from replace_merge_all order by id, name;
SELECT *
FROM replace_merge_all
ORDER BY
id ASC,
name ASC
Query id: f7cd212d-f854-4d87-b038-f72c68fab75f
┌─id─┬─name─┬─────────create_time─┐
│ 1 │ A │ 2021-08-12 17:00:00 │
└────┴──────┴─────────────────────┘
┌─id─┬─name─┬─────────create_time─┐
│ 1 │ A │ 2021-07-11 17:00:00 │
│ 1 │ B │ 2021-07-12 17:00:00 │
│ 2 │ B │ 2021-07-13 17:00:00 │
│ 3 │ C │ 2021-07-14 17:00:00 │
└────┴──────┴─────────────────────┘
5 rows in set. Elapsed: 0.010 sec.
clickhouse1 :)
1.3 说明
- MergeTree的primary key不是唯一的
- ReplacingMergeTree在合并分区时,以partition为单元,不同partition可能存在相同的数据
- 按order by的字段进行去重,保留版本号最大的一条(本示例的版本字段为create_time);版本字段可为整型、Date、DateTime,不指定版本字段时默认保留最后插入的一条数据
有时我们并不关心明细数据,只需要数据求和后的结果,采用SummingMergeTree对插入的明细数据自动进行求和
2.2 执行SQL
- 建表
clickhouse1 :)
clickhouse1 :) create table summing_table_local on cluster sharding_ha(
:-] id UInt64,
:-] name String,
:-] city String,
:-] value1 UInt64,
:-] value2 UInt64,
:-] create_time DateTime,
:-] nestMap Nested(
:-] id UInt64,
:-] nest_value1 UInt64,
:-] nest_timeType DateTime,
:-] nest_value2 UInt64
:-] )
:-] ) engine = ReplicatedSummingMergeTree('/clickhouse/tables/summing_table/{shard}', '{replica}', (value1, value2))
:-] partition by toYYYYMM(create_time)
:-] order by (id, city)
:-] primary key id;
CREATE TABLE summing_table_local ON CLUSTER sharding_ha
(
`id` UInt64,
`name` String,
`city` String,
`value1` UInt64,
`value2` UInt64,
`create_time` DateTime,
`nestMap` Nested(id UInt64, nest_value1 UInt64, nest_timeType DateTime, nest_value2 UInt64)
)
ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/summing_table/{shard}', '{replica}', (value1, value2))
PARTITION BY toYYYYMM(create_time)
PRIMARY KEY id
ORDER BY (id, city)
Query id: 387b475d-2621-4cb0-8017-26c0ccd8845c
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.326 sec.
clickhouse1 :)
clickhouse1 :) create table summing_table_all on cluster sharding_ha(
:-] id UInt64,
:-] name String,
:-] city String,
:-] value1 UInt64,
:-] value2 UInt64,
:-] create_time DateTime,
:-] nestMap Nested(
:-] id UInt64,
:-] nest_value1 UInt64,
:-] nest_timeType DateTime,
:-] nest_value2 UInt64
:-] )
:-] ) engine = Distributed(sharding_ha, default, summing_table_local, id);
CREATE TABLE summing_table_all ON CLUSTER sharding_ha
(
`id` UInt64,
`name` String,
`city` String,
`value1` UInt64,
`value2` UInt64,
`create_time` DateTime,
`nestMap` Nested(id UInt64, nest_value1 UInt64, nest_timeType DateTime, nest_value2 UInt64)
)
ENGINE = Distributed(sharding_ha, default, summing_table_local, id)
Query id: ea653602-019e-4b3f-bec2-2465aa2469c2
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.156 sec.
clickhouse1 :)
- 插入数据
clickhouse1 :)
clickhouse1 :) insert into summing_table_all(id, name, city, value1, value2, create_time, nestMap.id, nestMap.nest_value1, nestMap.nest_timeType, nestMap.nest_value2) values(1, 'A001', 'Beijing', 10, 60, '2021-06-10 17:00:00', [1, 1, 2], [1, 1, 5], ['2021-06-10 17:00:00', '2021-06-10 17:00:00', '2021-06-11 17:00:00'], [10, 10, 50]), (1, 'A002', 'Beijing', 10, 60, '2021-06-11 17:00:00', [1, 1, 2], [1, 1, 5], ['2021-06-10 17:00:00', '2021-06-10 17:00:00', '2021-06-11 17:00:00'], [10, 10, 50]), (1, 'A001', 'Beijing', 10, 60, '2021-07-10 17:00:00', [1, 1, 2], [1, 1, 5], ['2021-06-10 17:00:00', '2021-06-10 17:00:00', '2021-06-11 17:00:00'], [10, 10, 50]), (2, 'A001', 'Beijing', 10, 60, '2021-06-10 17:00:00', [1, 1, 2], [1, 1, 5], ['2021-06-10 17:00:00', '2021-06-10 17:00:00', '2021-06-11 17:00:00'], [10, 10, 50]);
INSERT INTO summing_table_all (id, name, city, value1, value2, create_time, nestMap.id, nestMap.nest_value1, nestMap.nest_timeType, nestMap.nest_value2) VALUES
Query id: 1c4039b9-d7d9-4f81-8bb9-53a24e6aa7e5
Ok.
4 rows in set. Elapsed: 0.032 sec.
clickhouse1 :)
- 查询数据
clickhouse1 :)
clickhouse1 :) optimize table summing_table_local on cluster sharding_ha final;
OPTIMIZE TABLE summing_table_local ON CLUSTER sharding_ha FINAL
Query id: adeedf8d-eff8-4c3f-99b7-ba41d83b8096
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.157 sec.
clickhouse1 :)
clickhouse1 :) select * from summing_table_all;
SELECT *
FROM summing_table_all
Query id: 187c3f8f-1729-474e-a134-9c666edad6cd
┌─id─┬─name─┬─city────┬─value1─┬─value2─┬─────────create_time─┬─nestMap.id─┬─nestMap.nest_value1─┬─nestMap.nest_timeType───────────────────────────────────────────────┬─nestMap.nest_value2─┐
│ 2 │ A001 │ Beijing │ 10 │ 60 │ 2021-06-10 17:00:00 │ [1,1,2] │ [1,1,5] │ ['2021-06-10 17:00:00','2021-06-10 17:00:00','2021-06-11 17:00:00'] │ [10,10,50] │
└────┴──────┴─────────┴────────┴────────┴─────────────────────┴────────────┴─────────────────────┴─────────────────────────────────────────────────────────────────────┴─────────────────────┘
┌─id─┬─name─┬─city────┬─value1─┬─value2─┬─────────create_time─┬─nestMap.id─┬─nestMap.nest_value1─┬─nestMap.nest_timeType─────────────────────────┬─nestMap.nest_value2─┐
│ 1 │ A001 │ Beijing │ 20 │ 120 │ 2021-06-10 17:00:00 │ [1,2] │ [4,10] │ ['2021-06-10 17:00:00','2021-06-11 17:00:00'] │ [40,100] │
└────┴──────┴─────────┴────────┴────────┴─────────────────────┴────────────┴─────────────────────┴───────────────────────────────────────────────┴─────────────────────┘
┌─id─┬─name─┬─city────┬─value1─┬─value2─┬─────────create_time─┬─nestMap.id─┬─nestMap.nest_value1─┬─nestMap.nest_timeType───────────────────────────────────────────────┬─nestMap.nest_value2─┐
│ 1 │ A001 │ Beijing │ 10 │ 60 │ 2021-07-10 17:00:00 │ [1,1,2] │ [1,1,5] │ ['2021-06-10 17:00:00','2021-06-10 17:00:00','2021-06-11 17:00:00'] │ [10,10,50] │
└────┴──────┴─────────┴────────┴────────┴─────────────────────┴────────────┴─────────────────────┴─────────────────────────────────────────────────────────────────────┴─────────────────────┘
3 rows in set. Elapsed: 0.024 sec.
clickhouse1 :)
2.3 说明
- SummingMergeTree在合并分区时,以partition为单元,聚合后不同partition可能存在相同的order by数据
- 以order by字段为分组字段,而不是primary key字段
- 可以在表引擎的参数设置求和字段,其余字段(除order by字段)取第一条数据;默认的求值字段为所有数值类型字段(除order by字段)
- Nested类型字段:字段名以Map结尾的Nested字段内部会发生聚合,否则和普通字段一样取第一条数据
- Nested字段内部以第一个字段和以(Key、Id、Type)结尾的字段为聚合分组字段,对其余数值类型字段求和;如果还有剩余字段,则不发生聚合,和普通字段一样取第一条数据
- 执行完optimize table ...... final之后,如果partition文件只有一条数据而不用合并,则Nested里面的数据也不会发生聚合求和
2.4 修改order by字段
order by字段只能在原来的字段基础上,从后面减少
clickhouse1 :)
clickhouse1 :) alter table summing_table_local on cluster sharding_ha modify order by id;
ALTER TABLE summing_table_local ON CLUSTER sharding_ha
MODIFY ORDER BY id
Query id: 880097ed-ab71-402d-8df9-20a366b44395
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │ 0 │ │ 3 │ 3 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.214 sec.
clickhouse1 :)
或者给表新增字段,再添加到order by字段后面, 不能添加到前面
clickhouse1 :)
clickhouse1 :) alter table summing_table_local on cluster sharding_ha add column order_col String, modify order by (id, order_col);
ALTER TABLE summing_table_local ON CLUSTER sharding_ha
ADD COLUMN `order_col` String, MODIFY ORDER BY (id, order_col)
Query id: fe736b19-bd53-42da-9ef2-17f38fad655b
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 1 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 1 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 1 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.184 sec.
clickhouse1 :)
3. AggregatingMergeTree
3.1 使用场景
SummingMergeTree的求和功能并不能满足我们的需求,AggregatingMergeTree可以使用功能更多的聚合函数
3.2 执行SQL
- 创建表
clickhouse1 :)
clickhouse1 :) create table agg_table_local on cluster sharding_ha(
:-] id UInt32,
:-] city String,
:-] code AggregateFunction(uniq, String),
:-] value AggregateFunction(sum, UInt32),
:-] create_time DateTime
:-] ) engine = ReplicatedAggregatingMergeTree('/clickhouse/tables/agg_table/{shard}', '{replica}')
:-] partition by toYYYYMM(create_time)
:-] order by (id, city)
:-] primary key id;
CREATE TABLE agg_table_local ON CLUSTER sharding_ha
(
`id` UInt32,
`city` String,
`code` AggregateFunction(uniq, String),
`value` AggregateFunction(sum, UInt32),
`create_time` DateTime
)
ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/agg_table/{shard}', '{replica}')
PARTITION BY toYYYYMM(create_time)
PRIMARY KEY id
ORDER BY (id, city)
Query id: a6336a2b-f61b-4869-b4fc-5e59ea3b5efa
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 3 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 1 │
│ clickhouse4 │ 9000 │ 0 │ │ 1 │ 1 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.452 sec.
clickhouse1 :)
clickhouse1 :) create table agg_table_all on cluster sharding_ha(
:-] id UInt32,
:-] city String,
:-] code AggregateFunction(uniq, String),
:-] value AggregateFunction(sum, UInt32),
:-] create_time DateTime
:-] ) engine = Distributed(sharding_ha, default, agg_table_local, id);
CREATE TABLE agg_table_all ON CLUSTER sharding_ha
(
`id` UInt32,
`city` String,
`code` AggregateFunction(uniq, String),
`value` AggregateFunction(sum, UInt32),
`create_time` DateTime
)
ENGINE = Distributed(sharding_ha, default, agg_table_local, id)
Query id: 6a0710f9-ac44-4954-8ea9-afad23b76b1e
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 1 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.093 sec.
clickhouse1 :)
- 插入数据
insert into agg_table_all select 1, 'Beijing', uniqState('code1'), sumState(toUInt32(10)), '2021-07-16 17:00:00';
insert into agg_table_all select 1, 'Beijing', uniqState('code1'), sumState(toUInt32(20)), '2021-07-17 17:00:00';
insert into agg_table_all select 1, 'Beijing', uniqState('code2'), sumState(toUInt32(30)), '2021-07-18 17:00:00';
insert into agg_table_all select 2, 'Shanghai', uniqState('code1'), sumState(toUInt32(10)), '2021-08-16 17:00:00';
insert into agg_table_all select 2, 'Shanghai', uniqState('code1'), sumState(toUInt32(20)), '2021-08-17 17:00:00';
insert into agg_table_all select 2, 'Shanghai', uniqState('code2'), sumState(toUInt32(30)), '2021-08-18 17:00:00';
- 查询数据
clickhouse1 :)
clickhouse1 :) optimize table agg_table_local on cluster sharding_ha final;
OPTIMIZE TABLE agg_table_local ON CLUSTER sharding_ha FINAL
Query id: 8a3266de-856d-4c56-a4d6-e0c310e5fc45
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 1 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.485 sec.
clickhouse1 :)
clickhouse1 :) select * from agg_table_all;
SELECT *
FROM agg_table_all
Query id: a8b77c2d-c8e7-452d-b5d6-d676ff7d5243
┌─id─┬─city─────┬─code─┬─value─┬─────────create_time─┐
│ 2 │ Shanghai │ ߙcΘn │ < │ 2021-08-16 17:00:00 │
└────┴──────────┴──────┴───────┴─────────────────────┘
┌─id─┬─city────┬─code─┬─value─┬─────────create_time─┐
│ 1 │ Beijing │ ߙcΘn │ < │ 2021-07-16 17:00:00 │
└────┴─────────┴──────┴───────┴─────────────────────┘
2 rows in set. Elapsed: 0.010 sec.
clickhouse1 :)
clickhouse1 :) select id, city, uniqMerge(code), sumMerge(value), create_time from agg_table_all group by id, city, create_time;
SELECT
id,
city,
uniqMerge(code),
sumMerge(value),
create_time
FROM agg_table_all
GROUP BY
id,
city,
create_time
Query id: dcc7749c-9ff9-481a-b90a-2d1545a1b6b5
┌─id─┬─city─────┬─uniqMerge(code)─┬─sumMerge(value)─┬─────────create_time─┐
│ 2 │ Shanghai │ 2 │ 60 │ 2021-08-16 17:00:00 │
│ 1 │ Beijing │ 2 │ 60 │ 2021-07-16 17:00:00 │
└────┴──────────┴─────────────────┴─────────────────┴─────────────────────┘
2 rows in set. Elapsed: 0.016 sec.
clickhouse1 :)
3.3 说明
- AggregatingMergeTree在合并分区时,以partition为单元,聚合后不同partition可能存在相同的order by数据
- 以order by字段为分组字段,而不是primary key字段
- 聚合字段建表时用AggregateFunction定义,插入数据时用*State函数转换为二进制,其余字段(除order by字段)取第一条数据
- 直接查询聚合字段默认会得到乱码的二进制形式;所以查询时聚合字段要用*Merge函数转换,且配合group by语法使用
- 这里我不推荐配合物化视图使用;因为我们采用了副本形式的分布式集群;如果采用本地物化视图从本地源表同步数据,则副本会导致数据变多,如果采用all物化视图从all源表同步数据,则会导致数据同步的单点故障
我们对MergeTree进行一条数据的修改或删除,会直接去操作磁盘上的数据源文件,代价非常昂贵,而CollapsingMergeTree不管修改或删除,都是新增一条数据,等partition合并的时候,再根据sign标记进行真正的数据修改或删除
4.2 执行SQL
- 建表语句
clickhouse1 :)
clickhouse1 :) drop table if exists collapse_table_local on cluster sharding_ha;
DROP TABLE IF EXISTS collapse_table_local ON CLUSTER sharding_ha
Query id: 0a24aa0e-82bc-4ee3-bfbf-012ab0ae2357
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.131 sec.
clickhouse1 :)
clickhouse1 :) drop table if exists collapse_table_all on cluster sharding_ha;
DROP TABLE IF EXISTS collapse_table_all ON CLUSTER sharding_ha
Query id: a1035950-e66c-4504-bb3e-019cedf1d8c1
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.119 sec.
clickhouse1 :)
clickhouse1 :) create table collapse_table_local on cluster sharding_ha(
:-] id UInt32,
:-] city String,
:-] create_time DateTime,
:-] sign Int8
:-] ) engine = ReplicatedCollapsingMergeTree('/clickhouse/tables/collapse_table/{shard}', '{replica}', sign)
:-] partition by toYYYYMM(create_time)
:-] order by (id, city)
:-] primary key id;
CREATE TABLE collapse_table_local ON CLUSTER sharding_ha
(
`id` UInt32,
`city` String,
`create_time` DateTime,
`sign` Int8
)
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/collapse_table/{shard}', '{replica}', sign)
PARTITION BY toYYYYMM(create_time)
PRIMARY KEY id
ORDER BY (id, city)
Query id: 8b9161dd-3271-4746-b937-f2301f1ba761
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 2 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 2 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.178 sec.
clickhouse1 :)
clickhouse1 :) create table collapse_table_all on cluster sharding_ha(
:-] id UInt32,
:-] city String,
:-] create_time DateTime,
:-] sign Int8
:-] ) engine = Distributed(sharding_ha, default, collapse_table_local, id);
CREATE TABLE collapse_table_all ON CLUSTER sharding_ha
(
`id` UInt32,
`city` String,
`create_time` DateTime,
`sign` Int8
)
ENGINE = Distributed(sharding_ha, default, collapse_table_local, id)
Query id: 7113bd37-83b3-4c9f-aa9f-2e3a84974c91
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.122 sec.
clickhouse1 :)
4.3 说明
- CollapsingMergeTree在合并分区时,以partition为单元,聚合后不同partition可能存在相同的order by数据
- 用order by字段判定是否是对同一条数据操作,而不是primary key字段
- sign的1和-1的抵消规则请参考下一节
以下所有测试开始时表中都没有数据
4.4.1 sign = 1和sign = -1的条数相同, 且最后一条为sign = 1
- 插入数据并optimize
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', 1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', -1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-17 17:00:00', -1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-17 17:00:00', 1);
optimize table collapse_table_local on cluster sharding_ha final;
- 查询结果
clickhouse1 :)
clickhouse1 :) select * from collapse_table_all;
SELECT *
FROM collapse_table_all
Query id: 9bc07741-f1bc-4ced-9029-14482278952c
┌─id─┬─city────┬─────────create_time─┬─sign─┐
│ 1 │ Beijing │ 2021-07-16 17:00:00 │ -1 │
│ 1 │ Beijing │ 2021-07-17 17:00:00 │ 1 │
└────┴─────────┴─────────────────────┴──────┘
2 rows in set. Elapsed: 0.011 sec.
clickhouse1 :)
- 规则结论 sign = -1的取第一条,sign = 1的取最后一条
4.4.2 sign = 1和sign = -1的条数相同, 且最后一条为sign = -1
- 插入数据并optimize
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', 1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-17 17:00:00', 1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', -1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-17 17:00:00', -1);
optimize table collapse_table_local on cluster sharding_ha final;
- 查询结果
clickhouse1 :)
clickhouse1 :) select * from collapse_table_all;
SELECT *
FROM collapse_table_all
Query id: f246a51f-ed4b-4dfc-bf71-e5f81d6d8730
Ok.
0 rows in set. Elapsed: 0.009 sec.
clickhouse1 :)
- 规则结论 没有数据
4.4.3 sign = 1比sign = -1数据多n条(没有sign = -1也可)
- 插入数据并optimize
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', 1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-17 17:00:00', 1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', -1);
optimize table collapse_table_local on cluster sharding_ha final;
- 查询数据
clickhouse1 :)
clickhouse1 :) select * from collapse_table_all;
SELECT *
FROM collapse_table_all
Query id: dcbb1c99-3fe7-49d4-bcc4-99673c478687
┌─id─┬─city────┬─────────create_time─┬─sign─┐
│ 1 │ Beijing │ 2021-07-17 17:00:00 │ 1 │
└────┴─────────┴─────────────────────┴──────┘
1 rows in set. Elapsed: 0.019 sec.
clickhouse1 :)
- 规则结论 取最后一条sign = 1
4.4.4 sign = -1比sign = 1数据多n条(没有sign = 1也可)
- 插入数据并optimize
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', 1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', -1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-17 17:00:00', -1);
optimize table collapse_table_local on cluster sharding_ha final;
- 查询数据
clickhouse1 :)
clickhouse1 :) select * from collapse_table_all;
SELECT *
FROM collapse_table_all
Query id: 0a48a570-6716-42f0-95f7-5a5c113638ee
┌─id─┬─city────┬─────────create_time─┬─sign─┐
│ 1 │ Beijing │ 2021-07-16 17:00:00 │ -1 │
└────┴─────────┴─────────────────────┴──────┘
1 rows in set. Elapsed: 0.010 sec.
clickhouse1 :)
- 规则结论 取第一条sign = -1
4.4.5 第一条数据为sign = -1
- 插入数据并optimize
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', -1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', 1);
optimize table collapse_table_local on cluster sharding_ha final;
- 查询数据
clickhouse1 :)
clickhouse1 :) select * from collapse_table_all;
SELECT *
FROM collapse_table_all
Query id: a1783d7e-593f-46cc-83f1-20438dfafe0e
┌─id─┬─city────┬─────────create_time─┬─sign─┐
│ 1 │ Beijing │ 2021-07-16 17:00:00 │ -1 │
│ 1 │ Beijing │ 2021-07-16 17:00:00 │ 1 │
└────┴─────────┴─────────────────────┴──────┘
2 rows in set. Elapsed: 0.017 sec.
clickhouse1 :)
- 规则结论 不会发生折叠,数据无变化;可以使用VersionedCollapsingMergeTree解决
4.5 不用optimize的查询方式
以下测试开始时表中没有数据
- 插入数据
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', 1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-17 17:00:00', 1);
insert into collapse_table_all(id, city, create_time, sign) values(1, 'Beijing', '2021-07-16 17:00:00', -1);
- 查询数据 如果使用optimize,则查询出最后一条sign = 1的数据
clickhouse1 :)
clickhouse1 :) select id, city, anyLastIf(create_time, sign = 1) as create_time, anyLastIf(sign, sign = 1) as sign2 from collapse_table_all group by id, city having sum(sign) > 0;
SELECT
id,
city,
anyLastIf(create_time, sign = 1) AS create_time,
anyLastIf(sign, sign = 1) AS sign2
FROM collapse_table_all
GROUP BY
id,
city
HAVING sum(sign) > 0
Query id: 8adfb3e7-0547-4238-8404-a4af5e6bf1ed
┌─id─┬─city────┬─────────create_time─┬─sign2─┐
│ 1 │ Beijing │ 2021-07-17 17:00:00 │ 1 │
└────┴─────────┴─────────────────────┴───────┘
1 rows in set. Elapsed: 0.026 sec.
clickhouse1 :)
和使用optimize方式查询的结果是一样的
5. VersionedCollapsingMergeTree
5.1 使用场景
CollapsingMergeTree在多线程插入数据时,可能第一条数据是sign = -1,则会出现在partition合并时,并不会发生折叠,数据无变化
VersionedCollapsingMergeTree通过引入一个ver版本号字段,来解决这个问题
5.2 执行SQL
- 建表语句
clickhouse1 :)
clickhouse1 :) drop table if exists ver_collapse_table_local on cluster sharding_ha;
DROP TABLE IF EXISTS ver_collapse_table_local ON CLUSTER sharding_ha
Query id: fc89f9f3-4589-4e38-acdc-bf5afb7e6937
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.119 sec.
clickhouse1 :)
clickhouse1 :) drop table if exists ver_collapse_table_all on cluster sharding_ha;
DROP TABLE IF EXISTS ver_collapse_table_all ON CLUSTER sharding_ha
Query id: f5c5a682-1670-45b7-aa6c-4f402bc26c36
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.121 sec.
clickhouse1 :)
clickhouse1 :) create table ver_collapse_table_local on cluster sharding_ha(
:-] id UInt32,
:-] city String,
:-] create_time DateTime,
:-] sign Int8,
:-] ver Int8
:-] ) engine = ReplicatedVersionedCollapsingMergeTree('/clickhouse/tables/ver_collapse_table/{shard}', '{replica}', sign, ver)
:-] partition by toYYYYMM(create_time)
:-] order by (id, city)
:-] primary key id;
CREATE TABLE ver_collapse_table_local ON CLUSTER sharding_ha
(
`id` UInt32,
`city` String,
`create_time` DateTime,
`sign` Int8,
`ver` Int8
)
ENGINE = ReplicatedVersionedCollapsingMergeTree('/clickhouse/tables/ver_collapse_table/{shard}', '{replica}', sign, ver)
PARTITION BY toYYYYMM(create_time)
PRIMARY KEY id
ORDER BY (id, city)
Query id: cb96d37e-f4c1-4262-9ace-cdb5c6c0dce7
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 1 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 1 │
│ clickhouse4 │ 9000 │ 0 │ │ 1 │ 1 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse3 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.177 sec.
clickhouse1 :)
clickhouse1 :) create table ver_collapse_table_all on cluster sharding_ha(
:-] id UInt32,
:-] city String,
:-] create_time DateTime,
:-] sign Int8,
:-] ver Int8
:-] ) engine = Distributed(sharding_ha, default, ver_collapse_table_local, id);
CREATE TABLE ver_collapse_table_all ON CLUSTER sharding_ha
(
`id` UInt32,
`city` String,
`create_time` DateTime,
`sign` Int8,
`ver` Int8
)
ENGINE = Distributed(sharding_ha, default, ver_collapse_table_local, id)
Query id: 92daa256-fc6e-4208-bcdc-1b5fd64f7c4b
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.122 sec.
clickhouse1 :)
5.3 说明
- VersionedCollapsingMergeTree在合并分区时,以partition为单元,聚合后不同partition可能存在相同的order by数据
- 用order by字段判定是否是对同一条数据操作,而不是primary key字段
- 同一条数据中,相同ver版本号的sign才能折叠,因为会将ver字段作为排序字段添加到order by后面,sign的1和-1的抵消规则请参考下一节
以下所有测试开始时表中都没有数据
5.4.1 sign = 1和sign = -1的条数相同
- 插入数据并optimize
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-16 17:00:00', -1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-16 17:00:00', 1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-17 17:00:00', -1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-17 17:00:00', 1, 1);
optimize table ver_collapse_table_local on cluster sharding_ha final;
- 查询数据
clickhouse1 :)
clickhouse1 :) select * from ver_collapse_table_all;
SELECT *
FROM ver_collapse_table_all
Query id: b7b13048-3136-4d9f-bb94-38cf6e0c1535
Ok.
0 rows in set. Elapsed: 0.036 sec.
clickhouse1 :)
- 规则结论 和sign的顺序无关,全部抵消,最后无数据
5.4.2 sign = 1比sign = -1数据条数不一样(没有sign = 1或sign = -1也可)
- 插入数据并optimize
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-16 17:00:00', -1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-16 17:00:00', 1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-17 17:00:00', 1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-18 17:00:00', 1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-19 17:00:00', 1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-17 17:00:00', -1, 1);
optimize table ver_collapse_table_local on cluster sharding_ha final;
- 查询数据
clickhouse1 :)
clickhouse1 :) select * from ver_collapse_table_all;
SELECT *
FROM ver_collapse_table_all
Query id: 2c409a16-1c4c-49bb-9898-54cd8185bd98
┌─id─┬─city────┬─────────create_time─┬─sign─┬─ver─┐
│ 1 │ Beijing │ 2021-07-17 17:00:00 │ 1 │ 1 │
│ 1 │ Beijing │ 2021-07-18 17:00:00 │ 1 │ 1 │
└────┴─────────┴─────────────────────┴──────┴─────┘
2 rows in set. Elapsed: 0.034 sec.
clickhouse1 :)
- 规则结论 数据从上到下,采取最近优先折叠抵消原则,且折叠抵消完后迭代进行折叠抵消,直到不能折叠抵消为止,折叠抵消完后数据剩余1行或多行
5.5 不用optimize的查询方式
以下测试开始时表中没有数据
- 插入数据
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-16 17:00:00', 1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-17 17:00:00', 1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-18 17:00:00', 1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-19 17:00:00', 1, 1);
insert into ver_collapse_table_all(id, city, create_time, sign, ver) values(1, 'Beijing', '2021-07-16 17:00:00', -1, 1);
- 查询数据
clickhouse1 :)
clickhouse1 :) select id, city, anyLastIf(create_time, sign = 1) as create_time, anyLastIf(sign, sign = 1) as sign2, ver from ver_collapse_table_all group by id, city, ver having sum(sign) > 0;
SELECT
id,
city,
anyLastIf(create_time, sign = 1) AS create_time,
anyLastIf(sign, sign = 1) AS sign2,
ver
FROM ver_collapse_table_all
GROUP BY
id,
city,
ver
HAVING sum(sign) > 0
Query id: c89a91c7-af44-4a52-9313-572572a605a8
┌─id─┬─city────┬─────────create_time─┬─sign2─┬─ver─┐
│ 1 │ Beijing │ 2021-07-19 17:00:00 │ 1 │ 1 │
└────┴─────────┴─────────────────────┴───────┴─────┘
1 rows in set. Elapsed: 0.022 sec.
clickhouse1 :)
此查询方式只返回了一条数据,而正确结果应该为一条或多条;且当最后一条sign = 1之后还有一条sign = -1时,查询结果也不正确
目前采用这种方法,后面再做优化
6. 各种MergeTree的关系总结
所有的MergeTree只是在partition合并的时候,处理逻辑不一样,其余都是一样的;如MergeTree partition合并时,将相同分区的数据按order by进行排序,而其它MergeTree也是基于这个原理的