Table of Contents
- 1. Catalog types
- 1.1 GenericInMemoryCatalog
- 1.2 JdbcCatalog
- 1.3 HiveCatalog
- 2. Using catalogs with SQL statements
- 2.1 Databases
- 2.2 Tables

1. Catalog types
1.1 GenericInMemoryCatalog
The default catalog type. Meta-object names are case-sensitive. The default catalog is default_catalog, and it contains one default database, default_database.
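Besides the built-in default_catalog, additional in-memory catalogs can be registered on the fly. A minimal sketch (the catalog name my_memory_catalog is only an illustration):

-- all metadata in this catalog lives in memory, only for the current session
create catalog my_memory_catalog with (
    'type' = 'generic_in_memory'
);
use catalog my_memory_catalog;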
1.2 JdbcCatalog
Currently supports only PostgreSQL databases.
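As an illustration, a JdbcCatalog for a PostgreSQL instance can be registered as below; the host, port, database name, and credentials are placeholders, not values from this setup:

-- 'base-url' points at the PostgreSQL server without a database name;
-- 'default-database' selects the database whose tables become visible
create catalog my_jdbc with (
    'type' = 'jdbc',
    'default-database' = 'mydb',
    'username' = 'postgres',
    'password' = 'postgres',
    'base-url' = 'jdbc:postgresql://192.168.23.201:5432'
);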
1.3 HiveCatalog
Stores all meta-object names in lowercase. It serves two purposes:
- Persisting Flink's own metadata. This metadata cannot be used by Hive.
- Reading and writing existing Hive metadata, i.e. reading and writing the data already in Hive. Tables created for this purpose need the property 'connector' = 'hive', as sketched below.
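For example, with a HiveCatalog as the current catalog, a Hive-compatible table (one that Hive itself can read) could be declared roughly like this; the table and column names are made up:

-- stored in the Hive metastore in a format Hive itself understands
create table hive_compatible_table(
    name string
) with ('connector' = 'hive');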
Installing the HiveCatalog
- Add the dependency JAR to the lib directory on every server in the Flink cluster, then restart Flink:
[root@flink1 ~]#
[root@flink1 ~]# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.3/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar
[root@flink1 ~]#
[root@flink1 ~]# scp /root/flink-1.14.3/lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar root@flink2:/root/flink-1.14.3/lib/
flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar 100% 46MB 75.9MB/s 00:00
[root@flink1 ~]# scp /root/flink-1.14.3/lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar root@flink3:/root/flink-1.14.3/lib/
flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar 100% 46MB 75.2MB/s 00:00
[root@flink1 ~]#
- First create a Hive configuration directory hive_conf on every server in the Flink cluster, then copy all files from the conf directory of the Hive cluster into the Flink cluster's hive_conf:
[root@flink1 ~]#
[root@flink1 ~]# mkdir /root/flink-1.14.3/hive_conf
[root@flink1 ~]#
[root@hive1 ~]#
[root@hive1 ~]# scp /root/apache-hive-3.1.2-bin/conf/* root@192.168.23.101:/root/flink-1.14.3/hive_conf
Also add the IP mappings for all Hive servers to /etc/hosts on every server in the Flink cluster.
- Create a database in Hive to hold Flink's metadata:
0: jdbc:hive2://hive1:10000>
0: jdbc:hive2://hive1:10000> create database flink;
INFO : Compiling command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e): create database flink
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO : Completed compiling command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e); Time taken: 1.006 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e): create database flink
INFO : Starting task [Stage-0:DDL] in serial mode
INFO : Completed executing command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e); Time taken: 3.957 seconds
INFO : OK
INFO : Concurrency mode is disabled, not creating a lock manager
No rows affected (5.604 seconds)
0: jdbc:hive2://hive1:10000>
- Create a temporary catalog in Flink and switch to it. This catalog no longer exists once you exit the SQL Client:
Flink SQL> create catalog my_hive with(
> 'type' = 'hive',
> 'default-database' = 'flink',
> 'hive-conf-dir' = '/root/flink-1.14.3/hive_conf'
> );
[INFO] Execute statement succeed.
Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| default_catalog |
| my_hive |
+-----------------+
2 rows in set
Flink SQL>
Flink SQL> use catalog my_hive;
[INFO] Execute statement succeed.
Flink SQL>
The hive_conf directory can also be uploaded to the root of the HDFS used by the Flink cluster, in which case the catalog is configured with 'hive-conf-dir' = 'hdfs://nnha/hive_conf'.
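A sketch of the same create catalog statement reading its configuration from HDFS (the nnha nameservice comes from this cluster; adjust to your own):

create catalog my_hive with (
    'type' = 'hive',
    'default-database' = 'flink',
    -- hive-site.xml etc. are now read from HDFS instead of the local filesystem
    'hive-conf-dir' = 'hdfs://nnha/hive_conf'
);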
To create a permanent catalog in Flink instead, add the following file on every server in the Flink cluster, with the content shown:
[root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml
execution:
  type: streaming
  current-catalog: default_catalog
  current-database: default_database

catalogs:
  - name: my_hive
    type: hive
    default-database: flink
    hive-conf-dir: /root/flink-1.14.3/hive_conf
[root@flink1 ~]#
[root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml
When testing this, however, show catalogs did not list a my_hive catalog, and the Flink logs showed no errors. This is likely because Flink 1.14 removed support for the SQL Client YAML environment file (sql-client-defaults.yaml, FLIP-163), so only the temporary-catalog approach described above worked here.
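As a workaround, an initialization script passed to the SQL Client with -i gives a comparable effect: the catalog is re-registered every time the client starts. A minimal sketch reusing the paths from this setup (the script location is arbitrary):

-- saved e.g. as /root/flink-1.14.3/conf/init.sql and started with:
--   sql-client.sh -i /root/flink-1.14.3/conf/init.sql
create catalog my_hive with (
    'type' = 'hive',
    'default-database' = 'flink',
    'hive-conf-dir' = '/root/flink-1.14.3/hive_conf'
);
use catalog my_hive;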
- When a table is created in Flink's Hive catalog, the table metadata also appears in Hive's flink database:
Flink SQL> create table blackhole_table(
> name string
> ) with ('connector' = 'blackhole');
[INFO] Execute statement succeed.
Flink SQL>
0: jdbc:hive2://hive1:10000>
0: jdbc:hive2://hive1:10000> use flink;
0: jdbc:hive2://hive1:10000>
0: jdbc:hive2://hive1:10000> show tables;
+------------------+
| tab_name |
+------------------+
| blackhole_table |
+------------------+
0: jdbc:hive2://hive1:10000>
2. Using catalogs with SQL statements
2.1 Databases
Flink SQL> create database my_hive.flink;
[INFO] Execute statement succeed.
Flink SQL> use catalog my_hive;
[INFO] Execute statement succeed.
Flink SQL> show databases;
+--------------------+
| database name |
+--------------------+
| default |
| flink |
| test_db |
+--------------------+
3 rows in set
Flink SQL> drop database my_hive.flink;
[INFO] Execute statement succeed.
Flink SQL>
Switching the catalog and database via the Scala API:
// assuming tEnv is an existing TableEnvironment
tEnv.useCatalog("my_hive")
tEnv.useDatabase("flink")
2.2 Tables
Flink SQL> create table my_hive.flink.blackhole_table(
> name string
> ) with ('connector' = 'blackhole');
[INFO] Execute statement succeed.
Flink SQL>
Flink SQL> alter table my_hive.flink.blackhole_table rename to blackhole_table_new;
[INFO] Execute statement succeed.
Flink SQL>
Flink SQL> use catalog my_hive;
[INFO] Execute statement succeed.
Flink SQL> use flink;
[INFO] Execute statement succeed.
Flink SQL> show tables;
+---------------------+
| table name |
+---------------------+
| blackhole_table_new |
+---------------------+
1 row in set
Flink SQL> drop table my_hive.flink.blackhole_table_new;
[INFO] Execute statement succeed.
Flink SQL>