- 1. Partitioning分区
- 2. Schema、Partition、Sort order更新
- 2.1 Schema更新
- 2.2 Partition更新
- 2.3 Sort order更新
- 写入数据 Iceberg支持计算列作为分区字段。对于timestamp类型,计算列的分区粒度为year、month、day、hour。如下面的示例,order_time字段类型为timestamp(没有time zone)
val partitionSpec: PartitionSpec = PartitionSpec.builderFor(schema)
// 从timestamp类型字段,解析int类型的小时作为分区字段
.hour("order_time")
.build()
插入一条order_time = '2022-02-28 09:15:30’的数据。插入数据不需要对计算分区列进行计算,会自动进行计算
Flink SQL> insert into partition_test(user_name, order_time) values('zhang_san', timestamp '2022-02-28 09:15:30');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: cbb5878901e2beffe685a520bfa6dd11
Flink SQL>
HDFS上的分区目录如下:
partition_test/data/order_time_hour=2022-02-28-09
- 查询数据 对于查询数据,先看如下查询:
Flink SQL> select * from partition_test where order_time between timestamp '2022-02-28 09:10:00' and timestamp '2022-02-28 09:20:00' limit 3;
+----+--------------------------------+----------------------------+
| op | user_name | order_time |
+----+--------------------------------+----------------------------+
| +I | zhang_san | 2022-02-28 09:15:30.000000 |
+----+--------------------------------+----------------------------+
Received a total of 1 row
Flink SQL>
Iceberg会自动发现order_time字段和order_time_hour分区字段的关系,在查询时,会计算出需要扫描的分区目录为order_time_hour=2022-02-28-09,而不是扫描整个表
2. Schema、Partition、Sort order更新 2.1 Schema更新可以使用SQL对表或复合类型字段进行Schema更新
支持以下Schema更新
- Add:对表或struct类型字段,添加一个字段
- Drop:对表或struct类型字段,删除一个已存在的字段。字段删除不能回滚,除非字段可以为空,或current-snapshot-id没有被改变
- Rename:对表或struct类型字段,重命名一个已存在的字段,但不改变字段唯一ID
- Update:对表的列、struct的字段、map的key、map的value、list的元素,数据类型进行更新
- Reorder – 对表或struct类型字段,进行字段顺序的改变
因为Iceberg底层会对每一个字段(包含复合类型的嵌套字段),都绑定一个唯一字段ID。所有Schema更新只需要修改metadata就可以了,不用修改表data files
2.2 Partition更新可以对表的Partition进行更新。更新前的HDFS分区目录对应更新前的Partition元数据;更新后的HDFS分区目录,会根据更新后的Partition元数据进行自动创建。而不会产生HDFS上表data目录的数据迁移
同时Iceberg select查询只引用表的列,而不引用分区列,所以Partition更新不会让Iceberg select查询产生语法错误
但对于一个select查询,如果和Partition更新前 + Partition更新后,都有where条件数据交集关系,则会产生两个独立的执行plan,如下图所示:
下面是通过Java/Scala API,进行Partition更新的一个示例
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.Table
val table:Table = null
// =========Partition更新==========
table.updateSpec()
// 添加分区字段user_id,且一个分区内分桶数量为10
.addField(Expressions.bucket("user_id", 10))
// 删除分区字段country
.removeField("country")
// .commit()
2.3 Sort order更新
可以对表的Partition进行更新。更新前的HDFS分区目录对应更新前的Partition元数据;更新后的HDFS分区目录,会根据更新后的Partition元数据进行自动创建。而不会产生HDFS上表data目录的数据迁移
和Partiton更新类似,可以对表的Sort order进行更新。更新前的HDFS数据按更新前的Sort order进行写入;更新后的HDFS数据按更新后的Sort order进行写入。而不会产生HDFS上表data目录的数据迁移
下面是通过Java/Scala API,进行Partition更新的一个示例
import org.apache.iceberg.Table
import org.apache.iceberg.NullOrder
val table:Table = null
table.replaceSortOrder()
.asc("user_id", NullOrder.NULLS_LAST)
.desc("user_name", NullOrder.NULLS_FIRST)
// .commit()