Table of Contents
- 1. Flink Integration with Hudi
- 2. Create Table
- 3. Alter Table
- 4. Insert Data
- 5. Query Data
- 6. Update Data
- 7. Streaming Query
- 8. Delete Data
1. Flink Integration with Hudi
Hudi 0.11.0 can be used with Flink 1.14.x built for Scala 2.12. Running Hudi on Flink requires 4 slots: you can increase the taskmanager.numberOfTaskSlots parameter of each TaskManager, or configure the workers file to start multiple TaskManagers on the same server.
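A minimal sketch of both options (the slot count and host name below are illustrative, based on the cluster used in this article):
# conf/flink-conf.yaml: give each TaskManager more slots
taskmanager.numberOfTaskSlots: 4
# conf/workers: listing the same host twice makes start-cluster.sh launch two TaskManagers on it
bigdata001
bigdata001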
Download the Hudi bundle jar (it already includes the flink-parquet and flink-avro formats) into the lib directory on every server of the Flink cluster, then restart Flink:
[root@bigdata001 ~]# wget -P /root/flink-1.14.4/lib https://repo.maven.apache.org/maven2/org/apache/hudi/hudi-flink-bundle_2.12/0.11.0/hudi-flink1.14-bundle_2.12-0.11.0.jar
[root@bigdata001 ~]#
[root@bigdata001 ~]# scp /root/flink-1.14.4/lib/hudi-flink1.14-bundle_2.12-0.11.0.jar root@flink2:/root/flink-1.14.4/lib
hudi-flink1.14-bundle_2.12-0.11.0.jar 100% 29MB 79.3MB/s 00:00
[root@bigdata001 ~]# scp /root/flink-1.14.4/lib/hudi-flink1.14-bundle_2.12-0.11.0.jar root@flink3:/root/flink-1.14.4/lib
hudi-flink1.14-bundle_2.12-0.11.0.jar 100% 29MB 81.5MB/s 00:00
[root@bigdata001 ~]#
Then start the SQL Client.
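For example, using the Flink home directory from the commands above:
[root@bigdata001 ~]# /root/flink-1.14.4/bin/sql-client.sh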
2. Create Table
Flink SQL> create table my_user_old(
> user_id bigint primary key not enforced,
> user_name string,
> birthday date,
> country string
> ) partitioned by (birthday, country) with (
> 'connector' = 'hudi',
> 'path' = 'hdfs://nnha/user/hudi/warehouse/hudi_db/my_user',
> 'table.type' = 'MERGE_ON_READ'
> );
[INFO] Execute statement succeed.
Flink SQL>
- table.type: defaults to COPY_ON_WRITE
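For comparison, a minimal sketch of the same DDL as a COPY_ON_WRITE table (the table name and path below are illustrative, not from the original):
Flink SQL> create table my_user_cow(
> user_id bigint primary key not enforced,
> user_name string,
> birthday date,
> country string
> ) partitioned by (birthday, country) with (
> 'connector' = 'hudi',
> 'path' = 'hdfs://nnha/user/hudi/warehouse/hudi_db/my_user_cow',
> 'table.type' = 'COPY_ON_WRITE'
> );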
3. Alter Table
Flink SQL> alter table my_user_old rename to my_user;
[INFO] Execute statement succeed.
Flink SQL>
Only Flink's metadata is changed; the path and data on HDFS are not touched.
4. Insert Data
Flink SQL> insert into my_user(user_id, user_name, birthday, country)
> values(1, 'zhang_san', date '2022-02-01', 'china'),
> (2, 'li_si', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c807e7af16ccdd4dbdf3d3a3a34b3ef9
Flink SQL>
At this point the file structure on HDFS is as follows:
hudi_db/my_user
|---.hoodie
|   |---.aux
|   |   |---.bootstrap
|   |   |   |---.fileids (directory)
|   |   |   |---.partitions (directory)
|   |   |---ckp_meta
|   |   |   |---20220326233339019.COMPLETED
|   |   |   |---20220326233339019.INFLIGHT
|   |   |---view_storage_conf.properties
|   |---.temp (directory)
|   |---20220326233339019.deltacommit
|   |---20220326233339019.deltacommit.inflight
|   |---20220326233339019.deltacommit.requested
|   |---archived (directory)
|   |---hoodie.properties
|---2022-02-01/china
|   |---.eaf848c8-95e3-4a83-b6da-37d288c6287a_20220326233339019.log.1_3-4-0
|   |---.hoodie_partition_metadata
|---2022-02-02/japan
|   |---.8ffd5d1e-abd4-4ad7-ae43-c3c9aa34a859_20220326233339019.log.1_0-4-0
|   |---.hoodie_partition_metadata
5. Query Data
Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.
Flink SQL> select * from my_user;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I | 2 | li_si | 2022-02-02 | japan |
| +I | 1 | zhang_san | 2022-02-01 | china |
+----+----------------------+--------------------------------+------------+--------------------------------+
Received a total of 2 rows
Flink SQL>
When the table is queried, its data is merged at read time.
6. Update Data
Flink SQL> insert into my_user(user_id, user_name, birthday, country)
> values (2, 'wang_wu', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7f6e582d6bd9d665514968ec837e5ba5
Flink SQL>
Flink SQL> select * from my_user;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I | 2 | wang_wu | 2022-02-02 | japan |
| +I | 1 | zhang_san | 2022-02-01 | china |
+----+----------------------+--------------------------------+------------+--------------------------------+
Received a total of 2 rows
Flink SQL>
- An update identifies a unique record by the partition fields together with the primary key.
- The save mode for the HDFS data is still append; every write is a commit identified by an instant time.
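To inspect that commit timeline directly, you can list the table's .hoodie directory on HDFS (a sketch using the path from this article; output omitted), where each write appears as one .deltacommit instant:
[root@bigdata001 ~]# hadoop fs -ls /user/hudi/warehouse/hudi_db/my_user/.hoodie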
After this update, the file structure on HDFS is as follows:
hudi_db/my_user
|---.hoodie
|   |---.aux
|   |   |---.bootstrap
|   |   |   |---.fileids (directory)
|   |   |   |---.partitions (directory)
|   |   |---ckp_meta
|   |   |   |---20220326233339019.COMPLETED
|   |   |   |---20220326233339019.INFLIGHT
|   |   |---view_storage_conf.properties
|   |---.temp (directory)
|   |---20220326233339019.deltacommit
|   |---20220326233339019.deltacommit.inflight
|   |---20220326233339019.deltacommit.requested
|   |---20220326234620162.deltacommit
|   |---20220326234620162.deltacommit.inflight
|   |---20220326234620162.deltacommit.requested
|   |---archived (directory)
|   |---hoodie.properties
|---2022-02-01/china
|   |---.eaf848c8-95e3-4a83-b6da-37d288c6287a_20220326233339019.log.1_3-4-0
|   |---.hoodie_partition_metadata
|---2022-02-02/japan
|   |---.8ffd5d1e-abd4-4ad7-ae43-c3c9aa34a859_20220326233339019.log.1_0-4-0
|   |---.hoodie_partition_metadata
The content of the log file containing the updated data is as follows:
[root@bigdata001 ~]# hadoop fs -cat /user/hudi/warehouse/hudi_db/my_user/2022-02-02/japan/.8ffd5d1e-abd4-4ad7-ae43-c3c9aa34a859_20220326233339019.log.1_0-4-0
#HUDI#z{"type":"record","name":"record","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"user_id","type":"long"},{"name":"user_name","type":["null","string"],"default":null},{"name":"birthday","type":["null",{"type":"int","logicalType":"date"}],"default":null},{"name":"country","type":["null","string"],"default":null}]}20220326233339019"20220326233339019*20220326233339019_0_2user_id:2 2022-02-02/japanH8ffd5d1e-abd4-4ad7-ae43-c3c9aa34a859
li_si¢©
japan#HUDI#|{"type":"record","name":"record","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"user_id","type":"long"},{"name":"user_name","type":["null","string"],"default":null},{"name":"birthday","type":["null",{"type":"int","logicalType":"date"}],"default":null},{"name":"country","type":["null","string"],"default":null}]}20220326234620162"20220326234620162*20220326234620162_0_1user_id:2 2022-02-02/japanH8ffd5d1e-abd4-4ad7-ae43-c3c9aa34a859wang_wu¢©
japan[root@bigdata001 ~]#
7. Streaming Query
You need to put hive-exec-2.3.1.jar (Hudi's Hive integration is built against Hive 2.3.1) into the lib directory on every server of the Flink cluster and then restart the Flink cluster (a sketch of distributing the jar follows the error trace below); otherwise the following error is reported:
2022-03-27 12:57:19,913 INFO org.apache.hudi.common.table.HoodieTableMetaClient [] - Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from hdfs://nnha/user/hudi/warehouse/hudi_db/my_user
2022-03-27 12:57:19,915 INFO org.apache.hudi.common.table.timeline.HoodieActiveTimeline [] - Loaded instants upto : Option{val=[20220326234620162__deltacommit__COMPLETED]}
2022-03-27 12:57:19,918 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: split_monitor(table=[my_user], fields=[user_id, user_name, birthday, country]) (1/1)#1047 (4e14ec1889d5400fa3442363f51fa4b3) switched from RUNNING to FAILED with failure cause: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat
......part of the stack trace omitted......
at org.apache.hudi.sink.partitioner.profile.WriteProfiles.getCommitMetadata(WriteProfiles.java:194)
at org.apache.hudi.source.IncrementalInputSplits.lambda$inputSplits$0(IncrementalInputSplits.java:183)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.hudi.source.IncrementalInputSplits.inputSplits(IncrementalInputSplits.java:183)
at org.apache.hudi.source.StreamReadMonitoringFunction.monitorDirAndForwardSplits(StreamReadMonitoringFunction.java:195)
at org.apache.hudi.source.StreamReadMonitoringFunction.run(StreamReadMonitoringFunction.java:168)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
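One way to distribute the jar, mirroring the wget/scp steps used earlier for the Hudi bundle (the Maven Central path for hive-exec 2.3.1 and the target hosts follow that earlier example and are assumptions):
[root@bigdata001 ~]# wget -P /root/flink-1.14.4/lib https://repo.maven.apache.org/maven2/org/apache/hive/hive-exec/2.3.1/hive-exec-2.3.1.jar
[root@bigdata001 ~]# scp /root/flink-1.14.4/lib/hive-exec-2.3.1.jar root@flink2:/root/flink-1.14.4/lib
[root@bigdata001 ~]# scp /root/flink-1.14.4/lib/hive-exec-2.3.1.jar root@flink3:/root/flink-1.14.4/lib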
The table also needs to be re-created in the Flink catalog with the streaming read options:
Flink SQL> create table my_user(
> user_id bigint primary key not enforced,
> user_name string,
> birthday date,
> country string
> ) partitioned by (birthday, country) with (
> 'connector' = 'hudi',
> 'path' = 'hdfs://nnha/user/hudi/warehouse/hudi_db/my_user',
> 'table.type' = 'MERGE_ON_READ',
> 'read.streaming.enabled' = 'true',
> 'read.streaming.start-commit' = '20220326233339019',
> 'read.streaming.check-interval' = '10'
> );
[INFO] Execute statement succeed.
Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.
Flink SQL>
- read.streaming.start-commit: start reading data from after the instant time of the given commit; that commit itself is not read.
- read.streaming.check-interval: the interval for checking for new commits, 60 seconds by default.
Query the data:
Flink SQL> select * from my_user;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I | 2 | wang_wu | 2022-02-02 | japan |
8. Delete Data
Deleting data is supported.
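In streaming pipelines, deletes typically arrive as DELETE changelog rows from an upstream changelog/CDC source, and the Hudi writer applies them to the matching record key. A minimal sketch (the source table user_cdc_source and its definition are assumptions, not from the original):
Flink SQL> insert into my_user
> select user_id, user_name, birthday, country from user_cdc_source;
When user_cdc_source emits a -D (delete) row for a given user_id, the Hudi sink removes the record with that key from my_user.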