Table of Contents
- 1. pom.xml dependencies
- 2. Processing bounded and unbounded streams with the Table API
- 3. Processing bounded and unbounded streams with SQL
- 4. Table API and SQL concepts
- 4.1 Creating a TableEnvironment
- 4.2 Program structure of Table API and SQL
- 4.3 Basic usage of the Table API and SQL
- 4.3.1 Aggregate queries

1. pom.xml dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>
2. Processing bounded and unbounded streams with the Table API
- Processing a bounded stream
import org.apache.flink.table.api.DataTypes.{ROW, FIELD, BIGINT, STRING, INT}
import org.apache.flink.table.api.{$, EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal, AnyWithOperations}

object BatchTableTest {

  def main(args: Array[String]): Unit = {

    val settings = EnvironmentSettings.newInstance()
      .inBatchMode().build()
    val tEnv = TableEnvironment.create(settings)

    // Define the row data type
    val MyOrder = ROW(
      FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))

    val table = tEnv.fromValues(MyOrder,
      row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    val filtered = table.where($("amount").isGreaterOrEqual(8))

    // Calling execute() collects the data to the JobManager
    filtered.execute().print()
  }
}
The result is as follows:
+----------------------+--------------------------------+-------------+
|                   id |                        product |      amount |
+----------------------+--------------------------------+-------------+
|                    2 |                          Tesla |           8 |
|                    2 |                          Tesla |           8 |
|                    3 |                            BYD |          20 |
+----------------------+--------------------------------+-------------+
3 rows in set
- Processing an unbounded stream
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, AnyWithOperations, EnvironmentSettings, ExplainDetail, TableEnvironment, string2Literal}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object StreamTableTest {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance()
      .inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)
    // A tEnv created the following way cannot use the fromDataStream method
    // val tEnv = TableEnvironment.create(bsSettings)

    val dataStream: DataStream[String] = senv.addSource(new WordSourceFunction())

    val table = tEnv.fromDataStream(dataStream).as("word")
    val filtered = table.where($("word").like("%t%"))

    val explain = filtered.explain(ExplainDetail.JSON_EXECUTION_PLAN)
    println(explain)

    tEnv.toDataStream(filtered).print("table")

    senv.execute()
  }
}
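Both streaming examples in this article use a custom WordSourceFunction whose code is not shown. A minimal sketch of what such a source could look like, assuming it emits one random word per second (the word list and interval are assumptions):

import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

// Hypothetical implementation of the custom source used by the streaming examples:
// emits one random word per second until the job is cancelled
class WordSourceFunction extends SourceFunction[String] {

  @volatile private var running = true
  private val words = Seq("batch", "stream", "table", "sql", "flink")

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (running) {
      ctx.collect(words(Random.nextInt(words.length)))
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}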
- statementSet.explain(explainDetail) returns the execution plans of all of the statement set's sinks
- tEnv.explainSql(sql, explainDetail)
- tEnv.executeSql("explain select * from test").print()
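For example, a minimal sketch of the explainSql variant (assuming a table named test is already registered in the catalog):

// Assumes "test" was registered earlier, e.g. via tEnv.createTemporaryView("test", someTable)
val plan = tEnv.explainSql(
  "select * from test",
  ExplainDetail.JSON_EXECUTION_PLAN)
println(plan)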
The output of println(explain) in the program above is as follows:
== Abstract Syntax Tree ==
LogicalFilter(condition=[LIKE($0, _UTF-16LE'%t%')])
+- LogicalProject(word=[AS($0, _UTF-16LE'word')])
+- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_1]])
== Optimized Physical Plan ==
Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])
+- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_1]], fields=[f0])
== Optimized Execution Plan ==
Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])
+- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_1]], fields=[f0])
== Physical Execution Plan ==
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 3,
"type" : "DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=STRING, rowtime=false, watermark=false)",
"pact" : "Operator",
"contents" : "DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=STRING, rowtime=false, watermark=false)",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])",
"pact" : "Operator",
"contents" : "Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 3,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
table:3> +I[batch]
table:4> +I[batch]
table:5> +I[batch]
table:6> +I[table]
table:7> +I[stream]
...... (output truncated) ......
3. Processing bounded and unbounded streams with SQL
- Processing a bounded stream
import org.apache.flink.table.api.DataTypes.{BIGINT, FIELD, INT, ROW, STRING}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal}

object SqlBatchDemo {

  def main(args: Array[String]): Unit = {

    val settings = EnvironmentSettings.newInstance()
      .inBatchMode().build()
    val tEnv = TableEnvironment.create(settings)

    val MyOrder = ROW(
      FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))

    val input = tEnv.fromValues(MyOrder,
      row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    // Register the table in the catalog so SQL can reference it by name
    tEnv.createTemporaryView("myOrder", input)

    val table = tEnv.sqlQuery("select product, sum(amount) as amount from myOrder group by product")
    table.execute().print()
  }
}
The result is as follows:
+--------------------------------+-------------+
| product | amount |
+--------------------------------+-------------+
| BMW | 1 |
| Tesla | 16 |
| BYD | 20 |
+--------------------------------+-------------+
3 rows in set
- Processing an unbounded stream
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object SqlStreamDemo {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance()
      .inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)

    val stream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(stream).as("word")

    // Concatenating the Table into the SQL string registers it under a unique generated name
    val result = tEnv.sqlQuery("select * from " + table + " where word like '%t%'")

    tEnv.toDataStream(result).print()
    println(senv.getExecutionPlan)

    senv.execute()
  }
}
The result is as follows:
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 3,
"type" : "DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=STRING, rowtime=false, watermark=false)",
"pact" : "Operator",
"contents" : "DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=STRING, rowtime=false, watermark=false)",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])",
"pact" : "Operator",
"contents" : "Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])",
"parallelism" : 1,
"predecessors" : [ {
"id" : 3,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 5,
"type" : "TableToDataSteam(type=ROW NOT NULL, rowtime=false)",
"pact" : "Operator",
"contents" : "TableToDataSteam(type=ROW NOT NULL, rowtime=false)",
"parallelism" : 1,
"predecessors" : [ {
"id" : 4,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 6,
"type" : "Sink: Print to Std. Out",
"pact" : "Data Sink",
"contents" : "Sink: Print to Std. Out",
"parallelism" : 8,
"predecessors" : [ {
"id" : 5,
"ship_strategy" : "REBALANCE",
"side" : "second"
} ]
} ]
}
1> +I[batch]
2> +I[stream]
3> +I[stream]
4> +I[table]
5> +I[batch]
...... (output truncated) ......
4. Table API and SQL concepts
4.1 Creating a TableEnvironment
- Both the Table API and SQL are built on the Table interface and share the same catalog
- Both are parsed and optimized first by Apache Calcite and then by the Blink planner; in both streaming and batch mode the plan is ultimately translated into DataStream API transformations
- The EnvironmentSettings approach

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings
  .newInstance()
  .inStreamingMode() // for streaming mode
  //.inBatchMode()   // for batch mode
  .build()
val tEnv = TableEnvironment.create(settings)

- Cannot interact with DataStream
- Does not support user-defined aggregate functions (UDAF) or user-defined table functions (UDTF)
- The StreamTableEnvironment approach

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(senv)

- Can interact with DataStream
- Supports user-defined aggregate functions (UDAF) and user-defined table functions (UDTF); see the sketch below
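As an illustration of the last point, a minimal sketch of registering and using a UDTF with a StreamTableEnvironment (the SplitFunction class and the lines/line table and column names are assumptions, not from the original):

import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Hypothetical UDTF: splits a line of text into one row per word
@FunctionHint(output = new DataTypeHint("ROW<word STRING>"))
class SplitFunction extends TableFunction[Row] {
  def eval(line: String): Unit =
    line.split(" ").foreach(w => collect(Row.of(w)))
}

// Register the function and call it from SQL
tEnv.createTemporarySystemFunction("split_words", classOf[SplitFunction])
val words = tEnv.sqlQuery("select word from lines, lateral table(split_words(line))")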
4.2 Program structure of Table API and SQL
The following JDBC example shows the typical structure of a Table API/SQL program: create an environment, register tables in the catalog, query them, and emit the result to a sink. The structure and data of MySQL table user1, and the structure of table user2, are as follows:
mysql>
mysql> show create table flink_test.user1;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user1 | CREATE TABLE `user1` (
`id` bigint NOT NULL,
`name` varchar(128) DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
mysql>
mysql> select * from flink_test.user1;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | yi | 1 |
| 2 | er | 2 |
| 3 | san | 3 |
+----+------+------+
3 rows in set (0.00 sec)
mysql>
mysql> show create table flink_test.user2;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user2 | CREATE TABLE `user2` (
`id` bigint NOT NULL,
`name` varchar(128) DEFAULT NULL,
`age` int DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.01 sec)
mysql>
Add the pom.xml dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>
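Note that flink-connector-jdbc does not bundle the MySQL driver itself; the com.mysql.cj.jdbc.Driver class used below comes from a separate artifact, for example (the version here is an assumption):

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>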
The example code is as follows:
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object TableSqlTest {

  def main(args: Array[String]): Unit = {

    // Create the Table environment
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // Since the table defines a primary key, data is written in upsert mode
    val table_str =
      """
        |create temporary table %s(
        |    id bigint,
        |    name string,
        |    age int,
        |    primary key (id) not enforced
        |) with (
        |    'connector' = 'jdbc',
        |    'url' = 'jdbc:mysql://192.168.23.33:3306/flink_test',
        |    'driver' = 'com.mysql.cj.jdbc.Driver',
        |    'table-name' = '%s',
        |    'username' = 'root',
        |    'password' = 'Root_123'
        |)
        |""".stripMargin

    // Register the tables in the catalog
    tEnv.executeSql(table_str.format("user1", "user1"))
    tEnv.executeSql(table_str.format("user2", "user2"))

    // ===================== Read from the source table =====================
    val user1 = tEnv.from("user1")                              // option 1
    // val user1 = tEnv.sqlQuery("select * from user1 limit 2") // option 2

    // ===================== Insert into the sink table =====================
    // user1.executeInsert("user2")                             // option 1
    val stmtSet = tEnv.createStatementSet()
    stmtSet.addInsert("user2", user1)                           // option 2
    // stmtSet.addInsertSql("insert into user2 select * from user1 limit 2") // option 3
    stmtSet.execute()
  }
}
- In batch mode a table can only be written to a BatchTableSink
- In streaming mode a table can be written to an AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink
Run the program twice, then query table user2 in MySQL. Because the sink works in upsert mode, running the job multiple times still leaves exactly three rows:
mysql>
mysql> select * from user2;
+----+------+------+
| id | name | age |
+----+------+------+
| 1 | yi | 1 |
| 2 | er | 2 |
| 3 | san | 3 |
+----+------+------+
3 rows in set (0.00 sec)
mysql>
4.3 Basic usage of the Table API and SQL
4.3.1 Aggregate queries
Example code:
import org.apache.flink.table.api.Expressions.{$, lit, row}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object TableSqlTest {

  def main(args: Array[String]): Unit = {

    // Create the Table environment
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    val table = tEnv.fromValues(
      row(10, "A"),
      row(20, "A"),
      row(100, "B"),
      row(200, "B")
    ).as("amount", "name")
    tEnv.createTemporaryView("tmp_table", table)

    // Table API version of the aggregation
    val table_result = table
      .filter($("amount").isGreater(lit(0)))
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))
    table_result.execute().print()

    // Equivalent SQL version
    val sql_result = tEnv.sqlQuery("select name, sum(amount) as amount from tmp_table where amount > 0 group by name")
    sql_result.execute().print()
  }
}
Execution result:
+----+--------------------------------+-------------+
| op | name | amount |
+----+--------------------------------+-------------+
| +I | A | 10 |
| -U | A | 10 |
| +U | A | 30 |
| +I | B | 100 |
| -U | B | 100 |
| +U | B | 300 |
+----+--------------------------------+-------------+
6 rows in set
+----+--------------------------------+-------------+
| op | name | amount |
+----+--------------------------------+-------------+
| +I | A | 10 |
| -U | A | 10 |
| +U | A | 30 |
| +I | B | 100 |
| -U | B | 100 |
| +U | B | 300 |
+----+--------------------------------+-------------+
6 rows in set
- In streaming mode the aggregation is computed incrementally as a changelog: -U is the row value before an update and +U is the value after it; a batch-mode contrast is sketched below
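For contrast, a minimal sketch (not from the original, reusing the imports of the example above): running the same aggregation in batch mode prints only the final results, with no -U/+U retraction rows.

// Same aggregation in batch mode: only the final aggregates are emitted
val batchSettings = EnvironmentSettings.newInstance().inBatchMode().build()
val batchEnv = TableEnvironment.create(batchSettings)

val batchTable = batchEnv.fromValues(
  row(10, "A"),
  row(20, "A"),
  row(100, "B"),
  row(200, "B")
).as("amount", "name")
batchEnv.createTemporaryView("tmp_table_batch", batchTable)

// Expected output: one +I row per name, e.g. (A, 30) and (B, 300)
batchEnv.sqlQuery("select name, sum(amount) as amount from tmp_table_batch group by name")
  .execute().print()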