- 一、Window分类
- 1.1、Global Window 和 Keyed Window
- 1.2、Time Window 和 Count Window
- 1.3、Time Window(时间窗口)
- 1.3.1、滚动窗口(Tumbling Window)
- 1.3.2、滑动窗口(Sliding Window)
- 1.3.3、会话窗口(Session Window)
- 1.4、Count Window(数量窗口)
- 二、Window的API
- 三、窗口聚合函数
- 3.1、ReduceFunction
- 3.2、AggregateFunction
- 3.3、ProcessFunction
- 返回总目录
Windows 计算是流式计算中非常常用的数据计算方式之一,通过按照固定时间或长度将数据流切分成不同的窗口,然后对数据进行相应的聚合运算,从而得到一定时间范围内的统计结果。 例如统计最近 5 分钟内某基站的呼叫数,此时基站的数据在不断地产生,但是通过5 分钟的窗口将数据限定在固定时间范围内,就可以对该范围内的有界数据执行聚合处理,得出最近 5 分钟的基站的呼叫数量。
还有视频讲解在我的B站-宝哥chbxw, 希望大家可以支持一下,谢谢。
一、Window分类 1.1、Global Window 和 Keyed Window在运用窗口计算时,Flink根据上游数据集是否为KeyedStream类型,对应的Windows 也会有所不同。
- Keyed Window:上游数据集如果是 KeyedStream 类型,则调用 DataStream API 的 window()方法,数据会根据 Key 在不同的 Task 实例中并行分别计算,最后得出针对每个 Key 统计的结果。
- Global Window:如果是 Non-Keyed 类型,则调用 WindowsAll()方法,所有的数据都会在窗口算子中由到一个 Task 中计算,并得到全局统计结果。
基于业务数据的方面考虑,Flink 又支持两种类型的窗口,一种是基于时间的窗口叫Time Window。还有一种基于输入数据数量的窗口叫 Count Window
1.3、Time Window(时间窗口)根据不同的业务场景,Time Window 也可以分为三种类型,分别是滚动窗口(TumblingWindow)、滑动窗口(Sliding Window)和会话窗口(Session Window)
1.3.1、滚动窗口(Tumbling Window)滚动窗口是根据固定时间进行切分,且窗口和窗口之间的元素互不重叠。这种类型的窗口的最大特点是比较简单。只需要指定一个窗口长度(window size)。
滑动窗口也是一种比较常见的窗口类型,其特点是在滚动窗口基础之上增加了窗口滑动时间(Slide Time),且允许窗口数据发生重叠。当 Windows size 固定之后,窗口并不像滚动窗口按照 Windows Size 向前移动,而是根据设定的 Slide Time 向前滑动。窗口之间的数据重叠大小根据 Windows size 和 Slide time决定,当 Slide time 小于 Windows size便会发生窗口重叠,Slide size 大于 Windows size 就会出现窗口不连续,数据可能不能在任何一个窗口内计算,Slide size 和 Windows size 相等时,Sliding Windows 其实就是Tumbling Windows。
会话窗口(Session Windows)主要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发的条件是 Session Gap,是指在规定的时间内如果没有数据活跃接入,则认为窗口结束,然后触发窗口计算结果。需要注意的是如果数据一直不间断地进入窗口,也会导致窗口始终不触发的情况。与滑动窗口、滚动窗口不同的是,Session Windows 不需要有固定 windows size 和 slide time,只需要定义 session gap,来规定不活跃数据的时间上限即可。
Count Window 也有滚动窗口、滑动窗口等。使用比较少
二、Window的API在以后的实际案例中 Keyed Window 使用最多,所以我们需要掌握 Keyed Window 的算子,在每个窗口算子中包含了 Windows Assigner、Windows Trigger(窗口触发器)、Evictor(数据剔除器)、Lateness(时延设定)、Output Tag(输出标签)以及 Windows Funciton等组成部分,其中 Windows Assigner 和 Windows Funciton 是所有窗口算子必须指定的属性,其余的属性都是根据实际情况选择指定。
stream.keyBy(...) // 是Keyed类型数据集
.window(...) //指定窗口分配器类型
[.trigger(...)] //指定触发器类型(可选)
[.evictor(...)] //指定evictor或者不指定(可选)
[.allowedLateness(...)] //指定是否延迟处理数据(可选)
[.sideOutputLateData(...)] //指定Output Lag(可选)
.reduce/aggregate/fold/apply() //指定窗口计算函数
[.getSideOutput(...)] //根据Tag输出数据(可选)
- Windows Assigner:指定窗口的类型,定义如何将数据流分配到一个或多个窗口;
- Windows Trigger:指定窗口触发的时机,定义窗口满足什么样的条件触发计算;
- Evictor:用于数据剔除;
- allowedLateness:标记是否处理迟到数据,当迟到数据到达窗口中是否触发计算;
- Output Tag:标记输出标签,然后在通过 getSideOutput 将窗口中的数据根据标签输出;
- Windows Funciton:定义窗口上数据处理的逻辑,例如对数据进行 sum 操作
如果定义了 Window Assigner 之后,下一步就可以定义窗口内数据的计算逻辑,这也就是 Window Function 的定义。Flink 中提供了四种类型的 Window Function,分别为ReduceFunction、AggregateFunction 以及 ProcessWindowFunction,(sum 和 max)等。 前三种类型的 Window Fucntion 按照计算原理的不同可以分为两大类:
- 增量聚合函数:对应有 ReduceFunction、AggregateFunction;
- 全量窗口函数,对应有 ProcessWindowFunction(还有 WindowFunction)。
增量聚合函数计算性能较高,占用存储空间少,主要因为基于中间状态的计算结果,窗口中只维护中间结果状态值,不需要缓存原始数据。而全量窗口函数使用的代价相对较高,性能比较弱,主要因为此时算子需要对所有属于该窗口的接入数据进行缓存,然后等到窗口触发的时候,对所有的原始数据进行汇总计算。
3.1、ReduceFunctionReduceFunction 定义了对输入的两个相同类型的数据元素按照指定的计算方法进行聚合的逻辑,然后输出类型相同的一个结果元素。
package com.chb.flink.window
import com.chb.flink.source.{MyCustomerSource}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
object TestReduceFunctionByWindow {
//每隔5秒统计每个基站的日志数量
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
val data = streamEnv.addSource(new MyCustomerSource)
// 10:05~10:10 10:10~10:15 10.08
//开窗
data.map(log => ((log.sid, 1)))
.keyBy(_._1)
.timeWindow(Time.seconds(5)) //开窗
.reduce((t1, t2) => (t1._1, t1._2 + t2._2))
.print()
streamEnv.execute()
}
}
3.2、AggregateFunction
和 ReduceFunction 相似,AggregateFunction 也是基于中间状态计算结果的增量计算函数,但 AggregateFunction 在窗口计算上更加通用。AggregateFunction 接口相对ReduceFunction 更加灵活,实现复杂度也相对较高。 AggregateFunction 接口中定义了三个需要复写的方法,
- add()定义数据的添加逻辑
- getResult 定义了根据 accumulator 计算结果的逻辑
- merge 方法定义合并 accumulator 的逻辑。
package com.chb.flink.window
import com.chb.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object TestAggregatFunctionByWindow {
//每隔3秒计算最近5秒内,每个基站的日志数量
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
val stream = streamEnv.addSource(new MyCustomerSource)
//开窗
stream.map(log => ((log.sid, 1)))
.keyBy(_._1)
//timeWindow
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) //开窗,滑动窗口
.aggregate(new MyAggregateFunction, new MyWindowFunction)
.print()
streamEnv.execute()
}
/**
* 里面的add方法,是来一条数据执行一次,getResult 在窗口结束的时候执行一次
*/
class MyAggregateFunction extends AggregateFunction[(String, Int), Long, Long] {
override def createAccumulator(): Long = 0 //初始化一个累加器,开始的时候为0
override def add(value: (String, Int), accumulator: Long): Long = accumulator + value._2
override def getResult(accumulator: Long): Long = accumulator
override def merge(a: Long, b: Long): Long = a + b
}
//WindowFunction 输入数据来自于AggregateFunction ,在窗口结束的时候先执行AggregateFunction对象的getResult,然后在执行apply
class MyWindowFunction extends WindowFunction[Long, (String, Long), String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[(String, Long)]): Unit = {
out.collect((key, input.iterator.next())) //next得到第一个值,迭代器中只有一个值
}
}
}
3.3、ProcessFunction
前面提到的 ReduceFunction 和 AggregateFunction 都是基于中间状态实现增量计算的窗口函数,虽然已经满足绝大多数场景,但在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据和窗口元数据,这时就需要使用到ProcessWindowsFunction,ProcessWindowsFunction 能够更加灵活地支持基于窗口全部数据 元 素 的 结 果 计 算 , 例 如 对 整 个 窗 口 数 据 排 序 取 TopN, 这 样 的 需 要 就 必 须 使 用ProcessWindowFunction。
package com.chb.flink.window
import com.chb.flink.source.{MyCustomerSource, StationLog}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object TestProcessWindowFunctionByWindow {
//每隔5秒统计每个基站的日志数量
def main(args: Array[String]): Unit = {
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
// streamEnv.setParallelism(1)
//读取数据源
// val stream = streamEnv.addSource(new MyCustomerSource)
//读取数据源
val stream: DataStream[StationLog] = streamEnv.socketTextStream("10.0.0.201", 8888)
.map(line => {
var arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
//开窗
stream.map(log => ((log.sid, 1)))
.keyBy(_._1)
// .timeWindow(Time.seconds(5))//开窗
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.process(new ProcessWindowFunction[(String, Int), (String, Long), String, TimeWindow] { //一个窗口结束的时候调用一次(一个分组执行一次)
override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Long)]): Unit = {
println("start------------")
//注意:整个窗口的数据保存到Iterable,里面有很多行数据。Iterable的size就是日志的总条数
out.collect((key, elements.size))
println("--------------end")
}
})
.print()
streamEnv.execute()
}
}
返回总目录