Flink Table and SQL: Tables and Views, Connectors, and Timestamp Data Types

Bulut0907, published 2022-02-14 12:54:13

Contents
  • 1. Tables and Views
  • 2. Table API Connectors
    • 2.1 filesystem, print, blackhole
  • 3. timestamp and timestamp_ltz

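1. Tables and Views

Tables are either temporary or permanent. When a temporary table and a permanent table share the same name, the temporary table takes precedence. Permanent tables need a metadata store to persist their definitions, for example a Hive database.

Use createTemporaryTable to connect to an external data system, and createTemporaryView to register an intermediate result table, as shown below: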

tEnv.createTemporaryTable("table_name", tableDescriptor)
tEnv.createTemporaryView("table_name", table)
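
The precedence rule above can be pictured with a toy lookup. This is only a sketch in plain Scala, not Flink's catalog code: `ToyCatalog`, `temporary`, `permanent` and `resolve` are hypothetical names, and the string values stand in for table definitions.

```scala
// Toy model of name resolution: temporary objects shadow permanent ones.
// All names here are hypothetical; this is not Flink's catalog implementation.
final case class ToyCatalog(temporary: Map[String, String], permanent: Map[String, String]) {
  // Look in the temporary tables first, then fall back to the permanent ones.
  def resolve(name: String): Option[String] =
    temporary.get(name).orElse(permanent.get(name))
}

object ToyCatalogDemo {
  val catalog: ToyCatalog = ToyCatalog(
    temporary = Map("orders" -> "temporary orders"),
    permanent = Map("orders" -> "permanent orders", "users" -> "permanent users")
  )

  def main(args: Array[String]): Unit = {
    println(catalog.resolve("orders"))  // Some(temporary orders)
    println(catalog.resolve("users"))   // Some(permanent users)
    println(catalog.resolve("missing")) // None
  }
}
```

In this model, removing the "orders" entry from `temporary` makes the permanent "orders" visible again under the same name.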

2. Table API Connectors

2.1 filesystem, print, blackhole

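Add the following dependencies to pom.xml:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.3</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
            <scope>provided</scope>
        </dependency>

The program is as follows: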

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, FormatDescriptor, Schema, TableDescriptor, long2Literal, row, string2Literal}
import org.apache.flink.types.Row
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.ipc.StandbyException

import scala.util.control.Breaks.{break, breakable}




object flink_test {

  // Get the URI of the active HDFS NameNode
  def getActiveHdfsUri() = {
    val hadoopConf = new Configuration()
    val hdfsUris = Array(
      "hdfs://192.168.23.101:8020",
      "hdfs://192.168.23.102:8020",
      "hdfs://192.168.23.103:8020"
    )
    var hdfsCli: FileSystem = null
    var hdfsCapacity: Long = -1L
    var activeHdfsUri: String = null

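    breakable {
      // NOTE: the loop body was lost in extraction. What follows is a
      // reconstruction (an assumption) based on the variables and imports above.
      for (hdfsUri <- hdfsUris) {
        hadoopConf.set("fs.defaultFS", hdfsUri)
        hdfsCli = FileSystem.get(hadoopConf)
        try {
          // Assumed probe: this call is expected to fail on a standby NameNode
          hdfsCapacity = hdfsCli.getStatus.getCapacity
          activeHdfsUri = hdfsUri
          break()
        } catch {
          // Standby NameNode: move on to the next URI
          case _: StandbyException => {}
        }

      }
    }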

    activeHdfsUri

  }

  def main(args: Array[String]): Unit = {


    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    val hdfsFilePath = s"${getActiveHdfsUri()}/test/test.txt"

    // HDFS table
    val fileSystemTable = tEnv.from(
      TableDescriptor.forConnector("filesystem")
        .schema(Schema.newBuilder()
          .column("name", DataTypes.STRING())
          .column("amount", DataTypes.BIGINT())
          .build()
        )
        .option("path", hdfsFilePath)
        .format(FormatDescriptor
          .forFormat("csv")
          .option("field-delimiter", ",")
          .build()
        ).build()
    )
    tEnv.createTemporaryView("fileSystemTable", fileSystemTable)

    // print table
    tEnv.createTemporaryTable("printSink",
      TableDescriptor.forConnector("print")
        .schema(Schema.newBuilder()
          .column("name", DataTypes.STRING())
          .column("amount", DataTypes.BIGINT())
          .build()
        ).build()
    )

    // Read the HDFS table and write it to the print sink; the output is the same as converting to a DataStream and calling print()
    fileSystemTable.executeInsert("printSink")

    // blackhole table
    tEnv.executeSql("create temporary table blackholeSink with ('connector' = 'blackhole') like printSink")

    // Read the HDFS table and write it to the blackhole sink
    tEnv.executeSql("insert into blackholeSink select * from fileSystemTable")

    // Convert to a DataStream and discard the records (a blackhole equivalent)
    val fileSystemDatastream = tEnv.toDataStream(fileSystemTable)
    fileSystemDatastream.addSink(new DiscardingSink[Row]())

    senv.execute()

  }
}

Execution result:

6> +I[zhang_san, 30]
4> +I[li_si, 40]

3. timestamp and timestamp_ltz

  1. timestamp(p): p is the precision of the fractional seconds, from 0 to 9; the default is 6.
    val table = tEnv.sqlQuery("select timestamp '1970-01-01 00:00:04.001'")

    table.execute().print()

Output:

+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 00:00:04.001 |
+----+-------------------------+
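  2. timestamp_ltz(p) describes an absolute point in time on the timeline. It stores the milliseconds since the epoch as a long and the nanoseconds within the millisecond as an int. It cannot be specified with a string literal, but it can be converted from a long epoch value. At any given instant, System.currentTimeMillis() returns the same value on every machine in the world. When printed, the value is rendered in the session time zone, which is why the output below shows 08:00:04.001 rather than 00:00:04.001.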
    tEnv.executeSql("create view t1 as select to_timestamp_ltz(4001, 3)")
    val table = tEnv.sqlQuery("select * from t1")

    table.execute().print()

Output:

+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 08:00:04.001 |
+----+-------------------------+
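
The 8-hour shift in the output can be reproduced outside Flink with java.time. The sketch below is plain Scala, not Flink code. It assumes the session time zone was Asia/Shanghai, which is inferred from the offset and not stated in the source. `TimestampLtzSketch` and `render` are hypothetical names.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}

object TimestampLtzSketch {
  // Render an epoch-millisecond instant as wall-clock time in the given zone.
  def render(epochMillis: Long, zone: String): String =
    LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of(zone)).toString

  def main(args: Array[String]): Unit = {
    // The same instant (4001 ms after the epoch) shown in two zones
    println(render(4001L, "UTC"))           // 1970-01-01T00:00:04.001
    println(render(4001L, "Asia/Shanghai")) // 1970-01-01T08:00:04.001 (assumed session zone)
  }
}
```

The stored value is the same in both cases; only the rendering depends on the zone.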
  3. Current-time functions
    tEnv.executeSql("create view myView1 as select localtime, localtimestamp, current_date, current_time, current_timestamp, current_row_timestamp(), now(), proctime()")
    val table = tEnv.sqlQuery("select * from myView1")
    table.printSchema()

    table.execute().print()

Output:

(
  `localtime` TIME(0) NOT NULL,
  `localtimestamp` TIMESTAMP(3) NOT NULL,
  `current_date` DATE NOT NULL,
  `current_time` TIME(0) NOT NULL,
  `current_timestamp` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$5` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$6` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$7` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*
)
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | localtime |          localtimestamp | current_date | current_time |       current_timestamp |                  EXPR$5 |                  EXPR$6 |                  EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |  12:59:06 | 2022-02-07 12:59:06.859 |   2022-02-07 |     12:59:06 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.862 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
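
As a rough analogy, the result types in the schema above line up with java.time classes: TIME with LocalTime, TIMESTAMP with LocalDateTime, DATE with LocalDate, and TIMESTAMP_LTZ with Instant. The sketch below is plain Scala, not Flink code. It uses a fixed clock so the values are deterministic; the instant and the Asia/Shanghai zone are assumptions chosen to match the sample row. `CurrentTimeSketch` and its fields are hypothetical names.

```scala
import java.time.{Clock, Instant, LocalDate, LocalDateTime, LocalTime, ZoneId}
import java.time.temporal.ChronoUnit

object CurrentTimeSketch {
  // Assumption: instant and zone picked to reproduce the sample row above.
  val clock: Clock =
    Clock.fixed(Instant.parse("2022-02-07T04:59:06.859Z"), ZoneId.of("Asia/Shanghai"))

  // Zone-dependent wall-clock values (compare localtime, localtimestamp, current_date)
  val localTime: LocalTime = LocalTime.now(clock).truncatedTo(ChronoUnit.SECONDS)
  val localTimestamp: LocalDateTime = LocalDateTime.now(clock)
  val currentDate: LocalDate = LocalDate.now(clock)

  // Zone-independent instant (compare current_timestamp, a TIMESTAMP_LTZ)
  val currentTimestamp: Instant = Instant.now(clock)

  def main(args: Array[String]): Unit = {
    println(localTime)        // 12:59:06
    println(localTimestamp)   // 2022-02-07T12:59:06.859
    println(currentDate)      // 2022-02-07
    println(currentTimestamp) // 2022-02-07T04:59:06.859Z
  }
}
```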