Table of Contents
- 1. Table metadata API
- 2. Table Scanning
- 2.1 File Level
- 2.2 Row level
- 3. Table update operations
- 4. Transactions
- 5. Types (data types)
- 5.1 Primitive types
- 5.2 Collection types
- 6. Expressions
- 7. Iceberg module overview
The following walkthrough uses the Hadoop Catalog as an example.
1. Table metadata API
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.io.{FileIO, LocationProvider}
import org.apache.iceberg.{PartitionSpec, Schema, Snapshot, Table}
import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
object flink_test {
  def main(args: Array[String]): Unit = {
    // ======= Initialize the Hadoop Catalog =======
    val warehousePath: String = "hdfs://nnha/user/iceberg/warehouse"
    val hadoopCatalog: HadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath)
    // ======= Load an existing table =======
    // The arguments are the database name and the table name
    val tableName: TableIdentifier = TableIdentifier.of("iceberg_db", "my_user")
    val table: Table = hadoopCatalog.loadTable(tableName)
    // ======= Table metadata =======
    // Returns the table's Schema
    val schema: Schema = table.schema()
    // Returns the table's PartitionSpec
    val partitionSpec: PartitionSpec = table.spec()
    // Returns the table properties as a key/value map; in this example the result is: {write.format.default=parquet, write.parquet.compression-codec=gzip}
    val properties: java.util.Map[String, String] = table.properties()
    // Returns the table's current Snapshot
    val currentSnapshot: Snapshot = table.currentSnapshot()
    // Returns the Snapshot with the given snapshot id
    val definedSnapshot: Snapshot = table.snapshot(138573494821828246L)
    // Returns all Snapshots of the table
    val snapshots: Seq[Snapshot] = table.snapshots().toSeq
    // Returns the table's path on HDFS; in this example: hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user
    val location: String = table.location()
    // Refresh the table to the latest version
    table.refresh()
    // Use FileIO to read and write table files
    val fileIO: FileIO = table.io()
    // Use LocationProvider to create paths for data and metadata files
    val locationProvider: LocationProvider = table.locationProvider()
  }
}
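As a small usage sketch continuing from the example above (only the Snapshot accessors shown here are used), the snapshot list can be walked to print the table's commit history:
// Continuing from the example above: print the id, commit time and operation of every snapshot
snapshots.foreach { s =>
  println(s"snapshotId=${s.snapshotId()} committedAt=${s.timestampMillis()} operation=${s.operation()}")
}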
2. Table Scanning
The my_user table contains the following data:
+------------------+--------------------+-------------------+------------------+
| my_user.user_id  | my_user.user_name  | my_user.birthday  | my_user.country  |
+------------------+--------------------+-------------------+------------------+
| 1                | zhang_san          | 2022-02-01        | china            |
| 2                | zhang_san          | 2022-02-01        | china            |
| 6                | zhang_san          | 2022-02-01        | china            |
| 5                | zhao_liu           | 2022-02-02        | japan            |
+------------------+--------------------+-------------------+------------------+
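For reference, the following is a minimal sketch of how a table like my_user could be created through the same Hadoop Catalog; the column types, field IDs and the identity partition on country are assumptions for illustration, not taken from the actual table:
import org.apache.iceberg.{PartitionSpec, Schema}
import org.apache.iceberg.types.Types

// Assumed schema for my_user; field IDs are illustrative
val mySchema: Schema = new Schema(
  Types.NestedField.required(1, "user_id", Types.LongType.get()),
  Types.NestedField.optional(2, "user_name", Types.StringType.get()),
  Types.NestedField.optional(3, "birthday", Types.DateType.get()),
  Types.NestedField.optional(4, "country", Types.StringType.get())
)
// Assumed partitioning: identity partition on the country column
val mySpec: PartitionSpec = PartitionSpec.builderFor(mySchema).identity("country").build()
// With hadoopCatalog and tableName as in section 1; fails if the table already exists
// hadoopCatalog.createTable(tableName, mySchema, mySpec)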
2.1 File Level
Add the following dependency to pom.xml:
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.10.1</version>
</dependency>
The example program is as follows:
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopCatalog
import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
object flink_test {
  def main(args: Array[String]): Unit = {
    // ======= Initialize the Hadoop Catalog =======
    val warehousePath: String = "hdfs://nnha/user/iceberg/warehouse"
    val hadoopCatalog: HadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath)
    // ======= Load an existing table =======
    // The arguments are the database name and the table name
    val tableName: TableIdentifier = TableIdentifier.of("iceberg_db", "my_user")
    val table: Table = hadoopCatalog.loadTable(tableName)
    // ======= Table Scanning =======
    // TableScan is an immutable object
    val tableScan: TableScan =
      table.newScan()
        .filter(Expressions.equal("user_id", 2))
        .select("user_id", "user_name")
    //  .asOfTime(timestampMillis: Long) // read the table state as of the given timestamp
    //  .useSnapshot(snapshotId: Long)   // read the table state of the given snapshot id
    // Plan the data files that the scan will read
    val fileScanTaskSeq: Seq[FileScanTask] = tableScan.planFiles().toSeq
    // Plan the combined scan tasks
    val combinedScanTaskSeq: Seq[CombinedScanTask] = tableScan.planTasks().toSeq
    // Returns the read projection (the schema of the selected columns)
    val scanSchema: Schema = tableScan.schema()
  }
}
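To see what the file-level scan actually planned, the FileScanTask sequence can be iterated; a minimal sketch continuing from fileScanTaskSeq above (only the DataFile accessors shown here are used):
// Continuing from the example above: inspect the data files the scan would read
fileScanTaskSeq.foreach { task =>
  val dataFile = task.file()
  println(s"path=${dataFile.path()} records=${dataFile.recordCount()} sizeBytes=${dataFile.fileSizeInBytes()}")
}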
2.2 Row level
Add the following dependencies to pom.xml:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-data</artifactId>
    <version>0.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-parquet</artifactId>
    <version>0.13.1</version>
</dependency>
The example program is as follows:
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.data.IcebergGenerics.ScanBuilder
import org.apache.iceberg.data.{IcebergGenerics, Record}
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopCatalog
import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
object flink_test {
  def main(args: Array[String]): Unit = {
    // ======= Initialize the Hadoop Catalog =======
    val warehousePath: String = "hdfs://nnha/user/iceberg/warehouse"
    val hadoopCatalog: HadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath)
    // ======= Load an existing table =======
    // The arguments are the database name and the table name
    val tableName: TableIdentifier = TableIdentifier.of("iceberg_db", "my_user")
    val table: Table = hadoopCatalog.loadTable(tableName)
    // ======= Row-level scanning =======
    val scanBuilder: ScanBuilder = IcebergGenerics.read(table)
    val recordSeq: Seq[Record] =
      scanBuilder.where(Expressions.equal("user_id", 2))
        .select("user_id", "user_name")
        .build()
        .toSeq
  }
}
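The rows come back as generic Records, whose projected fields can be read by name; a minimal sketch continuing from recordSeq above (the column names match the my_user example):
// Continuing from the example above: read the projected fields of each Record by name
recordSeq.foreach { record =>
  println(s"user_id=${record.getField("user_id")}, user_name=${record.getField("user_name")}")
}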
3. Table update operations
The classes returned by table update operations are all subclasses of PendingUpdate[T]; an update only takes effect once PendingUpdate.commit() is called.
The example program is as follows:
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.types.Types
object flink_test {
  def main(args: Array[String]): Unit = {
    // ======= Initialize the Hadoop Catalog =======
    val warehousePath: String = "hdfs://nnha/user/iceberg/warehouse"
    val hadoopCatalog: HadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath)
    // ======= Load an existing table =======
    // The arguments are the database name and the table name
    val tableName: TableIdentifier = TableIdentifier.of("iceberg_db", "my_user")
    val table: Table = hadoopCatalog.loadTable(tableName)
    // ======= Table update operations =======
    // Update the table schema
    table.updateSchema()
      .addColumn("age", Types.IntegerType.get())
    // .commit()
    // Update the table properties
    val updateProperties: UpdateProperties = table.updateProperties()
    // Update the table's base location
    val updateLocation: UpdateLocation = table.updateLocation()
    // Append data files to the table
    val appendFiles: AppendFiles = table.newAppend()
    // Append data files to the table, without compacting metadata
    val fastAppendFiles: AppendFiles = table.newFastAppend()
    // Add data files to the table and remove the files they overwrite
    val overwriteFiles: OverwriteFiles = table.newOverwrite()
    // Delete data files
    val deleteFiles: DeleteFiles = table.newDelete()
    // Rewrite data files, replacing existing files with new versions
    val rewriteFiles: RewriteFiles = table.newRewrite()
    // Create a new table-level transaction
    val transaction: Transaction = table.newTransaction()
    // Rewrite manifests by clustering files, for faster scan planning
    val rewriteManifests: RewriteManifests = table.rewriteManifests()
    // Manage table snapshots, e.g. roll the table state back to a given snapshot id
    val manageSnapshots: ManageSnapshots = table.manageSnapshots()
  }
}
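For completeness, a minimal sketch of committing two of these updates; the property value is illustrative, and the snapshot id is the one used in the section 1 example:
// Change a table property; takes effect only when commit() is called
table.updateProperties()
  .set("write.parquet.compression-codec", "snappy")
  .commit()
// Roll the table back to an earlier snapshot; the id must be an existing ancestor snapshot
// table.manageSnapshots()
//   .rollbackTo(138573494821828246L)
//   .commit()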
4. Transactions
Purpose: commit multiple changes to a table in one atomic operation.
The example program is as follows:
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg._
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopCatalog
object flink_test {
  def main(args: Array[String]): Unit = {
    // ======= Initialize the Hadoop Catalog =======
    val warehousePath: String = "hdfs://nnha/user/iceberg/warehouse"
    val hadoopCatalog: HadoopCatalog = new HadoopCatalog(new Configuration(), warehousePath)
    // ======= Load an existing table =======
    // The arguments are the database name and the table name
    val tableName: TableIdentifier = TableIdentifier.of("iceberg_db", "my_user")
    val table: Table = hadoopCatalog.loadTable(tableName)
    // ======= Transactions =======
    val transaction: Transaction = table.newTransaction()
    // Commit a delete operation to the transaction
    transaction
      .newDelete()
      .deleteFromRowFilter(Expressions.equal("user_id", 2))
    // .commit()
    // transaction.newAppend().appendFile(DataFile).commit()
    // Commit all staged operations to the table
    // transaction.commitTransaction()
  }
}
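A minimal sketch of how a transaction groups multiple changes into one atomic commit; the property key and value are illustrative, and the delete filter matches the earlier example:
// Stage two changes on a transaction and publish them in a single atomic commit
val tx: Transaction = table.newTransaction()
tx.newDelete()
  .deleteFromRowFilter(Expressions.equal("user_id", 2))
  .commit()                                // staged in the transaction only, not yet visible
tx.updateProperties()
  .set("comment", "removed user_id 2")     // illustrative property
  .commit()                                // staged in the transaction only
tx.commitTransaction()                     // both changes become visible to readers atomically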
5. Types (data types)
5.1 Primitive types
import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.{DecimalType, IntegerType}
// Types without parameters are obtained with get()
val integerType: IntegerType = Types.IntegerType.get()
// Parameterized types are obtained with of(params...)
val decimalType: DecimalType = Types.DecimalType.of(5, 2) // the first argument is the precision, the second is the scale
5.2 Collection types
Note: nested fields in collection types must be assigned unique field IDs, and nested fields may be declared optional.
import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.{ListType, MapType, StructType}
// struct
val structType: StructType = StructType.of(
  Types.NestedField.required(1, "id", Types.IntegerType.get(), "person ID"),
  Types.NestedField.optional(2, "name", Types.StringType.get())
)
// map
val mapType: MapType = MapType.ofOptional(
  1, 2,
  Types.StringType.get(),
  Types.IntegerType.get()
)
// list
val listType: ListType = ListType.ofRequired(
  1, Types.IntegerType.get()
)
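Collection types are typically used as column types inside a table Schema; a minimal sketch (the columns and field IDs are illustrative, and every field ID, including those of nested fields, must be unique within the schema):
import org.apache.iceberg.Schema
import org.apache.iceberg.types.Types

val nestedSchema: Schema = new Schema(
  Types.NestedField.required(1, "id", Types.IntegerType.get()),
  // struct column with its own nested fields
  Types.NestedField.optional(2, "address", Types.StructType.of(
    Types.NestedField.required(3, "city", Types.StringType.get()),
    Types.NestedField.optional(4, "zip", Types.StringType.get())
  )),
  // list column; the element also needs its own field ID
  Types.NestedField.optional(5, "tags", Types.ListType.ofOptional(6, Types.StringType.get()))
)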
6. Expressions
Expressions are used in table data scans.
An Expression is unbound when it is created; it is later bound to the field ID of the column it references, and the literal value in the expression is converted to that column's data type. For example, Expressions.equal("user_name", "zhang_san") is bound to the field ID of the user_name column, and "zhang_san" is converted to that column's type, i.e. string.
The example program is as follows:
import org.apache.iceberg.expressions.{Expressions, False, True}
// The following calls return org.apache.iceberg.expressions.UnboundPredicate[T],
// which is a subclass of org.apache.iceberg.expressions.Expression
Expressions.isNull("user_name")
Expressions.notNull("user_name")
Expressions.equal("user_name", "zhang_san")
Expressions.notEqual("user_name", "zhang_san")
Expressions.lessThan("user_id", 3)
Expressions.lessThanOrEqual("user_id", 3)
Expressions.greaterThan("user_id", 3)
Expressions.greaterThanOrEqual("user_id", 3)
Expressions.in("user_id", 1, 2)
Expressions.notIn("user_id", 1, 2)
Expressions.startsWith("user_name", "zhang")
Expressions.notStartsWith("user_name", "zhang")
// AND / OR / NOT; these return org.apache.iceberg.expressions.Expression
Expressions.and(
  Expressions.isNull("user_name"),
  Expressions.greaterThan("user_id", 1)
)
Expressions.or(
  Expressions.isNull("user_name"),
  Expressions.greaterThan("user_id", 1)
)
Expressions.not(
  Expressions.isNull("user_name")
)
// The following return subclasses of org.apache.iceberg.expressions.Expression
val alwaysTrue: True = Expressions.alwaysTrue()
val alwaysFalse: False = Expressions.alwaysFalse()
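A compound expression built this way stays unbound until it is handed to a scan, where it is bound against the table schema; a minimal sketch reusing the table from the earlier examples (column names follow my_user):
// Build a compound row filter and pass it to a scan; binding happens inside the scan
val countryFilter = Expressions.and(
  Expressions.equal("country", "china"),
  Expressions.greaterThanOrEqual("user_id", 2)
)
// table.newScan().filter(countryFilter).planFiles()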
7. Iceberg module overview
Modules related to Iceberg tables:
- iceberg-common: utility classes shared by the other modules
- iceberg-api: the public Iceberg API, including expressions, types, tables, and operations
- iceberg-arrow: an implementation of the Iceberg type system for reading and writing data from Iceberg tables, using Apache Arrow as the in-memory data format
- iceberg-core: the implementation of the Iceberg API plus support for Avro data files; this is the module that compute engines depend on
- iceberg-parquet: optional; required when tables use the Parquet format
- iceberg-orc: optional; required when tables use the ORC format
- iceberg-hive-metastore: the implementation for using the Hive metastore as the Catalog for Iceberg tables
Compute engine and tooling modules:
- iceberg-spark: the Iceberg data source API implementation for Spark; at runtime the iceberg-spark-runtime-3.1 jar is used
- iceberg-flink: the Iceberg implementation of Flink's Table and DataStream APIs; at runtime the iceberg-flink-runtime jar is used
- iceberg-hive3: an implementation of Hive 3-specific SerDe support, covering the Timestamp, TimestampWithZone, and Date object inspectors; at runtime the iceberg-hive-runtime jar is used
- iceberg-mr: the Iceberg implementation of MapReduce and Hive InputFormats and SerDes; when used with Hive, the iceberg-hive-runtime jar is used at runtime
- iceberg-data: the client library for reading Iceberg tables from JVM applications
- iceberg-runtime: generates a runtime jar that lets Spark integrate with Iceberg tables