Contents
- 1. Tables and Views
- 2. Table API Connectors
- 2.1 filesystem, print, blackhole
- 3. timestamp and timestamp_ltz
1. Tables and Views
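Tables are either temporary or permanent. When a temporary table and a permanent table share the same name, the temporary table takes precedence. A permanent table needs a database to persist its metadata, for example Hive.
Use createTemporaryTable to connect to an external data system, and createTemporaryView to register an intermediate result table, as shown below: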
tEnv.createTemporaryTable("table_name", tableDescriptor)
tEnv.createTemporaryView("table_name", table)
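The precedence rule can be observed directly. The following is a minimal sketch, assuming Flink 1.14 with the table planner on the classpath; the object name, the helper functions, the table name my_table, and the column names are hypothetical. It uses the default in-memory catalog, so the "permanent" table here only lives for the session, unlike one stored in Hive.

```scala
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Schema, TableDescriptor, TableEnvironment}
import scala.collection.JavaConverters._

object ShadowingSketch {
  // Hypothetical helper: a datagen-backed descriptor with one STRING column.
  private def descriptor(column: String): TableDescriptor =
    TableDescriptor.forConnector("datagen")
      .schema(Schema.newBuilder().column(column, DataTypes.STRING()).build())
      .build()

  // Column names of whatever the identifier currently resolves to.
  private def columnsOf(tEnv: TableEnvironment, name: String): List[String] =
    tEnv.from(name).getResolvedSchema.getColumnNames.asScala.toList

  // Returns the columns seen while the temporary table exists and after it is dropped.
  def demo(): (List[String], List[String]) = {
    val tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
    // Permanent table, registered in the current catalog and database
    tEnv.createTable("my_table", descriptor("permanent_col"))
    // Temporary table with the same name
    tEnv.createTemporaryTable("my_table", descriptor("temporary_col"))
    val whileShadowed = columnsOf(tEnv, "my_table")
    // Dropping the temporary table makes the permanent one visible again
    tEnv.dropTemporaryTable("my_table")
    val afterDrop = columnsOf(tEnv, "my_table")
    (whileShadowed, afterDrop)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

While both tables exist, the name should resolve to the temporary table; after dropTemporaryTable it should resolve to the permanent one.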
2. Table API Connectors
2.1 filesystem, print, blackhole
Add the following dependencies to pom.xml:
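<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.14.3</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.3.1</version>
    <scope>provided</scope>
</dependency>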
The program is as follows:
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.functions.sink.DiscardingSink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, FormatDescriptor, Schema, TableDescriptor, long2Literal, row, string2Literal}
import org.apache.flink.types.Row
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.ipc.StandbyException
import scala.util.control.Breaks.{break, breakable}
object flink_test {
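  // Find the URI of the active HDFS NameNode
  def getActiveHdfsUri(): String = {
    val hadoopConf = new Configuration()
    val hdfsUris = Array(
      "hdfs://192.168.23.101:8020",
      "hdfs://192.168.23.102:8020",
      "hdfs://192.168.23.103:8020"
    )
    var hdfsCli: FileSystem = null
    var hdfsCapacity: Long = -1L
    var activeHdfsUri: String = null
    breakable {
      // Assumption: the loop body was garbled in the source; this probe is
      // reconstructed from the declared variables and imports
      for (hdfsUri <- hdfsUris) {
        hadoopConf.set("fs.defaultFS", hdfsUri)
        hdfsCli = FileSystem.get(hadoopConf)
        try {
          // A standby NameNode is expected to reject this call
          hdfsCapacity = hdfsCli.getStatus.getCapacity
          activeHdfsUri = hdfsUri
          break()
        } catch {
          case _: StandbyException => {}
        }
      }
    }
    activeHdfsUri
  }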
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)
    val hdfsFilePath = s"${getActiveHdfsUri()}/test/test.txt"
    // HDFS table
    val fileSystemTable = tEnv.from(
      TableDescriptor.forConnector("filesystem")
        .schema(Schema.newBuilder()
          .column("name", DataTypes.STRING())
          .column("amount", DataTypes.BIGINT())
          .build()
        )
        .option("path", hdfsFilePath)
        .format(FormatDescriptor
          .forFormat("csv")
          .option("field-delimiter", ",")
          .build()
        ).build()
    )
    tEnv.createTemporaryView("fileSystemTable", fileSystemTable)
    // print table
    tEnv.createTemporaryTable("printSink",
      TableDescriptor.forConnector("print")
        .schema(Schema.newBuilder()
          .column("name", DataTypes.STRING())
          .column("amount", DataTypes.BIGINT())
          .build()
        ).build()
    )
    // Read the HDFS table and write it to the print sink; the output is the
    // same as converting the table to a DataStream and calling print()
    fileSystemTable.executeInsert("printSink")
    // blackhole table, reusing the schema of printSink
    tEnv.executeSql("create temporary table blackholeSink with ('connector' = 'blackhole') like printSink")
    // Read the HDFS table and write it to the blackhole sink
    tEnv.executeSql("insert into blackholeSink select * from fileSystemTable")
    // Convert to a DataStream and discard every record
    val fileSystemDatastream = tEnv.toDataStream(fileSystemTable)
    fileSystemDatastream.addSink(new DiscardingSink[Row]())
    senv.execute()
  }
}
The output is as follows:
6> +I[zhang_san, 30]
4> +I[li_si, 40]
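The filesystem source above can also be declared with SQL DDL instead of a TableDescriptor. The sketch below only builds the statement as a string, so it runs without Flink; the object name, the helper fileSystemDdl, and the table name fileSystemSqlTable are hypothetical. Note that in DDL the format option carries the format name as a prefix, so field-delimiter becomes csv.field-delimiter.

```scala
object FileSystemDdlSketch {
  // Build a CREATE TEMPORARY TABLE statement that mirrors the
  // TableDescriptor-based filesystem table (same columns, csv format).
  // tableName and path are supplied by the caller.
  def fileSystemDdl(tableName: String, path: String): String =
    s"""CREATE TEMPORARY TABLE $tableName (
       |  `name` STRING,
       |  `amount` BIGINT
       |) WITH (
       |  'connector' = 'filesystem',
       |  'path' = '$path',
       |  'format' = 'csv',
       |  'csv.field-delimiter' = ','
       |)""".stripMargin

  def main(args: Array[String]): Unit =
    println(fileSystemDdl("fileSystemSqlTable", "hdfs://192.168.23.101:8020/test/test.txt"))
}
```

The resulting string would be passed to tEnv.executeSql(...), after which the table can be queried by name in the same way as fileSystemTable.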
3. timestamp and timestamp_ltz
- timestamp(p): p is the precision, i.e. the number of fractional-second digits. It ranges from 0 to 9 and defaults to 6.
val table = tEnv.sqlQuery("select timestamp '1970-01-01 00:00:04.001'")
table.execute().print()
The output is as follows:
+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 00:00:04.001 |
+----+-------------------------+
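- timestamp_ltz(p): describes an absolute point on the timeline. It stores the number of milliseconds since the epoch in a long and the nanoseconds within that millisecond in an int. It cannot be specified with a string literal, but it can be converted from a long epoch value. At any given instant, System.currentTimeMillis() returns the same value on every machine in the world, whatever its time zone. When the value is printed, it is rendered in the session time zone.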
tEnv.executeSql("create view t1 as select to_timestamp_ltz(4001, 3)")
val table = tEnv.sqlQuery("select * from t1")
table.execute().print()
The output is as follows:
+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 1970-01-01 08:00:04.001 |
+----+-------------------------+
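The 08:00:04.001 shown above is the same instant as epoch millisecond 4001, rendered in a UTC+8 session time zone. The following sketch reproduces that arithmetic with java.time only, without Flink. The object name, the render helper, and the choice of Asia/Shanghai are assumptions made for illustration; the source does not state which zone the session used.

```scala
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object TimestampLtzSketch {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")

  // timestamp_ltz analogue: one absolute instant, rendered in a chosen zone.
  def render(epochMillis: Long, zone: String): String =
    Instant.ofEpochMilli(epochMillis).atZone(ZoneId.of(zone)).format(fmt)

  // timestamp analogue: a wall-clock value with no zone attached,
  // so it prints the same everywhere.
  def renderLocal(isoLocal: String): String =
    LocalDateTime.parse(isoLocal).format(fmt)

  def main(args: Array[String]): Unit = {
    println(render(4001L, "UTC"))                   // 1970-01-01 00:00:04.001
    println(render(4001L, "Asia/Shanghai"))         // 1970-01-01 08:00:04.001
    println(renderLocal("1970-01-01T00:00:04.001")) // 1970-01-01 00:00:04.001
  }
}
```

The stored long is identical in both render calls; only the zone used for display differs.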
- Functions that return the current time
tEnv.executeSql("create view myView1 as select localtime, localtimestamp, current_date, current_time, current_timestamp, current_row_timestamp(), now(), proctime()")
val table = tEnv.sqlQuery("select * from myView1")
table.printSchema()
table.execute().print()
The output is as follows:
(
  `localtime` TIME(0) NOT NULL,
  `localtimestamp` TIMESTAMP(3) NOT NULL,
  `current_date` DATE NOT NULL,
  `current_time` TIME(0) NOT NULL,
  `current_timestamp` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$5` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$6` TIMESTAMP_LTZ(3) NOT NULL,
  `EXPR$7` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*
)
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| op | localtime |          localtimestamp | current_date | current_time |       current_timestamp |                  EXPR$5 |                  EXPR$6 |                  EXPR$7 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+
| +I |  12:59:06 | 2022-02-07 12:59:06.859 |   2022-02-07 |     12:59:06 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.859 | 2022-02-07 12:59:06.862 |
+----+-----------+-------------------------+--------------+--------------+-------------------------+-------------------------+-------------------------+-------------------------+