
Timestamps and Watermarks in Flink DataStream

Bulut0907, published 2021-12-04 22:45:33

Contents
  • 1. Understanding timestamps
  • 2. Understanding watermarks
    • 2.1 Generating watermarks
    • 2.2 Handling idle sources
    • 2.3 Watermark strategies and the Kafka connector

1. Understanding timestamps

An element's timestamp is either its event time (EventTime) or its processing time (ProcessingTime):

  1. EventTime: the default TimeCharacteristic. Processing time also works under this time semantic, so there is no need to call senv.setStreamTimeCharacteristic. Event time can handle out-of-order data.

  2. ProcessingTime: when Flink is deployed across multiple machines, make sure the clocks of all servers are synchronized.
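An event-time timestamp is typically extracted from a field of the element itself. A minimal, Flink-free sketch of that extraction (the "yyyy-MM-dd HH:mm:ss" format matches the examples later in this post; fixing the timezone to UTC is an assumption made here only to keep the output deterministic):

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// Parse an event-time string into epoch milliseconds,
// as a Flink TimestampAssigner would for each element.
val fdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
fdf.setTimeZone(TimeZone.getTimeZone("UTC")) // assumption: fixed zone for reproducibility

def extractTimestamp(eventTime: String): Long = fdf.parse(eventTime).getTime

val ts = extractTimestamp("2021-09-08 01:00:05")
// Out-of-order data: a record arriving later may carry a smaller timestamp
val earlier = extractTimestamp("2021-09-08 01:00:04")
println(ts - earlier) // 1000 ms apart
```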

2. Understanding watermarks
  • Why watermarks: because data can arrive out of order, time-based operators (such as the computation over the elements of a window) need a watermark to trigger their computation; the watermark then also marks that the operator will no longer process elements whose timestamp is smaller than the watermark
  • Watermark propagation: an operator broadcasts the watermarks it receives to its downstream operators
    1. Single parallelism: if a newly received watermark is larger than the current one, it replaces the current watermark
    2. Multiple parallelism: an operator receives watermarks from several input partitions and takes the minimum; if that minimum is larger than the current watermark, it replaces it
  • When the source is closed (e.g. Kafka is stopped): the source operator emits a watermark with the value Long.MAX_VALUE to let the program finish
  • If a WatermarkStrategy is specified multiple times, the new one overrides the old one
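The propagation rules above can be sketched without Flink: each input channel remembers the last watermark it received, the operator's own watermark is the minimum across channels, and it only ever moves forward. The names below are illustrative, not Flink API:

```scala
// Minimal model of an operator merging watermarks from n input channels.
class WatermarkMerger(numChannels: Int) {
  private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)
  private var current = Long.MinValue

  // Returns the operator's watermark after receiving `wm` on `channel`.
  def onWatermark(channel: Int, wm: Long): Long = {
    // Per channel: a newer watermark replaces an older one (single-parallelism rule)
    channelWatermarks(channel) = math.max(channelWatermarks(channel), wm)
    // Across channels: take the minimum, and only ever advance (multi-parallelism rule)
    val merged = channelWatermarks.min
    if (merged > current) current = merged
    current
  }
}

val m = new WatermarkMerger(2)
m.onWatermark(0, 100L)          // channel 1 has no watermark yet -> no advance
println(m.onWatermark(1, 50L))  // min(100, 50) = 50
println(m.onWatermark(1, 200L)) // min(100, 200) = 100
```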
2.1 Generating watermarks
  1. Extending WatermarkStrategy
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime._

class RecordTimestampAssigner extends TimestampAssigner[(String, Int, String)] {
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = {

    fdf.parse(element._3).getTime

  }

}

class PeriodWatermarkGenerator extends WatermarkGenerator[(String, Int, String)] {
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  var maxTimestamp: Long = _
  val maxOutofOrderness = 0

  // 1. Emit a watermark based on a particular event (here: only track the max timestamp)
  override def onEvent(event: (String, Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = {

    maxTimestamp = math.max(fdf.parse(event._3).getTime, maxTimestamp)

  }
  
  // 2. Emit watermarks periodically; the interval is set via senv.getConfig.setAutoWatermarkInterval(200L), default 200 ms
  override def onPeriodicEmit(output: WatermarkOutput): Unit = {

    output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
  }
}


class MyWatermarkStrategy extends WatermarkStrategy[(String, Int, String)] {

  override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(String, Int, String)] = {

    new RecordTimestampAssigner()
  }

  override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Int, String)] = {
    new PeriodWatermarkGenerator()

  }

}
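The generator above emits `maxTimestamp - maxOutofOrderness - 1`: the `- 1` makes the watermark mean "no element with a timestamp at or below the watermark is expected anymore". A Flink-free sketch of that bookkeeping (class and parameter names are illustrative; the 5-second bound is an arbitrary example, the post itself uses 0):

```scala
// Track the highest timestamp seen and derive the periodic watermark from it.
class BoundedOutOfOrderness(maxOutOfOrderness: Long) {
  private var maxTimestamp = Long.MinValue

  def onEvent(timestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, timestamp)

  // What onPeriodicEmit would emit
  def currentWatermark: Long = maxTimestamp - maxOutOfOrderness - 1

  // An element is late if its timestamp is not above the current watermark
  def isLate(timestamp: Long): Boolean = timestamp <= currentWatermark
}

val g = new BoundedOutOfOrderness(maxOutOfOrderness = 5000L)
g.onEvent(10000L)
println(g.currentWatermark) // 10000 - 5000 - 1 = 4999
println(g.isLate(4999L))    // true
println(g.isLate(5000L))    // false: still within the allowed lateness
```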

  2. Using Flink's built-in watermark strategies
  • For a Kafka source, withTimestampAssigner can be omitted; the timestamps carried by the Kafka records are then used
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime._
import java.time.Duration

class RecordTimestampAssigner extends SerializableTimestampAssigner[(String, Int, String)] {
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = {

    fdf.parse(element._3).getTime

  }

}

object WatermarkTest {

  def main(args: Array[String]): Unit = {

    val myWatermarkStrategy: WatermarkStrategy[(String, Int, String)] = WatermarkStrategy
      .forBoundedOutOfOrderness(Duration.ofSeconds(10L))
      .withTimestampAssigner(new RecordTimestampAssigner())

  }
}
2.2 Handling idle sources
  1. Some partitions have no data for a long time. When some partitions of a source keep producing data while others stay empty for a long time, the watermark of the merging operator never advances. This can be solved with WatermarkStrategy.withIdleness(Duration.ofMinutes(1L)): once a partition has produced no data for the configured period, the merging operator ignores that partition's watermark when updating its own.

  2. The whole source has no data for a long time. If the source itself produces no data for a long time, the watermark never advances and window functions are never triggered. The source can be temporarily marked as idle with sourceContext.markAsTemporarilyIdle(), emitting the current server time as the watermark. A reference example:

import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random

class WordSourceFunction extends SourceFunction[String] {

  private var is_running = true
  // The null entry simulates rounds with no data; without it the idle branch below is unreachable
  private val words = Array("hello", "world", "flink", "stream", "batch", "table", "sql", null)


  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {

    var consecutiveNulls = 0L
    while (is_running) {
      val index = Random.nextInt(words.size)
      val word = words(index)

      if (word != null) {
        consecutiveNulls = 0L
        sourceContext.collect(word)
      } else {
        consecutiveNulls += 1
        // After enough consecutive empty rounds, mark the source as idle
        // so it no longer holds back the downstream watermark
        if (consecutiveNulls > 1000) {
          sourceContext.markAsTemporarilyIdle()
        }
      }

      // emit at most one word per second
      Thread.sleep(1000)
    }

  }

  override def cancel(): Unit = {
    is_running = false
  }

}
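Combined with the merge rule from section 2, withIdleness can be modeled as excluding idle channels from the minimum; only if every channel is idle does the merged watermark stop advancing. This is a sketch under that assumption, not Flink's actual implementation:

```scala
// Watermark merge that skips channels currently marked idle.
class IdleAwareMerger(numChannels: Int) {
  private val watermarks = Array.fill(numChannels)(Long.MinValue)
  private val idle = Array.fill(numChannels)(false)
  private var current = Long.MinValue

  def onWatermark(channel: Int, wm: Long): Long = {
    idle(channel) = false // receiving data makes the channel active again
    watermarks(channel) = math.max(watermarks(channel), wm)
    advance()
  }

  def markIdle(channel: Int): Long = { idle(channel) = true; advance() }

  private def advance(): Long = {
    // Minimum over active channels only; if all are idle, do not advance
    val active = watermarks.indices.filter(i => !idle(i)).map(i => watermarks(i))
    if (active.nonEmpty && active.min > current) current = active.min
    current
  }
}

val m = new IdleAwareMerger(2)
m.onWatermark(0, 100L) // channel 1 has no data yet -> watermark held back
println(m.markIdle(1)) // channel 1 idle -> min over {100} = 100
```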
2.3 Watermark strategies and the Kafka connector

When Kafka is used as the data source, the WatermarkStrategy can be assigned directly on the consumer, which generates timestamps and watermarks more precisely (per Kafka partition). Example code and figure:

  1. Sending data to Kafka
[root@bigdata001 ~]#
[root@bigdata001 ~]# /root/kafka_2.13-2.8.0/bin/kafka-console-producer.sh --bootstrap-server bigdata001:9092,bigdata002:9092,bigdata003:9092 --topic flink_test
>A,10,2021-09-08 01:00:05
>A,20,2021-09-08 01:00:06
>A,30,2021-09-08 01:00:12

  2. Program code
package datastreamApi

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import java.time.Duration
import java.util.Properties

class RecordTimestampAssigner extends SerializableTimestampAssigner[String] {
  val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  override def extractTimestamp(element: String, recordTimestamp: Long): Long = {

    fdf.parse(element.split(",").apply(2)).getTime

  }

}

object WatermarkTest {


  def main(args: Array[String]): Unit = {

    val prop = new Properties()
    prop.put("bootstrap.servers", "192.168.xxx.xxx:9092")
    prop.put("group.id", "test")

    val kafkaSource = new FlinkKafkaConsumer[String]("flink_test", new SimpleStringSchema(), prop)
      .assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10L))
          .withTimestampAssigner(new RecordTimestampAssigner())
      )

    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val kafkaInput = senv.addSource(kafkaSource)
    kafkaInput.print("kafkaInput")

    senv.execute("WatermarkTest")
  }

}
  3. Execution result:
kafkaInput:2> A,10,2021-09-08 01:00:05
kafkaInput:1> A,20,2021-09-08 01:00:06
kafkaInput:2> A,30,2021-09-08 01:00:12
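Applying the 10-second bound from the program above to the three sample records, the watermark progression can be traced by hand. A Flink-free sketch (UTC timezone is assumed here only for determinism):

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

val fdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
fdf.setTimeZone(TimeZone.getTimeZone("UTC")) // assumption: fixed zone

// Same extraction as RecordTimestampAssigner: the third CSV field is the event time
def extractTimestamp(record: String): Long = fdf.parse(record.split(",")(2)).getTime

val records = Seq(
  "A,10,2021-09-08 01:00:05",
  "A,20,2021-09-08 01:00:06",
  "A,30,2021-09-08 01:00:12"
)

val bound = 10000L // Duration.ofSeconds(10L)
var maxTs = Long.MinValue
for (r <- records) {
  maxTs = math.max(maxTs, extractTimestamp(r))
  val watermark = maxTs - bound - 1 // forBoundedOutOfOrderness semantics
  println(s"$r -> watermark $watermark")
}
```

After the last record the watermark still trails the first record's timestamp by about 3 seconds, so none of the three records would be considered late.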

(Figure: specifying the watermark on the Kafka source)
