
Flink Table and SQL: Creating a TableEnvironment, Program Structure, and Simple Usage Examples

Bulut0907 · Published 2021-12-30 16:54:37

Contents
  • 1. pom.xml dependencies
  • 2. Example: processing bounded and unbounded data streams with the Table API
  • 3. Example: processing bounded and unbounded data streams with SQL
  • 4. Table API and SQL concepts
    • 4.1 Creating a TableEnvironment
    • 4.2 Program structure of Table API and SQL
    • 4.3 Simple usage of Table API and SQL
      • 4.3.1 Aggregate queries

1. pom.xml dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>
2. Example: processing bounded and unbounded data streams with the Table API
  1. Processing a bounded data stream

import org.apache.flink.table.api.DataTypes.{ROW, FIELD, BIGINT, STRING, INT}
import org.apache.flink.table.api.{$, EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal, AnyWithOperations}

object BatchTableTest {

  def main(args: Array[String]): Unit = {

    val settings = EnvironmentSettings.newInstance()
      .inBatchMode().build()

    val tEnv = TableEnvironment.create(settings)

    // Define the row type of the source data
    val MyOrder = ROW(FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT()))

    val table = tEnv.fromValues(MyOrder, row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    val filtered = table.where($("amount").isGreaterOrEqual(8))

    // Calling execute() collects the result to the JobManager
    filtered.execute().print()
  }
}

The result is as follows:

+----------------------+--------------------------------+-------------+
|                   id |                        product |      amount |
+----------------------+--------------------------------+-------------+
|                    2 |                          Tesla |           8 |
|                    2 |                          Tesla |           8 |
|                    3 |                            BYD |          20 |
+----------------------+--------------------------------+-------------+
3 rows in set
  2. Processing an unbounded data stream

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.{$, AnyWithOperations, EnvironmentSettings, ExplainDetail, TableEnvironment, string2Literal}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object StreamTableTest {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance()
      .inStreamingMode().build()

    val tEnv = StreamTableEnvironment.create(senv, bsSettings)
    // A tEnv created this way cannot use the fromDataStream function
    // val tEnv = TableEnvironment.create(bsSettings)

    val dataStream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(dataStream).as("word")

    val filtered = table.where($("word").like("%t%"))
    val explain = filtered.explain(ExplainDetail.JSON_EXECUTION_PLAN)
    println(explain)

    tEnv.toDataStream(filtered).print("table")

    senv.execute()
  }
}


Other ways to obtain an execution plan (a sketch follows this list):
  • statementSet.explain(explainDetail) returns the execution plans of all sinks in a statement set
  • tEnv.explainSql(sql, explainDetail)
  • tEnv.executeSql("explain select * from test").print()
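
A minimal sketch of these three variants, assuming an existing TableEnvironment tEnv in which placeholder tables test (source) and test_sink (sink) are already registered:

import org.apache.flink.table.api.ExplainDetail

val detail = ExplainDetail.JSON_EXECUTION_PLAN

// 1. Explain every sink attached to a StatementSet
val statementSet = tEnv.createStatementSet()
statementSet.addInsertSql("insert into test_sink select * from test")
println(statementSet.explain(detail))

// 2. Explain a single SQL statement
println(tEnv.explainSql("select * from test", detail))

// 3. Run EXPLAIN as a SQL statement; some versions require the
//    "explain plan for ..." form used here
tEnv.executeSql("explain plan for select * from test").print()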

The output of the StreamTableTest program is as follows:

== Abstract Syntax Tree ==
LogicalFilter(condition=[LIKE($0, _UTF-16LE'%t%')])
+- LogicalProject(word=[AS($0, _UTF-16LE'word')])
   +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_1]])

== Optimized Physical Plan ==
Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])
+- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_1]], fields=[f0])

== Optimized Execution Plan ==
Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])
+- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_1]], fields=[f0])

== Physical Execution Plan ==
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 3,
    "type" : "DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=STRING, rowtime=false, watermark=false)",
    "pact" : "Operator",
    "contents" : "DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=STRING, rowtime=false, watermark=false)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])",
    "pact" : "Operator",
    "contents" : "Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
table:3> +I[batch]
table:4> +I[batch]
table:5> +I[batch]
table:6> +I[table]
table:7> +I[stream]
...... (output truncated) ......
3. Example: processing bounded and unbounded data streams with SQL
  1. Processing a bounded data stream

import org.apache.flink.table.api.DataTypes.{BIGINT, FIELD, INT, ROW, STRING}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, row}
import org.apache.flink.table.api.{long2Literal, string2Literal, int2Literal}


object SqlBatchDemo {

  def main(args: Array[String]): Unit = {

    val settings = EnvironmentSettings.newInstance()
      .inBatchMode().build()
    val tEnv = TableEnvironment.create(settings)

    val MyOrder = ROW(FIELD("id", BIGINT()),
      FIELD("product", STRING()),
      FIELD("amount", INT())
    )
    val input = tEnv.fromValues(MyOrder, row(1L, "BMW", 1),
      row(2L, "Tesla", 8),
      row(2L, "Tesla", 8),
      row(3L, "BYD", 20))

    tEnv.createTemporaryView("myOrder", input)
    val table = tEnv.sqlQuery("select product, sum(amount) as amount from myOrder group by product")
    table.execute().print()

  }

}

The result is as follows:

+--------------------------------+-------------+
|                        product |      amount |
+--------------------------------+-------------+
|                            BMW |           1 |
|                          Tesla |          16 |
|                            BYD |          20 |
+--------------------------------+-------------+
3 rows in set
  2. Processing an unbounded data stream


import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object SqlStreamDemo {

  def main(args: Array[String]): Unit = {

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val bsSettings = EnvironmentSettings.newInstance()
      .inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(senv, bsSettings)

    val stream: DataStream[String] = senv.addSource(new WordSourceFunction())
    val table = tEnv.fromDataStream(stream).as("word")

    // Concatenating the Table into the SQL string implicitly registers it
    // in the catalog under a generated name
    val result = tEnv.sqlQuery("select * from " + table + " where word like '%t%'")
    tEnv.toDataStream(result).print()

    println(senv.getExecutionPlan)
    senv.execute()
  }
}

The result is as follows:

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 3,
    "type" : "DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=STRING, rowtime=false, watermark=false)",
    "pact" : "Operator",
    "contents" : "DataSteamToTable(stream=default_catalog.default_database.Unregistered_DataStream_Source_1, type=STRING, rowtime=false, watermark=false)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])",
    "pact" : "Operator",
    "contents" : "Calc(select=[f0 AS word], where=[LIKE(f0, _UTF-16LE'%t%')])",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "TableToDataSteam(type=ROW NOT NULL, rowtime=false)",
    "pact" : "Operator",
    "contents" : "TableToDataSteam(type=ROW NOT NULL, rowtime=false)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 6,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 8,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "REBALANCE",
      "side" : "second"
    } ]
  } ]
}
1> +I[batch]
2> +I[stream]
3> +I[stream]
4> +I[table]
5> +I[batch]
...... (output truncated) ......
4. Table API and SQL concepts

4.1 Creating a TableEnvironment
  • Table API and SQL are both based on the Table interface and share the same catalog
  • Queries from both are first parsed and optimized by Apache Calcite and then planned by the Blink planner; in both streaming and batch mode the final plan is translated into DataStream API Transformations
  1. The EnvironmentSettings approach
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val settings = EnvironmentSettings
  .newInstance()
  .inStreamingMode()    // streaming mode
  //.inBatchMode()      // batch mode
  .build()

val tEnv = TableEnvironment.create(settings)
  • Cannot interoperate with DataStream
  • Does not support user-defined aggregate functions (UDAF) or user-defined table functions (UDTF)
  2. The StreamTableEnvironment approach
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(senv)
  • Can interoperate with DataStream
  • Supports user-defined aggregate functions (UDAF) and user-defined table functions (UDTF); a registration sketch follows this list
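
A minimal sketch, not from the original post, of registering and using a UDTF with a StreamTableEnvironment; the SplitFunction class, the function name split_words, and the sample sentence are all illustrative:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Illustrative UDTF: emits one row per word of the input string
@FunctionHint(output = new DataTypeHint("ROW<word STRING>"))
class SplitFunction extends TableFunction[Row] {
  def eval(str: String): Unit = str.split(" ").foreach(s => collect(Row.of(s)))
}

object UdtfSketch {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)

    tEnv.createTemporarySystemFunction("split_words", classOf[SplitFunction])
    tEnv.executeSql(
      "select word from (values ('hello flink table')) as t(sentence), " +
        "lateral table(split_words(sentence))"
    ).print()
  }
}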
4.2 Program structure of Table API and SQL

The structure and data of table user1 in MySQL, and the structure of table user2, are as follows:

mysql> 
mysql> show create table flink_test.user1;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                         |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user1 | CREATE TABLE `user1` (
  `id` bigint NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)

mysql> 
mysql> select * from flink_test.user1;
+----+------+------+
| id | name | age  |
+----+------+------+
|  1 | yi   |    1 |
|  2 | er   |    2 |
|  3 | san  |    3 |
+----+------+------+
3 rows in set (0.00 sec)

mysql> 
mysql> show create table flink_test.user2;
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table                                                                                                                                                                                         |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| user2 | CREATE TABLE `user2` (
  `id` bigint NOT NULL,
  `name` varchar(128) DEFAULT NULL,
  `age` int DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.01 sec)

mysql>

Add the pom.xml dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>

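Note that the MySQL JDBC driver itself must also be on the classpath at runtime; it is not listed in the original dependencies, but a typical coordinate would be:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.27</version>
</dependency>
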
The example code is as follows:



import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object TableSqlTest {

  def main(args: Array[String]): Unit = {


    // Create the Table environment
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    // Because the table defines a primary key, data is written in upsert mode
    val table_str =
      """
        |create temporary table %s(
        |  id bigint,
        |  name string,
        |  age int,
        |  primary key (id) not enforced
        |) with (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://192.168.23.33:3306/flink_test',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = '%s',
        |   'username' = 'root',
        |   'password' = 'Root_123'
        |)
        |""".stripMargin
    // Register the tables in the catalog
    tEnv.executeSql(table_str.format("user1", "user1"))
    tEnv.executeSql(table_str.format("user2", "user2"))

    // ===================== Read from the source table =====================
    val user1 = tEnv.from("user1")   // option 1
    // val user1 = tEnv.sqlQuery("select * from user1 limit 2")   // option 2

    // ===================== Insert into the target table =====================
    // user1.executeInsert("user2")     // option 1

    val stmtSet = tEnv.createStatementSet()
    stmtSet.addInsert("user2", user1)    // option 2
    // stmtSet.addInsertSql("insert into user2 select * from user1 limit 2")   // option 3
    stmtSet.execute()

  }

}
  • In batch mode, results can only be written to a BatchTableSink
  • In streaming mode, results can be written to an AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink

Run the program twice, then query table user2 in MySQL; because of the upsert semantics, the second run neither fails on duplicate keys nor produces duplicate rows (a sketch of the generated SQL follows the transcript):

mysql> 
mysql> select * from user2;
+----+------+------+
| id | name | age  |
+----+------+------+
|  1 | yi   |    1 |
|  2 | er   |    2 |
|  3 | san  |    3 |
+----+------+------+
3 rows in set (0.00 sec)

mysql> 
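For MySQL, the JDBC connector's upsert write effectively issues statements of the following shape; this is illustrative, as the exact SQL is generated by the connector's MySQL dialect:

INSERT INTO user2 (id, name, age) VALUES (1, 'yi', 1)
ON DUPLICATE KEY UPDATE name = VALUES(name), age = VALUES(age);
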
4.3 Simple usage of Table API and SQL

4.3.1 Aggregate queries

Example code:


import org.apache.flink.table.api.Expressions.{$, lit, row}
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

object TableSqlTest {

  def main(args: Array[String]): Unit = {
    
    // Create the Table environment
    val settings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()
    val tEnv = TableEnvironment.create(settings)

    val table = tEnv.fromValues(
      row(10, "A"),
      row(20, "A"),
      row(100, "B"),
      row(200, "B")
    ).as("amount", "name")
    tEnv.createTemporaryView("tmp_table", table)

    val table_result = table
      .filter($("amount").isGreater(lit(0)))
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))

    table_result.execute().print()

    val sql_result = tEnv.sqlQuery("select name, sum(amount) as amount from tmp_table where amount > 0 group by name")
    sql_result.execute().print()
    
  }

}

Execution result:

+----+--------------------------------+-------------+
| op |                           name |      amount |
+----+--------------------------------+-------------+
| +I |                              A |          10 |
| -U |                              A |          10 |
| +U |                              A |          30 |
| +I |                              B |         100 |
| -U |                              B |         100 |
| +U |                              B |         300 |
+----+--------------------------------+-------------+
6 rows in set
+----+--------------------------------+-------------+
| op |                           name |      amount |
+----+--------------------------------+-------------+
| +I |                              A |          10 |
| -U |                              A |          10 |
| +U |                              A |          30 |
| +I |                              B |         100 |
| -U |                              B |         100 |
| +U |                              B |         300 |
+----+--------------------------------+-------------+
6 rows in set

  • The aggregation is computed incrementally as a changelog: -U marks the value before an update and +U the value after (a sketch of consuming this changelog as a DataStream follows)
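
A minimal sketch, not part of the original post, of consuming the same changelog as a DataStream; it assumes a StreamTableEnvironment instead of the plain TableEnvironment used above:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.{$, row}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object ChangelogSketch {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(senv)

    val result = tEnv.fromValues(
      row(10, "A"), row(20, "A"), row(100, "B"), row(200, "B")
    ).as("amount", "name")
      .groupBy($("name"))
      .select($("name"), $("amount").sum().as("amount"))

    // Each emitted Row carries a RowKind (+I, -U, +U) matching the op column above
    tEnv.toChangelogStream(result).print()
    senv.execute()
  }
}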