Building a Data Lake with Flink 1.14.4 + Hudi 0.11.0

Bulut0907, published 2022-06-07 09:20:35

Contents
  • 1. Integrating Hudi with Flink
  • 2. Creating a table
  • 3. Altering the table
  • 4. Inserting data
  • 5. Querying data
  • 6. Updating data
  • 7. Streaming queries
  • 8. Deleting data

1. Integrating Hudi with Flink

Hudi 0.11.0 can be used with Flink 1.14.x built for Scala 2.12. Writing to Hudi from Flink needs 4 task slots, so either raise the taskmanager.numberOfTaskSlots parameter of each TaskManager, or edit the workers file to start multiple TaskManagers on the same server.
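A minimal sketch of the slot setting in conf/flink-conf.yaml (the value 4 simply matches the requirement above; tune it to your workload). The alternative is to list the same host several times in conf/workers so that several TaskManagers start on one machine.

# conf/flink-conf.yaml on every TaskManager node (illustrative value)
taskmanager.numberOfTaskSlots: 4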

Download the Hudi Flink bundle jar (it already contains the flink-parquet and flink-avro formats) into the lib directory of every server in the Flink cluster, then restart Flink:

[root@bigdata001 ~]# wget -P /root/flink-1.14.4/lib https://repo.maven.apache.org/maven2/org/apache/hudi/hudi-flink1.14-bundle_2.12/0.11.0/hudi-flink1.14-bundle_2.12-0.11.0.jar
[root@bigdata001 ~]#
[root@bigdata001 ~]# scp /root/flink-1.14.4/lib/hudi-flink1.14-bundle_2.12-0.11.0.jar root@flink2:/root/flink-1.14.4/lib
hudi-flink1.14-bundle_2.12-0.11.0.jar                                                                                                       100%   29MB  79.3MB/s   00:00    
[root@bigdata001 ~]# scp /root/flink-1.14.4/lib/hudi-flink1.14-bundle_2.12-0.11.0.jar root@flink3:/root/flink-1.14.4/lib
hudi-flink1.14-bundle_2.12-0.11.0.jar                                                                                                        100%   29MB  81.5MB/s   00:00    
[root@bigdata001 ~]#

After that, start the SQL Client and you are ready to go.
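A hedged sketch of the restart and SQL Client startup, assuming a standalone cluster managed with the standard scripts:

[root@bigdata001 ~]# /root/flink-1.14.4/bin/stop-cluster.sh
[root@bigdata001 ~]# /root/flink-1.14.4/bin/start-cluster.sh
[root@bigdata001 ~]# /root/flink-1.14.4/bin/sql-client.sh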

2. Creating a table
Flink SQL> create table my_user_old(
> user_id bigint primary key not enforced,
> user_name string,
> birthday date,
> country string
> ) partitioned by (birthday, country) with (
> 'connector' = 'hudi',
> 'path' = 'hdfs://nnha/user/hudi/warehouse/hudi_db/my_user',
> 'table.type' = 'MERGE_ON_READ'
> );
[INFO] Execute statement succeed.

Flink SQL>
  • table.type: defaults to COPY_ON_WRITE (a COPY_ON_WRITE variant is sketched below)
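For comparison, a minimal sketch of the same table as a COPY_ON_WRITE table: omit table.type so it falls back to the default, and point path at a separate directory (my_user_cow here is hypothetical):

Flink SQL> create table my_user_cow(
> user_id bigint primary key not enforced,
> user_name string,
> birthday date,
> country string
> ) partitioned by (birthday, country) with (
> 'connector' = 'hudi',
> 'path' = 'hdfs://nnha/user/hudi/warehouse/hudi_db/my_user_cow'
> );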
3. Altering the table
Flink SQL> alter table my_user_old rename to my_user;
[INFO] Execute statement succeed.

Flink SQL>

Only Flink's catalog metadata changes; the path and data on HDFS stay untouched.
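A quick way to verify this, as a hedged sketch (output elided): the catalog now lists my_user instead of my_user_old, while the directory name on HDFS is unchanged.

Flink SQL> show tables;

[root@bigdata001 ~]# hadoop fs -ls /user/hudi/warehouse/hudi_db/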

4. Inserting data
Flink SQL> insert into my_user(user_id, user_name, birthday, country) 
> values(1, 'zhang_san', date '2022-02-01', 'china'), 
> (2, 'li_si', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c807e7af16ccdd4dbdf3d3a3a34b3ef9


Flink SQL> 

The file layout on HDFS now looks like this:

hudi_db/my_user
    |---.hoodie
        |---.aux
            |---.bootstrap
                |---.fileids (directory)
                |---.partitions (directory)
            |---ckp_meta
                |---20220326233339019.COMPLETED
                |---20220326233339019.INFLIGHT
            |---view_storage_conf.properties
        |---.temp (directory)
        |---20220326233339019.deltacommit
        |---20220326233339019.deltacommit.inflight
        |---20220326233339019.deltacommit.requested
        |---archived (directory)
        |---hoodie.properties
    |---2022-02-01/china
        |---.eaf848c8-95e3-4a83-b6da-37d288c6287a_20220326233339019.log.1_3-4-0
        |---.hoodie_partition_metadata
    |---2022-02-02/japan
        |---.8ffd5d1e-abd4-4ad7-ae43-c3c9aa34a859_20220326233339019.log.1_0-4-0
        |---.hoodie_partition_metadata
5. Querying data
Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.

Flink SQL> select * from my_user;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I |                    2 |                          li_si | 2022-02-02 |                          japan |
| +I |                    1 |                      zhang_san | 2022-02-01 |                          china |
+----+----------------------+--------------------------------+------------+--------------------------------+
Received a total of 2 rows

Flink SQL>

At query time the table's data is merged, so this MERGE_ON_READ table always returns the latest version of each record.

6. Updating data
Flink SQL> insert into my_user(user_id, user_name, birthday, country) 
> values (2, 'wang_wu', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7f6e582d6bd9d665514968ec837e5ba5


Flink SQL>
Flink SQL> select * from my_user;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I |                    2 |                        wang_wu | 2022-02-02 |                          japan |
| +I |                    1 |                      zhang_san | 2022-02-01 |                          china |
+----+----------------------+--------------------------------+------------+--------------------------------+
Received a total of 2 rows

Flink SQL>
  • An update determines record uniqueness by the partition fields plus the primary key (see the sketch after this list)
  • The save mode for the data on HDFS is still append; every write is a commit identified by an instant time
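As a hedged illustration of the first bullet (the name zhao_liu is made up): writing the same user_id with the same partition values replaces the existing row at the next query, exactly as the wang_wu write above replaced li_si.

Flink SQL> insert into my_user(user_id, user_name, birthday, country)
> values (1, 'zhao_liu', date '2022-02-01', 'china');

After this write, select * from my_user should return zhao_liu instead of zhang_san for user_id 1.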

The file layout on HDFS now looks like this:

hudi_db/my_user
    |---.hoodie
        |---.aux
            |---.bootstrap
                |---.fileids (directory)
                |---.partitions (directory)
            |---ckp_meta
                |---20220326233339019.COMPLETED
                |---20220326233339019.INFLIGHT
            |---view_storage_conf.properties
        |---.temp (directory)
        |---20220326233339019.deltacommit
        |---20220326233339019.deltacommit.inflight
        |---20220326233339019.deltacommit.requested
        |---20220326234620162.deltacommit
        |---20220326234620162.deltacommit.inflight
        |---20220326234620162.deltacommit.requested
        |---archived (directory)
        |---hoodie.properties
    |---2022-02-01/china
        |---.eaf848c8-95e3-4a83-b6da-37d288c6287a_20220326233339019.log.1_3-4-0
        |---.hoodie_partition_metadata
    |---2022-02-02/japan
        |---.8ffd5d1e-abd4-4ad7-ae43-c3c9aa34a859_20220326233339019.log.1_0-4-0
        |---.hoodie_partition_metadata

The content of the log file holding the updated data is shown below (raw binary output, shown as-is); it contains two Avro log blocks, the original li_si record followed by the new wang_wu record:

[root@bigdata001 ~]# hadoop fs -cat /user/hudi/warehouse/hudi_db/my_user/2022-02-02/japan/.8ffd5d1e-abd4-4ad7-ae43-c3c9aa34a859_20220326233339019.log.1_0-4-0
#HUDI#z­{"type":"record","name":"record","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"user_id","type":"long"},{"name":"user_name","type":["null","string"],"default":null},{"name":"birthday","type":["null",{"type":"int","logicalType":"date"}],"default":null},{"name":"country","type":["null","string"],"default":null}]}20220326233339019"20220326233339019*20220326233339019_0_2user_id:2 2022-02-02/japanH8ffd5d1e-abd4-4ad7-ae43-c3c9aa34a859 
li_si¢© 
japan#HUDI#|­{"type":"record","name":"record","fields":[{"name":"_hoodie_commit_time","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_commit_seqno","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_record_key","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_partition_path","type":["null","string"],"doc":"","default":null},{"name":"_hoodie_file_name","type":["null","string"],"doc":"","default":null},{"name":"user_id","type":"long"},{"name":"user_name","type":["null","string"],"default":null},{"name":"birthday","type":["null",{"type":"int","logicalType":"date"}],"default":null},{"name":"country","type":["null","string"],"default":null}]}20220326234620162"20220326234620162*20220326234620162_0_1user_id:2 2022-02-02/japanH8ffd5d1e-abd4-4ad7-ae43-c3c9aa34a859wang_wu¢© 
japan[root@bigdata001 ~]#
7. Streaming queries

hive-exec-2.3.1.jar (Hudi's Hive integration is built against Hive 2.3.1) must be placed in the lib directory of every server in the Flink cluster, followed by a restart of the cluster; a download sketch follows the stack trace below. Otherwise the streaming read fails with the following error:

2022-03-27 12:57:19,913 INFO  org.apache.hudi.common.table.HoodieTableMetaClient           [] - Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=PARQUET) from hdfs://nnha/user/hudi/warehouse/hudi_db/my_user
2022-03-27 12:57:19,915 INFO  org.apache.hudi.common.table.timeline.HoodieActiveTimeline   [] - Loaded instants upto : Option{val=[20220326234620162__deltacommit__COMPLETED]}
2022-03-27 12:57:19,918 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: split_monitor(table=[my_user], fields=[user_id, user_name, birthday, country]) (1/1)#1047 (4e14ec1889d5400fa3442363f51fa4b3) switched from RUNNING to FAILED with failure cause: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat
...... part of the stack trace omitted ......
	at org.apache.hudi.sink.partitioner.profile.WriteProfiles.getCommitMetadata(WriteProfiles.java:194)
	at org.apache.hudi.source.IncrementalInputSplits.lambda$inputSplits$0(IncrementalInputSplits.java:183)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at org.apache.hudi.source.IncrementalInputSplits.inputSplits(IncrementalInputSplits.java:183)
	at org.apache.hudi.source.StreamReadMonitoringFunction.monitorDirAndForwardSplits(StreamReadMonitoringFunction.java:195)
	at org.apache.hudi.source.StreamReadMonitoringFunction.run(StreamReadMonitoringFunction.java:168)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
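A hedged sketch of placing hive-exec-2.3.1.jar on every node, mirroring the Hudi bundle steps above (the Maven Central path is an assumption; verify it before use):

[root@bigdata001 ~]# wget -P /root/flink-1.14.4/lib https://repo.maven.apache.org/maven2/org/apache/hive/hive-exec/2.3.1/hive-exec-2.3.1.jar
[root@bigdata001 ~]# scp /root/flink-1.14.4/lib/hive-exec-2.3.1.jar root@flink2:/root/flink-1.14.4/lib
[root@bigdata001 ~]# scp /root/flink-1.14.4/lib/hive-exec-2.3.1.jar root@flink3:/root/flink-1.14.4/lib
[root@bigdata001 ~]# /root/flink-1.14.4/bin/stop-cluster.sh && /root/flink-1.14.4/bin/start-cluster.sh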

The table then needs to be re-created in the Flink catalog with the streaming-read options:

Flink SQL> create table my_user(
> user_id bigint primary key not enforced,
> user_name string,
> birthday date,
> country string
> ) partitioned by (birthday, country) with (
> 'connector' = 'hudi',
> 'path' = 'hdfs://nnha/user/hudi/warehouse/hudi_db/my_user',
> 'table.type' = 'MERGE_ON_READ',
> 'read.streaming.enabled' = 'true',  
> 'read.streaming.start-commit' = '20220326233339019', 
> 'read.streaming.check-interval' = '10' 
> );
[INFO] Execute statement succeed.

Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.

Flink SQL>
  • read.streaming.start-commit: start reading from after the given commit's instant time; the data of that commit itself is not read (the sketch after this list shows how to find instant times on HDFS)
  • read.streaming.check-interval: the polling interval for new commits, in seconds; defaults to 60
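If you are not sure which instant time to pass as read.streaming.start-commit, the timeline can be listed straight from HDFS; the names of the completed deltacommit files are the instant times (a hedged sketch):

[root@bigdata001 ~]# hadoop fs -ls /user/hudi/warehouse/hudi_db/my_user/.hoodie | grep deltacommit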

Query the data:

Flink SQL> select * from my_user;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I |                    2 |                        wang_wu | 2022-02-02 |                          japan |

8. Deleting data

Deleting data is supported; a hedged sketch of how deletes typically reach the table follows.
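Flink SQL 1.14 has no DELETE statement, so deletes usually arrive as DELETE changelog records from an upstream source, and Hudi removes the matching keys on merge. A minimal hedged sketch, assuming a hypothetical MySQL CDC source table user_cdc (the mysql-cdc connector and all of its option values below are assumptions, not part of this setup):

Flink SQL> create table user_cdc(
> user_id bigint primary key not enforced,
> user_name string,
> birthday date,
> country string
> ) with (
> 'connector' = 'mysql-cdc',
> 'hostname' = 'mysql-host',
> 'port' = '3306',
> 'username' = 'cdc_user',
> 'password' = '******',
> 'database-name' = 'test_db',
> 'table-name' = 'my_user'
> );

Flink SQL> insert into my_user select user_id, user_name, birthday, country from user_cdc;

Rows deleted in the upstream MySQL table then flow in as -D changelog records, and the corresponding user_id keys should disappear from the Hudi table on the next query.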
