Using Flink Catalogs (Memory, Jdbc, and Hive Catalog) and Catalog SQL Statements

Bulut0907 · Published 2022-02-26 10:59:00

Contents
  • 1. Catalog types
    • 1.1 GenericInMemoryCatalog
    • 1.2 JdbcCatalog
    • 1.3 HiveCatalog
  • 2. Catalog SQL statements
    • 2.1 Databases
    • 2.2 Tables

1. Catalog types

1.1 GenericInMemoryCatalog

This is the default catalog type. It is case-sensitive for meta-object names. The default catalog is default_catalog, which contains one default database, default_database.
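An extra in-memory catalog can also be created directly in the SQL Client. A minimal sketch (the catalog name my_memory is made up for illustration):

-- GenericInMemoryCatalog: metadata lives only for the current session
CREATE CATALOG my_memory WITH ('type' = 'generic_in_memory');
USE CATALOG my_memory;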

1.2 JdbcCatalog

Currently only the Postgres database is supported.
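A JdbcCatalog is created directly in SQL. A minimal sketch, assuming the flink-connector-jdbc jar and a Postgres JDBC driver are on the classpath; the host, database name, and credentials below are placeholders:

-- JdbcCatalog backed by an existing Postgres database (all values are placeholders)
CREATE CATALOG my_pg WITH (
  'type' = 'jdbc',
  'default-database' = 'mydb',
  'username' = 'postgres',
  'password' = 'postgres',
  'base-url' = 'jdbc:postgresql://192.168.23.201:5432'
);

Note that 'base-url' carries only the host and port; the database name goes into 'default-database'.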

1.3 HiveCatalog

Stores all meta-object names in lowercase.

It serves two purposes:

  1. Storing Flink's own metadata. This metadata cannot be used by Hive.
  2. Reading and writing existing Hive metadata, i.e. reading and writing the data that lives in Hive. Tables created for this purpose need the property 'connector'='hive' (see the sketch after this list).
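A minimal sketch of the second case, run with the Hive catalog as the current catalog; the table name and columns are made up:

-- 'connector'='hive' marks the table as Hive-compatible rather than Flink-generic,
-- so Hive itself can read and write it
CREATE TABLE user_log (
  user_id STRING,
  ts BIGINT
) WITH ('connector' = 'hive');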

Installing the HiveCatalog

  1. Add the dependency jar to the lib directory on every Flink server, then restart Flink:
[root@flink1 ~]# 
[root@flink1 ~]# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.3/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar
[root@flink1 ~]# 
[root@flink1 ~]# scp /root/flink-1.14.3/lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar root@flink2:/root/flink-1.14.3/lib/
flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar                                                                                           100%   46MB  75.9MB/s   00:00    
[root@flink1 ~]# scp /root/flink-1.14.3/lib/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar root@flink3:/root/flink-1.14.3/lib/
flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar                                                                                           100%   46MB  75.2MB/s   00:00    
[root@flink1 ~]# 
  2. On every server in the Flink cluster, create a Hive configuration directory hive_conf, then copy all files from the conf directory of the Hive cluster into it:
[root@flink1 ~]# 
[root@flink1 ~]# mkdir /root/flink-1.14.3/hive_conf
[root@flink1 ~]# 



[root@hive1 ~]# 
[root@hive1 ~]# scp /root/apache-hive-3.1.2-bin/conf/* root@192.168.23.101:/root/flink-1.14.3/hive_conf

Also add the IP mappings for all Hive servers to /etc/hosts on every server in the Flink cluster, as sketched below.
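A sketch of such a mapping; the Hive node IPs here are placeholders (only hive1 actually appears in this walkthrough):

# appended to /etc/hosts on every Flink node; IPs are placeholders
192.168.23.201  hive1
192.168.23.202  hive2
192.168.23.203  hive3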

  3. Create a database in Hive to hold Flink's metadata:
0: jdbc:hive2://hive1:10000> 
0: jdbc:hive2://hive1:10000> create database flink;
INFO  : Compiling command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e): create database flink
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:null, properties:null)
INFO  : Completed compiling command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e); Time taken: 1.006 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e): create database flink
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=root_20220211131805_c4a64ff2-5277-4c6a-8e59-cf9ea8e5fd6e); Time taken: 3.957 seconds
INFO  : OK
INFO  : Concurrency mode is disabled, not creating a lock manager
No rows affected (5.604 seconds)
0: jdbc:hive2://hive1:10000>
  4. Create a temporary catalog in Flink and switch to it. The catalog disappears once you exit the SQL Client:
Flink SQL> create catalog my_hive with(
> 'type' = 'hive',
> 'default-database' = 'flink',
> 'hive-conf-dir' = '/root/flink-1.14.3/hive_conf'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|         my_hive |
+-----------------+
2 rows in set

Flink SQL>

Flink SQL> use catalog my_hive;
[INFO] Execute statement succeed.

Flink SQL> 

Alternatively, upload the hive_conf directory to the root of the HDFS used by the Flink cluster and configure 'hive-conf-dir' = 'hdfs://nnha/hive_conf' instead; a sketch of the upload follows.
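A minimal sketch, assuming nnha is the HDFS nameservice as in the path above:

# upload the local hive_conf directory to the HDFS root (nameservice nnha assumed)
hdfs dfs -put /root/flink-1.14.3/hive_conf hdfs://nnha/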

To create a permanent catalog in Flink, add the following file, with the content shown, on every server in the Flink cluster:

[root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml 
execution:
    type: streaming
    current-catalog: default_catalog
    current-database: default_database
    
catalogs:
  - name: my_hive
    type: hive
    default-database: flink
    hive-conf-dir: /root/flink-1.14.3/hive_conf

[root@flink1 ~]# 
[root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml

In my test, however, show catalogs did not list my_hive, and the Flink logs showed no errors; only the temporary-catalog approach described above worked. A likely reason is that Flink 1.14 dropped support for the legacy sql-cli-defaults.yaml environment file in favor of an initialization SQL script passed to the SQL Client.
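A minimal sketch of that replacement; the init.sql path is made up, and the statements mirror the temporary-catalog DDL above:

-- /root/flink-1.14.3/conf/init.sql (hypothetical path)
CREATE CATALOG my_hive WITH (
  'type' = 'hive',
  'default-database' = 'flink',
  'hive-conf-dir' = '/root/flink-1.14.3/hive_conf'
);
USE CATALOG my_hive;

The script is then loaded on startup with ./bin/sql-client.sh -i /root/flink-1.14.3/conf/init.sql.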

  5. When a table is created in Flink's Hive catalog, the table metadata also shows up in Hive's flink database:
Flink SQL> create table blackhole_table(
> name string
> ) with ('connector' = 'blackhole');
[INFO] Execute statement succeed.

Flink SQL>
0: jdbc:hive2://hive1:10000> 
0: jdbc:hive2://hive1:10000> use flink;
0: jdbc:hive2://hive1:10000> 
0: jdbc:hive2://hive1:10000> show tables;
+------------------+
|     tab_name     |
+------------------+
| blackhole_table  |
+------------------+
0: jdbc:hive2://hive1:10000>
2. Catalog SQL statements

2.1 Databases
Flink SQL> create database my_hive.flink;
[INFO] Execute statement succeed.

Flink SQL> use catalog my_hive;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+--------------------+
|      database name |
+--------------------+
|            default |
|              flink |
|            test_db |
+--------------------+
3 rows in set

Flink SQL> drop database my_hive.flink;
[INFO] Execute statement succeed.

Flink SQL> 

Switching the catalog and database through the Scala API:

    tEnv.useCatalog("my_hive")   // switch the current catalog
    tEnv.useDatabase("flink")    // switch the current database within it
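A self-contained sketch of the surrounding setup, assuming Flink 1.14's unified Table API (variable names are illustrative):

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

// build a streaming TableEnvironment, then point it at the Hive catalog
val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tEnv = TableEnvironment.create(settings)

tEnv.useCatalog("my_hive")
tEnv.useDatabase("flink")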
2.2 Tables
Flink SQL> create table my_hive.flink.blackhole_table(
> name string
> ) with ('connector' = 'blackhole');
[INFO] Execute statement succeed.

Flink SQL>
Flink SQL> alter table my_hive.flink.blackhole_table rename to blackhole_table_new;
[INFO] Execute statement succeed.

Flink SQL>
Flink SQL> use catalog my_hive;
[INFO] Execute statement succeed.

Flink SQL> use flink;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+---------------------+
|          table name |
+---------------------+
| blackhole_table_new |
+---------------------+
1 row in set

Flink SQL> drop table my_hive.flink.blackhole_table_new;
[INFO] Execute statement succeed.

Flink SQL> 