您当前的位置: 首页 >  ar

Bulut0907

暂无认证

  • 2浏览

    0关注

    346博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Iceberg数据湖的Partition分区和Schema、Partition、Sort order更新

Bulut0907 发布时间:2022-04-24 09:26:00 ,浏览量:2

目录
  • 1. Partitioning分区
  • 2. Schema、Partition、Sort order更新
    • 2.1 Schema更新
    • 2.2 Partition更新
    • 2.3 Sort order更新

1. Partitioning分区
  1. 写入数据 Iceberg支持计算列作为分区字段。对于timestamp类型,计算列的分区粒度为year、month、day、hour。如下面的示例,order_time字段类型为timestamp(没有time zone)
    val partitionSpec: PartitionSpec = PartitionSpec.builderFor(schema)
      // 从timestamp类型字段,解析int类型的小时作为分区字段
      .hour("order_time")
      .build()

插入一条order_time = '2022-02-28 09:15:30’的数据。插入数据不需要对计算分区列进行计算,会自动进行计算

Flink SQL> insert into partition_test(user_name, order_time) values('zhang_san', timestamp '2022-02-28 09:15:30');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: cbb5878901e2beffe685a520bfa6dd11


Flink SQL>

HDFS上的分区目录如下:

partition_test/data/order_time_hour=2022-02-28-09
  1. 查询数据 对于查询数据,先看如下查询:
Flink SQL> select * from partition_test where order_time between timestamp '2022-02-28 09:10:00' and timestamp '2022-02-28 09:20:00' limit 3;
+----+--------------------------------+----------------------------+
| op |                      user_name |                 order_time |
+----+--------------------------------+----------------------------+
| +I |                      zhang_san | 2022-02-28 09:15:30.000000 |
+----+--------------------------------+----------------------------+
Received a total of 1 row

Flink SQL>

Iceberg会自动发现order_time字段和order_time_hour分区字段的关系,在查询时,会计算出需要扫描的分区目录为order_time_hour=2022-02-28-09,而不是扫描整个表

2. Schema、Partition、Sort order更新 2.1 Schema更新

可以使用SQL对表或复合类型字段进行Schema更新

支持以下Schema更新

  • Add:对表或struct类型字段,添加一个字段
  • Drop:对表或struct类型字段,删除一个已存在的字段。字段删除不能回滚,除非字段可以为空,或current-snapshot-id没有被改变
  • Rename:对表或struct类型字段,重命名一个已存在的字段,但不改变字段唯一ID
  • Update:对表的列、struct的字段、map的key、map的value、list的元素,数据类型进行更新
  • Reorder – 对表或struct类型字段,进行字段顺序的改变

因为Iceberg底层会对每一个字段(包含复合类型的嵌套字段),都绑定一个唯一字段ID。所有Schema更新只需要修改metadata就可以了,不用修改表data files

2.2 Partition更新

可以对表的Partition进行更新。更新前的HDFS分区目录对应更新前的Partition元数据;更新后的HDFS分区目录,会根据更新后的Partition元数据进行自动创建。而不会产生HDFS上表data目录的数据迁移

同时Iceberg select查询只引用表的列,而不引用分区列,所以Partition更新不会让Iceberg select查询产生语法错误

但对于一个select查询,如果和Partition更新前 + Partition更新后,都有where条件数据交集关系,则会产生两个独立的执行plan,如下图所示:

Partition更新读取 下面是通过Java/Scala API,进行Partition更新的一个示例

    import org.apache.iceberg.expressions.Expressions
    import org.apache.iceberg.Table

    val table:Table = null

    // =========Partition更新==========
    table.updateSpec()
      // 添加分区字段user_id,且一个分区内分桶数量为10
      .addField(Expressions.bucket("user_id", 10))
      // 删除分区字段country
      .removeField("country")
      // .commit()
2.3 Sort order更新

可以对表的Partition进行更新。更新前的HDFS分区目录对应更新前的Partition元数据;更新后的HDFS分区目录,会根据更新后的Partition元数据进行自动创建。而不会产生HDFS上表data目录的数据迁移

和Partiton更新类似,可以对表的Sort order进行更新。更新前的HDFS数据按更新前的Sort order进行写入;更新后的HDFS数据按更新后的Sort order进行写入。而不会产生HDFS上表data目录的数据迁移

下面是通过Java/Scala API,进行Partition更新的一个示例

    import org.apache.iceberg.Table
    import org.apache.iceberg.NullOrder

    val table:Table = null
    table.replaceSortOrder()
      .asc("user_id", NullOrder.NULLS_LAST)
      .desc("user_name", NullOrder.NULLS_FIRST)
      // .commit()
关注
打赏
1664501120
查看更多评论
立即登录/注册

微信扫码登录

0.0739s