Table of Contents

- 1. Understanding timestamp
- 2. Understanding watermark
- 2.1 Generating watermarks
- 2.2 Handling idle sources
- 2.3 Watermark strategies and the Kafka connector

1. Understanding timestamp
An element's timestamp is either the event time or the processing time:

- EventTime: the default TimeCharacteristic (since Flink 1.12), so there is no need to call `senv.setStreamTimeCharacteristic`; processing-time operations still work under this semantic, and out-of-order data can be handled.
- ProcessingTime: when Flink is deployed across multiple servers, make sure the clocks of all servers are synchronized.
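A minimal sketch of the default behavior (Flink 1.12+; the socket source on localhost:9999 and the 5-second window are illustrative assumptions): no time characteristic is set, yet a processing-time window still works under the event-time default.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TimeCharacteristicSketch {
  def main(args: Array[String]): Unit = {
    // No senv.setStreamTimeCharacteristic call: event time is the default,
    // but a processing-time window assigner still works under it
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    senv.socketTextStream("localhost", 9999)
      .map(word => (word, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)
      .print()
    senv.execute("TimeCharacteristicSketch")
  }
}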
2. Understanding watermark

- Why watermarks are needed: because data can arrive out of order, time-based operators (e.g., the computation over the elements of a window) rely on the watermark to trigger their computation; the watermark also marks the point beyond which the operator no longer processes elements whose timestamp is smaller than the watermark.
- Watermark propagation: an operator broadcasts the watermarks it receives to its downstream operators.
  - Single parallelism: if a newly received watermark is larger than the current one, it replaces the current one.
  - Multiple parallelism: an operator receives a watermark from each input partition and takes the minimum; if that minimum is larger than the current watermark, it replaces the current one. For example, if the latest watermarks from three input partitions are 10, 12, and 8, the operator's watermark is 8.
- When the source is closed (e.g., Kafka is stopped): the source operator emits a watermark of Long.MAX_VALUE to let the job finish.
- When a WatermarkStrategy is specified more than once: the new strategy overrides the old one.
2.1 Generating watermarks

- Extending WatermarkStrategy

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime._

class RecordTimestampAssigner extends TimestampAssigner[(String, Int, String)] {
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = {
    fdf.parse(element._3).getTime
  }
}

class PeriodWatermarkGenerator extends WatermarkGenerator[(String, Int, String)] {
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  var maxTimestamp: Long = _
  val maxOutofOrderness = 0

  // 1. Called for every event; a watermark can be emitted here in reaction to a special event
  override def onEvent(event: (String, Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = {
    maxTimestamp = math.max(fdf.parse(event._3).getTime, maxTimestamp)
  }

  // 2. Emits a watermark periodically; the interval is set via senv.getConfig.setAutoWatermarkInterval(200L), default 200 ms
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {
    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
  }
}

class MyWatermarkStrategy extends WatermarkStrategy[(String, Int, String)] {
  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(String, Int, String)] = {
    new RecordTimestampAssigner()
  }

  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Int, String)] = {
    new PeriodWatermarkGenerator()
  }
}
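A minimal usage sketch for attaching the custom strategy to a stream (the socket source on localhost:9999 and the comma-separated line format "word,count,yyyy-MM-dd HH:mm:ss" are assumptions for illustration):

import org.apache.flink.streaming.api.scala._

object MyWatermarkStrategyUsage {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // Parse lines like "A,10,2021-09-08 01:00:05" into (word, count, eventTimeString)
    val withWatermarks = senv.socketTextStream("localhost", 9999)
      .map { line =>
        val f = line.split(",")
        (f(0), f(1).toInt, f(2))
      }
      .assignTimestampsAndWatermarks(new MyWatermarkStrategy())
    withWatermarks.print()
    senv.execute("MyWatermarkStrategyUsage")
  }
}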
- Using Flink's built-in watermark strategies
- For a Kafka source, withTimestampAssigner can be omitted, because the timestamp carried in the Kafka record is used instead
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime._

import java.time.Duration

class RecordTimestampAssigner extends SerializableTimestampAssigner[(String, Int, String)] {
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = {
    fdf.parse(element._3).getTime
  }
}

object WatermarkTest {
  def main(args: Array[String]): Unit = {
    val myWatermarkStrategy: WatermarkStrategy[(String, Int, String)] = WatermarkStrategy
      .forBoundedOutOfOrderness[(String, Int, String)](Duration.ofSeconds(10L))
      .withTimestampAssigner(new RecordTimestampAssigner())
  }
}
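The strategy above is only constructed, not yet attached; it would be applied with assignTimestampsAndWatermarks, and other built-in factory methods can be swapped in the same way. A sketch, continuing inside the main method above (`input` in the comment is an assumed DataStream[(String, Int, String)]):

// Strictly ascending event times: watermark = max timestamp seen - 1 ms
val ascendingStrategy = WatermarkStrategy
  .forMonotonousTimestamps[(String, Int, String)]()
  .withTimestampAssigner(new RecordTimestampAssigner())

// No watermarks at all, for purely processing-time pipelines
val noWatermarkStrategy = WatermarkStrategy.noWatermarks[(String, Int, String)]()

// Attaching a strategy to a stream:
// val withWatermarks = input.assignTimestampsAndWatermarks(myWatermarkStrategy)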
2.2 Handling idle sources
- Some partitions idle for a long time: when some partitions of the source keep producing data while others produce none for a long time, the watermark of the downstream merging operator never advances, because it is the minimum across partitions. This can be solved with WatermarkStrategy.withIdleness(Duration.ofMinutes(1L)): once a partition has had no data for the configured duration, the merging operator ignores that partition's watermark when updating its own (see the withIdleness sketch after the example below).
- The whole source idle for a long time: if the source produces no data for a long time, the watermark never advances and window functions are never triggered. The source can call sourceContext.markAsTemporarilyIdle() to mark itself as temporarily idle, so that downstream operators stop waiting for its watermark. A reference example:
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.util.Random

class WordSourceFunction extends SourceFunction[String] {
  private var is_running = true
  // null simulates rounds in which the source has no data to emit
  private val words = Array("hello", "world", "flink", "stream", "batch", "table", "sql", null)

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    var always_null_word = 0L
    while (is_running) {
      val index = Random.nextInt(words.length)
      val word = words(index)
      if (word != null) {
        always_null_word = 0L
        sourceContext.collect(word)
      } else {
        always_null_word += 1
        // After enough consecutive empty rounds, mark the source as temporarily idle
        if (always_null_word > 1000) {
          sourceContext.markAsTemporarilyIdle()
        }
      }
      // Sleep 1 second
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    is_running = false
  }
}
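For the partition-idle case from the first bullet above, withIdleness simply wraps an existing strategy. A sketch reusing RecordTimestampAssigner from section 2.1 (the 10-second bound and 1-minute idle timeout are illustrative values; the declaration would sit inside a main method):

// After 1 minute without data, a partition is treated as idle and no longer
// holds back the watermark of downstream merging operators
val idleAwareStrategy: WatermarkStrategy[(String, Int, String)] = WatermarkStrategy
  .forBoundedOutOfOrderness[(String, Int, String)](Duration.ofSeconds(10L))
  .withTimestampAssigner(new RecordTimestampAssigner())
  .withIdleness(Duration.ofMinutes(1L))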
2.3 Watermark strategies and the Kafka connector

When Kafka is used as the data source, the WatermarkStrategy can be specified directly on the Kafka consumer, which generates timestamps and watermarks more precisely (per Kafka partition, inside the consumer). Example code and results:
- Sending data to Kafka
[root@bigdata001 ~]#
[root@bigdata001 ~]# /root/kafka_2.13-2.8.0/bin/kafka-console-producer.sh --bootstrap-server bigdata001:9092,bigdata002:9092,bigdata003:9092 --topic flink_test
>A,10,2021-09-08 01:00:05
>A,20,2021-09-08 01:00:06
>A,30,2021-09-08 01:00:12
- Program code
package datastreamApi

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.time.Duration
import java.util.Properties

class RecordTimestampAssigner extends SerializableTimestampAssigner[String] {
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  // Records look like "A,10,2021-09-08 01:00:05"; the third field is the event time
  override def extractTimestamp(element: String, recordTimestamp: Long): Long = {
    fdf.parse(element.split(",").apply(2)).getTime
  }
}

object WatermarkTest {
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.put("bootstrap.servers", "192.168.xxx.xxx:9092")
    prop.put("group.id", "test")

    // Assign the WatermarkStrategy directly on the Kafka consumer, so
    // timestamps and watermarks are generated per Kafka partition
    val kafkaSource = new FlinkKafkaConsumer[String]("flink_test", new SimpleStringSchema(), prop)
      .assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(10L))
          .withTimestampAssigner(new RecordTimestampAssigner())
      )

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaInput = senv.addSource(kafkaSource)
    kafkaInput.print("kafkaInput")

    senv.execute("WatermarkTest")
  }
}
- Execution results:
kafkaInput:2> A,10,2021-09-08 01:00:05
kafkaInput:1> A,20,2021-09-08 01:00:06
kafkaInput:2> A,30,2021-09-08 01:00:12
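To see the watermark actually trigger a computation, an event-time window can be appended to kafkaInput before senv.execute in the program above. A sketch (the 5-second window size is an illustrative choice; the extra imports are shown):

import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Sum the counts per key in 5-second event-time windows; a window fires
// once the watermark (max event time seen - 10 s - 1 ms) passes its end time
kafkaInput
  .map { line =>
    val f = line.split(",")
    (f(0), f(1).toInt)
  }
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .sum(1)
  .print("windowed")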