- 1. 分布式的本地表
- 1.1 分布式的创建本地表
- 1.2 分布式的更改本地表表名
- 1.3 分布式的删除本地表
- 2. Distributed表
- 2.1 创建Distributed表
- 2.2 删除分布式表
- 2.3 Distributed表其它语法
- 3. Distributed表的增删改查
- 3.1 insert
- 3.2 select
- 3.3 分布式本地表mutation
- 3.3.1 分布式本地表update
- 3.3.2 分布式本地表delete
- 4. 数据写入时的分片规则
Clickhouse的集群部署可以参考我的Clickhouse版本21.6.5.37的分片和副本分布式安装
Distributed表需要和其它表引擎一起使用,本身不储存数据,只是作分布式本地表的一层透明代理
1. 分布式的本地表 1.1 分布式的创建本地表clickhouse1 :)
clickhouse1 :) create table distribute_test_local on cluster sharding_ha(
:-] id UInt64,
:-] name String
:-] ) engine = ReplicatedMergeTree('/clickhouse/tables/distribute_test/{shard}', '{replica}')
:-] order by id;
CREATE TABLE distribute_test_local ON CLUSTER sharding_ha
(
`id` UInt64,
`name` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/distribute_test/{shard}', '{replica}')
ORDER BY id
Query id: 23f5ef8b-02e5-4b34-b871-208cc4983325
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 1.098 sec.
clickhouse1 :)
这里我们用{shard}
和{replica}
两个动态宏变量代替了硬编码
clickhouse1 :)
clickhouse1 :) rename table distribute_test_local to distribute_test_local2 on cluster sharding_ha;
RENAME TABLE distribute_test_local TO distribute_test_local2 ON CLUSTER sharding_ha
Query id: 06884b49-2627-4b1a-a8d2-6df860a9cd8f
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse3 │ 9000 │ 0 │ │ 3 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.121 sec.
clickhouse1 :)
clickhouse1 :) rename table distribute_test_local2 to distribute_test_local on cluster sharding_ha;
RENAME TABLE distribute_test_local2 TO distribute_test_local ON CLUSTER sharding_ha
Query id: 265a5678-f51e-4bad-8b91-91ea220bcd2b
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse3 │ 9000 │ 0 │ │ 3 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.085 sec.
clickhouse1 :)
1.3 分布式的删除本地表
clickhouse1 :)
clickhouse1 :) drop table distribute_test_local on cluster sharding_ha;
DROP TABLE distribute_test_local ON CLUSTER sharding_ha
Query id: 61540ac6-654b-4319-98aa-3462bac7bbeb
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 1 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.247 sec.
clickhouse1 :)
2. Distributed表
2.1 创建Distributed表
clickhouse1 :)
clickhouse1 :) create table distribute_test_all on cluster sharding_ha(
:-] id UInt64,
:-] name String
:-] ) engine = Distributed(sharding_ha, default, distribute_test_local, rand());
CREATE TABLE distribute_test_all ON CLUSTER sharding_ha
(
`id` UInt64,
`name` String
)
ENGINE = Distributed(sharding_ha, default, distribute_test_local, rand())
Query id: 8bbd66c1-e1cc-44f3-b860-a2a892c718fa
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 2 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.300 sec.
clickhouse1 :)
说明:
- 这里数据写入时的分片规则我们采用了
rand()
随机的模式,具体可以参考本文的第4部分 - Distributed采用读时检测机制,对分布式表和本地表的创建顺序无要求
- 会在集群的所有节点上创建一张Distributed表,所以从每个节点执行读写请求都是一样的
clickhouse1 :)
clickhouse1 :) drop table distribute_test_all on cluster sharding_ha;
DROP TABLE distribute_test_all ON CLUSTER sharding_ha
Query id: ac692e90-472b-447e-9908-e4d79fa7b93a
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse1 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 1 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.263 sec.
clickhouse1 :)
2.3 Distributed表其它语法
Distributed表的RENAME和ALTER,其中ALTER并不包括分区的操作(ATTACH PARTITION、REPLACE PARTITION等),这些语法只作用Distributed表本身,不会作用于本地表
3. Distributed表的增删改查Distributed表支持insert、select,不支持mutation、delete,但可以用分布式操作本地表的方式来实现
3.1 insertclickhouse1 :)
clickhouse1 :) insert into distribute_test_all(id, name) values(1, '一');
INSERT INTO distribute_test_all (id, name) VALUES
Query id: e1bd2e12-1055-4095-b971-c7494c99d819
Ok.
1 rows in set. Elapsed: 0.069 sec.
clickhouse1 :)
- 一个Block数据块的大小由max_insert_block_size(默认1048576)控制,当一条insert语句插入的数据量小于max_insert_block_size,则insert插入的数据要么全部成功,要么全部失败;但此原则不适用于clickhouse-client命令行操作或insert select子句
clickhouse1 :)
clickhouse1 :) select * from distribute_test_all;
SELECT *
FROM distribute_test_all
Query id: 2097dcf0-d204-4670-af5c-9c221eec82d1
┌─id─┬─name─┐
│ 1 │ 一 │
└────┴──────┘
1 rows in set. Elapsed: 0.041 sec.
clickhouse1 :)
3.3 分布式本地表mutation
- update和delete命令是一个异步的操作,提交后,客户端不会收到是否成功的消息
- 可以通过system.mutations的is_done字段查看update和delete是否执行成功
- 只有等partition分区合并的时候,才会将不要的旧数据目录删除
clickhouse1 :)
clickhouse1 :) alter table distribute_test_local on cluster sharding_ha update name = 'yi' where id = 1;
ALTER TABLE distribute_test_local ON CLUSTER sharding_ha
UPDATE name = 'yi' WHERE id = 1
Query id: 1ee6903f-6816-4fb0-9b2c-049b612f4382
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.175 sec.
clickhouse1 :)
- primary key、partition by字段不能被update
clickhouse1 :)
clickhouse1 :) alter table distribute_test_local on cluster sharding_ha delete where id = 1;
ALTER TABLE distribute_test_local ON CLUSTER sharding_ha
DELETE WHERE id = 1
Query id: 64865460-0c0b-404b-b922-d02af2faefff
┌─host────────┬─port─┬─status─┬─error─┬─num_hosts_remaining─┬─num_hosts_active─┐
│ clickhouse2 │ 9000 │ 0 │ │ 3 │ 0 │
│ clickhouse3 │ 9000 │ 0 │ │ 2 │ 0 │
│ clickhouse1 │ 9000 │ 0 │ │ 1 │ 0 │
│ clickhouse4 │ 9000 │ 0 │ │ 0 │ 0 │
└─────────────┴──────┴────────┴───────┴─────────────────────┴──────────────────┘
4 rows in set. Elapsed: 0.342 sec.
clickhouse1 :)
4. 数据写入时的分片规则
在本文的2.1部分,我们采用的分片规则是rand()
,分片键可以是Int或UInt类型,如userid
, intHash64(userid)
; weight是整数类型,越大则落入的数据越多
集群分片和权重如下:
服务器分片weight落入此分片取模的值范围clickhouse10110 - 0clickhouse30211 - 1假设rand()
返回的值为66, 则
取
模
的
值
=
66
%
(
1
+
1
)
=
0
取模的值 = 66\%(1 + 1) = 0
取模的值=66%(1+1)=0, 落入分片01