
Building a Data Lake with Hive 3.1.2 + Iceberg 0.13.1

Bulut0907 · Published 2022-04-11 09:35:05

Contents
  • 1. Enabling Iceberg Support in Hive
  • 2. Catalog Management
  • 3. Creating Databases
  • 4. Creating and Dropping Tables
    • 4.1 External Tables
    • 4.2 create table
    • 4.3 Dropping Tables
  • 5. Inserting Data
  • 6. Querying Data
  • 7. Type Conversion and How to Enable It

1. Enabling Iceberg Support in Hive

Hive can read and write Iceberg tables.

Download the dependency jar and add it to the session:

[root@hive1 ~]# 
[root@hive1 ~]# wget https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-hive-runtime/0.13.1/iceberg-hive-runtime-0.13.1.jar
[root@hive1 ~]#
[root@hive1 ~]# beeline -u jdbc:hive2://hive1:10000 -n root -p root123
0: jdbc:hive2://hive1:10000> 
0: jdbc:hive2://hive1:10000> add jar /root/iceberg-hive-runtime-0.13.1.jar;
INFO  : Added [/root/iceberg-hive-runtime-0.13.1.jar] to class path
INFO  : Added resources: [/root/iceberg-hive-runtime-0.13.1.jar]
No rows affected (0.312 seconds)
0: jdbc:hive2://hive1:10000>

Add the following property to hive-site.xml:

    <property>
        <name>iceberg.engine.hive.enabled</name>
        <value>true</value>
        <description>Whether Hive enables Iceberg support</description>
    </property>
2. Catalog Management

Hive itself has no notion of a Catalog, but Iceberg does. Hive therefore represents Catalog information as key-value properties, so that a configured Catalog can be referenced directly when creating tables.

The Hive/Iceberg integration supports both a Hive Catalog and a Hadoop Catalog.

Create a Hive Catalog:

0: jdbc:hive2://hive1:10000> 
0: jdbc:hive2://hive1:10000> set iceberg.catalog.hive_catalog.type=hive;
No rows affected (0.093 seconds)
0: jdbc:hive2://hive1:10000> set iceberg.catalog.hive_catalog.uri=thrift://hive1:9083;
No rows affected (0.052 seconds)
0: jdbc:hive2://hive1:10000> set iceberg.catalog.hive_catalog.clients=5;
No rows affected (0.058 seconds)
0: jdbc:hive2://hive1:10000> set iceberg.catalog.hive_catalog.warehouse=hdfs://nnha/user/iceberg/warehouse;
No rows affected (0.044 seconds)
0: jdbc:hive2://hive1:10000> 

Create a Hadoop Catalog:

0: jdbc:hive2://hive1:10000> 
0: jdbc:hive2://hive1:10000> set iceberg.catalog.hadoop_catalog.type=hadoop;
No rows affected (0.082 seconds)
0: jdbc:hive2://hive1:10000> set iceberg.catalog.hadoop_catalog.warehouse=hdfs://nnha/user/iceberg/warehouse;
No rows affected (0.046 seconds)
0: jdbc:hive2://hive1:10000> 
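The `set` commands above only last for the current beeline session. As a sketch (a hypothetical hive-site.xml fragment, mirroring the same catalog names and warehouse path used above), the equivalent properties could also be made permanent:

```xml
<!-- Hypothetical hive-site.xml fragment: persistent equivalents of the
     session-level set commands above. Adjust names/paths to your cluster. -->
<property>
    <name>iceberg.catalog.hive_catalog.type</name>
    <value>hive</value>
</property>
<property>
    <name>iceberg.catalog.hive_catalog.uri</name>
    <value>thrift://hive1:9083</value>
</property>
<property>
    <name>iceberg.catalog.hadoop_catalog.type</name>
    <value>hadoop</value>
</property>
<property>
    <name>iceberg.catalog.hadoop_catalog.warehouse</name>
    <value>hdfs://nnha/user/iceberg/warehouse</value>
</property>
```

With these in place, new beeline sessions can reference hive_catalog or hadoop_catalog in tblproperties without repeating the set commands.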
3. Creating Databases

1. Databases under the Hive Catalog: when another system uses this Hive metastore as its Catalog, any database it creates can be used directly without creating anything, because Hive databases and Iceberg databases map onto each other one-to-one.

2. Databases under the Hadoop Catalog: since Hive has no Catalog concept, it cannot automatically discover databases through the Catalog configured above. You must create a Hive database that corresponds to the Iceberg database. For example:

0: jdbc:hive2://hive1:10000> create schema iceberg_db
. . . . . . . . . . . . . .> location 'hdfs://nnha/user/iceberg/warehouse/iceberg_db/';
0: jdbc:hive2://hive1:10000>
4. Creating and Dropping Tables

4.1 External Tables

For an Iceberg table already created by another system, you can create an external table in Hive to read and write it.

1. Tables under the Hive Catalog: when another system uses this Hive metastore as its Catalog, any table it creates can be used directly without creating anything, because Hive tables and Iceberg tables map onto each other one-to-one.

2. Tables under the Hadoop Catalog

Create a Hive table that maps onto the Iceberg table. The query results match the data in the Iceberg table:

0: jdbc:hive2://hive1:10000> create external table iceberg_db.my_user
. . . . . . . . . . . . . .> stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
. . . . . . . . . . . . . .> tblproperties('iceberg.catalog'='hadoop_catalog');
0: jdbc:hive2://hive1:10000>
0: jdbc:hive2://hive1:10000> select * from iceberg_db.my_user;
+------------------+--------------------+-------------------+------------------+
| my_user.user_id  | my_user.user_name  | my_user.birthday  | my_user.country  |
+------------------+--------------------+-------------------+------------------+
| 6                | zhang_san          | 2022-02-01        | china            |
| 5                | zhao_liu           | 2022-02-02        | japan            |
| 2                | zhang_san          | 2022-02-01        | china            |
| 1                | zhang_san          | 2022-02-01        | china            |
+------------------+--------------------+-------------------+------------------+
6 rows selected (7.855 seconds)
0: jdbc:hive2://hive1:10000>

If a table is created without the iceberg.catalog table property, the Hive Catalog is used by default: metadata is stored in the current Hive metastore, and table data is stored in the current Hive warehouse.

3. location_based_table tables

For a Hadoop Catalog table created by another system, Hive can also bypass the Catalog and create a location_based_table directly from the HDFS path:

0: jdbc:hive2://hive1:10000> 
0: jdbc:hive2://hive1:10000> create external table default.my_user
. . . . . . . . . . . . . .> stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
. . . . . . . . . . . . . .> location 'hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user'
. . . . . . . . . . . . . .> tblproperties('iceberg.catalog'='location_based_table');
No rows affected (1.184 seconds)
0: jdbc:hive2://hive1:10000> select * from default.my_user;
+------------------+--------------------+-------------------+------------------+
| my_user.user_id  | my_user.user_name  | my_user.birthday  | my_user.country  |
+------------------+--------------------+-------------------+------------------+
| 6                | zhang_san          | 2022-02-01        | china            |
| 5                | zhao_liu           | 2022-02-02        | japan            |
| 2                | zhang_san          | 2022-02-01        | china            |
| 1                | zhang_san          | 2022-02-01        | china            |
+------------------+--------------------+-------------------+------------------+
6 rows selected (1.994 seconds)
0: jdbc:hive2://hive1:10000> 

Note that this does not create a /user/hive/warehouse/default.db/my_user directory on HDFS.

4.2 create table

Iceberg tables can be created directly from Hive. The default iceberg.catalog is the Hive Catalog:

0: jdbc:hive2://hive1:10000> create table iceberg_db.student(
. . . . . . . . . . . . . .> id bigint,
. . . . . . . . . . . . . .> name string
. . . . . . . . . . . . . .> ) partitioned by (birthday date, country string)
. . . . . . . . . . . . . .> stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
No rows affected (5.966 seconds)
0: jdbc:hive2://hive1:10000>

The HDFS path is shown below; the Iceberg table's metadata files are created as well:

[root@hive1 ~]# hadoop fs -ls /user/iceberg/warehouse/iceberg_db/student/metadata
Found 1 items
-rw-r--r--   1 root supergroup       2154 2022-02-17 11:25 /user/iceberg/warehouse/iceberg_db/student/metadata/00000-268013a1-d3a9-4c41-b02d-c46cbb6fcb53.metadata.json
[root@hive1 ~]#

Specifying iceberg.catalog explicitly:

0: jdbc:hive2://hive1:10000> create table iceberg_db.employee(
. . . . . . . . . . . . . .> id bigint,
. . . . . . . . . . . . . .> name string
. . . . . . . . . . . . . .> ) partitioned by (birthday date, country string)
. . . . . . . . . . . . . .> stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
. . . . . . . . . . . . . .> location 'hdfs://nnha/user/iceberg/warehouse/iceberg_db/employee'
. . . . . . . . . . . . . .> tblproperties('iceberg.catalog'='hadoop_catalog');
No rows affected (0.324 seconds)
0: jdbc:hive2://hive1:10000>
0: jdbc:hive2://hive1:10000> show create table iceberg_db.student;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `iceberg_db.student`(                 |
|   `id` bigint COMMENT 'from deserializer',         |
|   `name` string COMMENT 'from deserializer',       |
|   `birthday` date COMMENT 'from deserializer',     |
|   `country` string COMMENT 'from deserializer')    |
| ROW FORMAT SERDE                                   |
|   'org.apache.iceberg.mr.hive.HiveIcebergSerDe'    |
| STORED BY                                          |
|   'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'  |
|                                                    |
| LOCATION                                           |
|   'hdfs://nnha/user/iceberg/warehouse/iceberg_db/student' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'engine.hive.enabled'='true',                    |
|   'external.table.purge'='TRUE',                   |
|   'metadata_location'='hdfs://nnha/user/iceberg/warehouse/iceberg_db/student/metadata/00000-268013a1-d3a9-4c41-b02d-c46cbb6fcb53.metadata.json',  |
|   'table_type'='ICEBERG',                          |
|   'transient_lastDdlTime'='1645068339',            |
|   'uuid'='29e4e983-6c71-473c-980a-6c391f3cd993')   |
+----------------------------------------------------+
20 rows selected (0.318 seconds)
0: jdbc:hive2://hive1:10000>

Although the Iceberg table is partitioned, the partition information is not visible in the Hive table definition, and computed (transform) columns are not currently supported as partition columns.

The HDFS path is shown below; note how it differs from the Hive Catalog table:

[root@hive1 ~]# hadoop fs -ls /user/iceberg/warehouse/iceberg_db/employee/metadata
Found 2 items
-rw-r--r--   1 root supergroup       2119 2022-02-17 12:51 /user/iceberg/warehouse/iceberg_db/employee/metadata/v1.metadata.json
-rw-r--r--   1 root supergroup          1 2022-02-17 12:51 /user/iceberg/warehouse/iceberg_db/employee/metadata/version-hint.text
[root@hive1 ~]#
4.3 Dropping Tables

0: jdbc:hive2://hive1:10000> 
0: jdbc:hive2://hive1:10000> drop table if exists default.my_user;
No rows affected (0.215 seconds)
0: jdbc:hive2://hive1:10000> 

After dropping a table, check whether the corresponding directory on HDFS was actually deleted.
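Whether dropping the table also deletes the data files depends on the external.table.purge table property (it appears as 'TRUE' in the show create table output in section 4.2). As a sketch, and assuming the property behaves as in standard Hive external tables, purging could be disabled before dropping if the files should survive:

```sql
-- Sketch: keep the underlying HDFS files when the table is dropped.
-- Setting 'external.table.purge'='FALSE' here is an assumption based on
-- the 'external.table.purge'='TRUE' property seen in show create table.
alter table iceberg_db.student set tblproperties('external.table.purge'='FALSE');
drop table if exists iceberg_db.student;
-- Then verify on HDFS:
--   hadoop fs -ls /user/iceberg/warehouse/iceberg_db/student
```

If the data should be removed together with the table, leave the property at its default and just verify the directory afterwards.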

5. Inserting Data

hive-site.xml settings:

  • iceberg.mr.commit.table.thread.pool.size: default 10; size of the thread pool used to commit inserts into multiple tables
  • iceberg.mr.commit.file.thread.pool.size: default 10; size of the thread pool used to write the files when inserting into multiple tables

Inserting '2022-02-01' into the date-typed birthday column fails with java.lang.IllegalStateException: Not an instance of java.lang.Integer: 2022-02-01. This is an Iceberg bug, so null is inserted here instead; alternatively, the column can be declared as a string type when the table is created.

0: jdbc:hive2://hive1:10000> insert into iceberg_db.student(id, name, birthday, country) 
. . . . . . . . . . . . . .> values(1, 'zhang_san', null, 'china'),
. . . . . . . . . . . . . .> (2, 'zhang_san', null, 'china');
No rows affected (135.859 seconds)
0: jdbc:hive2://hive1:10000> insert into iceberg_db.student(id, name, birthday, country) 
. . . . . . . . . . . . . .> select 3, 'zhang_san', null, 'china';
No rows affected (121.076 seconds)
0: jdbc:hive2://hive1:10000> 

For insert into db.tb2(col1, col2) select col1, col2 from db.tb1;

  1. If db.tb1 is stored in Parquet format, the statement fails with java.lang.UnsupportedOperationException: Parquet support not yet supported for Pig and Hive. This can be worked around by creating the table with tblproperties('write.format.default'='orc').
  2. The second error is shown below. It is an unresolved Iceberg bug, and the only workaround is to insert the rows of db.tb1 into db.tb2 row by row:
Error: java.io.IOException: java.lang.NullPointerException
	at org.apache.hadoop.hive.io.HiveIOExceptionHandlerChain.handleRecordReaderCreationException(HiveIOExceptionHandlerChain.java:97)
	at org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil.handleRecordReaderCreationException(HiveIOExceptionHandlerUtil.java:57)
	at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:420)
	at org.apache.hadoop.hive.ql.io.CombineHiveInputFormat.getRecordReader(CombineHiveInputFormat.java:702)
	at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.<init>(MapTask.java:176)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:445)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:350)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:178)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:172)
Caused by: java.lang.NullPointerException
	at org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat$VectorizedOrcRecordReader.<init>(VectorizedOrcInputFormat.java:78)
	at org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat.getRecordReader(VectorizedOrcInputFormat.java:188)
	at org.apache.iceberg.mr.hive.vector.HiveVectorizedReader.reader(HiveVectorizedReader.java:111)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:65)
	at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:77)
	at org.apache.iceberg.common.DynMethods$StaticMethod.invoke(DynMethods.java:196)
	at org.apache.iceberg.mr.mapreduce.IcebergInputFormat$IcebergRecordReader.newOrcIterable(IcebergInputFormat.java:398)
	at org.apache.iceberg.mr.mapreduce.IcebergInputFormat$IcebergRecordReader.openTask(IcebergInputFormat.java:288)
	at org.apache.iceberg.mr.mapreduce.IcebergInputFormat$IcebergRecordReader.open(IcebergInputFormat.java:308)
	at org.apache.iceberg.mr.mapreduce.IcebergInputFormat$IcebergRecordReader.initialize(IcebergInputFormat.java:231)
	at org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader.<init>(AbstractMapredIcebergRecordReader.java:40)
	at org.apache.iceberg.mr.hive.vector.HiveIcebergVectorizedRecordReader.<init>(HiveIcebergVectorizedRecordReader.java:40)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.apache.iceberg.common.DynConstructors$Ctor.newInstanceChecked(DynConstructors.java:60)
	at org.apache.iceberg.common.DynConstructors$Ctor.newInstance(DynConstructors.java:73)
	at org.apache.iceberg.mr.hive.HiveIcebergInputFormat.getRecordReader(HiveIcebergInputFormat.java:114)
	at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:417)
	... 9 more

Inserting into multiple tables:

from db.tb1 
insert into db.tb2 select col1, col2 
insert into db.tb3 select col3, col4; 

Data is fully inserted into one table before insertion into the next begins, so atomicity is guaranteed only per table, not for the multi-table operation as a whole.

6. Querying Data

Features:

  1. Predicate pushdown of where conditions
  2. Projection of the selected columns
  3. Runs on both the MapReduce and Tez execution engines

hive-site.xml settings:

  • iceberg.mr.reuse.containers: default false; whether containers are reused when reading Avro-format data
  • iceberg.mr.case.sensitive: default true; whether select queries are case-sensitive
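Rather than editing hive-site.xml, both settings can also be tried per session in beeline (a sketch; the values shown are illustrative, not recommendations):

```sql
-- Session-level overrides of the defaults listed above
set iceberg.mr.reuse.containers=true;
set iceberg.mr.case.sensitive=false;
```

Session-level settings only affect the current connection, which makes them convenient for experimenting before committing a value to hive-site.xml.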

7. Type Conversion and How to Enable It

hive-site.xml setting:

  • iceberg.mr.schema.auto.conversion: default false; whether to enable type conversion between Hive and Iceberg

When Iceberg tables are created from Hive, and when data is written to Iceberg tables, the following type conversion rules apply:

Hive                 Iceberg                      Notes
boolean              boolean
short                integer                      auto-conversion
byte                 integer                      auto-conversion
integer              integer
long                 long
float                float
double               double
date                 date
timestamp            timestamp without timezone
timestamplocaltz     timestamp with timezone      Hive 3 only
interval_year_month  not supported
interval_day_time    not supported
char                 string                       auto-conversion
varchar              string                       auto-conversion
string               string
binary               binary
decimal              decimal
struct               struct
list                 list
map                  map
union                not supported
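For example, with auto-conversion enabled, Hive byte/short/char columns should be accepted and mapped per the conversion rules above. A minimal sketch (the table name and columns are hypothetical, not from the original post):

```sql
set iceberg.mr.schema.auto.conversion=true;

-- Hypothetical table: the tinyint, smallint and char columns rely on the
-- auto-conversion rules above (byte/short -> integer, char -> string)
create table iceberg_db.conv_demo(
  a tinyint,
  b smallint,
  c char(10)
) stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
```

With the setting left at its default of false, writes involving these types would need explicit casts instead.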