Flink的版本号为:1.12 根据最新的版本来研究下Flink的批流统一
其实我最想解决的就是Flink能否像Hive 一样来处理大批量数据拆分计算,最后合并。
虽然我知道Flink跟MapReduce都是运行于Yarn的,Hive是基于MapReduce来做大批量任务分布式计算的。
参考网站:
- Apache Flink 1.12 Documentation: Table API & SQL
Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。
- Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。
- Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。
Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。-------------可以相互结合使用cuiyaonan2000@163.com
注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。
Planner ---------这个相当于HivePlanner 的作用主要是把关系型的操作翻译成可执行的、经过优化的 Flink 任务。-----------这个才是想要的东西
从1.9开始,Flink 提供了两个 Table Planner 实现来执行 Table API 和 SQL 程序:Blink Planner 和 Old Planner,Old Planner 在1.9之前就已经存在了。
对于生产环境,我们建议使用在1.11版本之后已经变成默认的Blink Planner。
如果是Java则需要引入如下的jar
org.apache.flink
flink-table-api-java-bridge_2.11
1.12.0
provided
org.apache.flink
flink-table-planner-blink_2.11
1.12.0
provided
org.apache.flink
flink-streaming-scala_2.11
1.12.0
provided
org.apache.flink
flink-table-common
1.12.0
provided
概念与通用 API-------核心操作对象Table
Table API 和 SQL 集成在同一套 API 中。这套 API 的核心概念是Table------------------所以我们使用planner就是操作的table,table的操作才分解成Flink的算子。这样子我们就可以做大批量分布式计算cuiyaonan2000@163.com
Table 可以用作查询的输入和输出。
本文介绍了 Table API 和 SQL 查询程序的通用结构、如何注册 Table
、如何查询 Table
以及如何输出 Table
。
- Blink 将批处理作业视作流处理的一种特例。严格来说,
Table
和DataSet
之间不支持相互转换,并且批处理作业也不会转换成DataSet
程序而是转换成DataStream
程序,流处理作业也一样。 - Blink 计划器不支持
BatchTableSource
,而是使用有界的StreamTableSource
来替代。-------------Bilnk没有使用批量数据源,而是使用了流数据源 - 旧计划器和 Blink 计划器中
FilterableTableSource
的实现是不兼容的。旧计划器会将PlannerExpression
下推至FilterableTableSource
,而 Blink 计划器则是将Expression
下推。 - 基于字符串的键值配置选项仅在 Blink 计划器中使用。(详情参见 配置 )
PlannerConfig
在两种计划器中的实现(CalciteConfig
)是不同的。--------------PlannerConfig
区分Blink Planner 和 Old Planner ,所以配置不一样- Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),
TableEnvironment
和StreamTableEnvironment
都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。----Blink的sink是有向无环图 - 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。
// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an input Table
tableEnv.executeSql("CREATE TEMPORARY TABLE table1 ... WITH ( 'connector' = ... )");
// register an output Table
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable ... WITH ( 'connector' = ... )");
// create a Table object from a Table API query
Table table2 = tableEnv.from("table1").select(...);
// create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("outputTable");
tableResult...
创建 TableEnvironment 的方式
TableEnvironment
是 Table API 和 SQL 的核心概念。它负责:
- 在内部的 catalog 中注册
Table ----在catalog内创建table
- 注册外部的 catalog
----在catalog外创建table
- 加载可插拔模块
- 执行 SQL 查询
- 注册自定义函数 (scalar、table 或 aggregation)
- 将
DataStream
或DataSet
转换成Table
- 持有对
ExecutionEnvironment
或StreamExecutionEnvironment
的引用
Table
总是与特定的 TableEnvironment
绑定。不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
在 Catalog 中创建表-------------在Catalog外创建呢
TableEnvironment
维护着一个由标识符(identifier)创建的表 catalog 的映射。
标识符由三个部分组成:catalog 名称、数据库名称,表名称。如果 catalog 或者数据库没有指明,就会使用当前默认值。 --------即TableEnviroment 下对应多个Catalog,每个catalog对应多个数据库,每个数据库对应多个表 cuiyaonan2000@163.com
表的类型Table
可以是虚拟的(视图VIEWS
):视图VIEWS
可以从已经存在的Table
中创建,一般是 Table API 或者 SQL 的查询结果- 也可以是常规的(表
TABLES
):表TABLES
描述的是外部数据,例如文件、数据库表或者消息队列。
- 临时表:与单个 Flink 会话(session)的生命周期相关,永久表需要 catalog(例如 Hive Metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除。
- 永久表:在多个 Flink 会话和群集(cluster)中可见。临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。
可以使用与已存在的永久表相同的标识符去注册临时表。临时表会屏蔽永久表,并且只要临时表存在,永久表就无法访问。所有使用该标识符的查询都将作用于临时表。----------即如果临时表的catalog 名称、数据库名称,表名称与永久表的一样,则只会使用临时表cuiyaonan2000@163.com
创建虚拟表// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query
Table projTable = tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", projTable);
//另外一个方式去创建 TABLE 是通过 connector 声明。Connector 描述了存储表数据的外部系统。存储系统
//例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("MyTable")
catalog 名称、数据库名称,表名称的应用-------------创建虚拟表,永久表都需要指定
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
Table table = ...;
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("exampleView", table);
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.exampleView", table);
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table);
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.exampleView", table);
查询表操作
Table API 方式
- Table API 是基于
Table
类的,该类表示一个表(流或批处理),并提供使用关系操作的方法。 - 这些方法返回一个新的 Table 对象,该对象表示对输入 Table 进行关系操作的结果。
- 一些关系操作由多个方法调用组成,例如
table.groupBy(...).select()
,其中groupBy(...)
指定table
的分组,而select(...)
在table
分组上的投影。
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
Table orders = tableEnv.from("Orders");
// compute revenue for all customers from France
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName")
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));
// emit or convert Table
// execute query
SQL方式
/ get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
//下面的示例演示了如何指定查询并将结果作为 Table 对象返回。
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
//如下的示例展示了如何指定一个更新查询,将查询的结果插入到已注册的表中。
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
"INSERT INTO RevenueFrance " +
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
混用 Table API 和 SQL
Table API 和 SQL 查询的混用非常简单因为它们都返回 Table
对象:
- 可以在 SQL 查询返回的
Table
对象上定义 Table API 查询。 - 在
TableEnvironment
中注册的结果表可以在 SQL 查询的FROM
子句中引用,通过这种方法就可以在 Table API 查询的结果上定义 SQL 查询。
Table
通过写入 TableSink
输出。TableSink
是一个通用接口,用于支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如 Apache Kafka、RabbitMQ)。
批处理 Table
只能写入 BatchTableSink
,而流处理 Table
需要指定写入 AppendStreamTableSink
,RetractStreamTableSink
或者 UpsertStreamTableSink
。
方法 Table.executeInsert(String tableName)
将 Table
发送至已注册的 TableSink
。该方法通过名称在 catalog 中查找 TableSink
并确认Table
schema 和 TableSink
schema 一致。
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an output Table
final Schema schema = Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.column("c", DataTypes.BIGINT())
.build();
tableEnv.createTemporaryTable("CsvSinkTable", TableDescriptor.forConnector("filesystem")
.schema(schema)
.option("path", "/path/to/file")
.format(FormatDescriptor.forFormat("csv")
.option("field-delimiter", "|")
.build())
.build());
// compute a result Table using Table API operators and/or SQL queries
Table result = ...;
// Prepare the insert into pipeline
TablePipeline pipeline = result.insertInto("CsvSinkTable");
// Print explain details
pipeline.printExplain();
// emit the result Table to the registered TableSink
pipeline.execute();
Blink Planner的通用方法
不论输入数据源是流式的还是批式的,Table API 和 SQL 查询都会被转换成 DataStream 程序。查询在内部表示为逻辑查询计划,并被翻译成两个阶段:
- 优化逻辑执行计划
- 翻译成 DataStream 程序
Table API 或者 SQL 查询在下列情况下会被翻译:
- 当
TableEnvironment.executeSql()
被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。 - 当
Table.executeInsert()
被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。 - 当
Table.execute()
被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。 - 当
StatementSet.execute()
被调用时。Table
(通过StatementSet.addInsert()
输出给某个Sink
)和 INSERT 语句 (通过调用StatementSet.addInsertSql()
)会先被缓存到StatementSet
中,StatementSet.execute()
方法被调用时,所有的 sink 会被优化成一张有向无环图。 - 当
Table
被转换成DataStream
时(参阅与 DataStream 和 DataSet API 结合)。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用StreamExecutionEnvironment.execute()
时被执行。
视图
在 TableEnvironment
中可以将 DataStream
或 DataSet
注册成视图。结果视图的 schema 取决于注册的 DataStream
或 DataSet
的数据类型。请参阅文档 数据类型到 table schema 的映射获取详细信息。
注意: 通过 DataStream
或 DataSet
创建的视图只能注册成临时视图。
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream stream = ...
// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("myTable", stream);
// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("myTable2", stream, $("myLong"), $("myString"));
将 DataStream 或 DataSet 转换成表
与在 TableEnvironment
注册 DataStream
或 DataSet
不同,DataStream 和 DataSet 还可以直接转换成 Table
。如果你想在 Table API 的查询中使用表,这将非常便捷
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, $("myLong"), $("myString"));
将表转换成 DataStream 或 DataSet
Table
可以被转换成 DataStream
或 DataSet
。
通过这种方式,定制的 DataSet 或 DataStream 程序就可以在 Table API 或者 SQL 的查询结果上运行了。(注意这里的定制的,相当于我们要有个编解码的类来实现将表转换成DataStream或DataSet cuiyaonan2000@163.com)
将 Table
转换为 DataStream
或者 DataSet
时,你需要指定生成的 DataStream
或者 DataSet
的数据类型,即,Table
的每行数据要转换成的数据类型。通常最方便的选择是转换成 Row
。以下列表概述了不同选项的功能:
- Row: 字段按位置映射,字段数量任意,支持
null
值,无类型安全(type-safe)检查。 - POJO: 字段按名称映射(POJO 必须按
Table
中字段名称命名),字段数量任意,支持null
值,无类型安全检查。 - Case Class: 字段按位置映射,不支持
null
值,有类型安全检查。 - Tuple: 字段按位置映射,字段数量少于 22(Scala)或者 25(Java),不支持
null
值,无类型安全检查。 - Atomic Type:
Table
必须有一个字段,不支持null
值,有类型安全检查。
后面的就在实践中增加吧