您当前的位置: 首页 >  flink

蔚1

暂无认证

  • 5浏览

    0关注

    4728博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

透过窗口看无限数据流——Flink的Window全面解析

蔚1 发布时间:2020-05-08 23:31:17 ,浏览量:5

窗口是流式计算中非常常用的算子之一,通过窗口可以将无限流切分成有限流,然后在每个窗口之上使用计算函数,可以实现非常灵活的操作。Flink提供了丰富的窗口操作,除此之外,用户还可以根据自己的处理场景自定义窗口。

通过本文,你可以了解到:

  • 窗口的基本概念和简单使用
  • 内置 Window Assigners 的分类、源码及使用
  • Window Function 的分类及使用
  • 窗口的组成部分及生命周期源码解读
  • 完整的窗口使用 Demo 案例

窗口是流式计算中非常常用的算子之一,通过窗口可以将无限流切分成有限流,然后在每个窗口之上使用计算函数,可以实现非常灵活的操作。Flink 提供了丰富的窗口操作,除此之外,用户还可以根据自己的处理场景自定义窗口。通过本文,你可以了解到:

  • 窗口的基本概念和简单使用
  • 内置 Window Assigners 的分类、源码及使用
  • Window Function 的分类及使用
  • 窗口的组成部分及生命周期源码解读
  • 完整的窗口使用 Demo 案例
Quick Start 是什么

Window(窗口)是处理无界流的核心算子,Window 可以将数据流分为固定大小的"桶(buckets)"(即通过按照固定时间或长度将数据流切分成不同的窗口),在每一个窗口上,用户可以使用一些计算函数对窗口内的数据进行处理,从而得到一定时间范围内的统计结果。比如统计每隔 5 分钟输出最近一小时内点击量最多的前 N 个商品,这样就可以使用一个小时的时间窗口将数据限定在固定时间范围内,然后可以对该范围内的有界数据执行聚合处理。

根据作用的数据流(DataStream、KeyedStream),Window 可以分为两种:Keyed Windows与Non-Keyed Windows。其中 Keyed Windows 是在 KeyedStream 上使用 window(…)操作,产生一个 WindowedStream。Non-Keyed Windows 是在 DataStream 上使用 windowAll(…)操作,产生一个 AllWindowedStream。具体的转换关系如下图所示。注意:一般不推荐使用AllWindowedStream,因为在普通流上进行窗口操作,会将所有分区的流都汇集到单个的 Task 中,即并行度为 1,从而会影响性能。

在这里插入图片描述

如何用

上面我们介绍了什么是窗口,那么该如何使用窗口呢?具体如下面的代码片段:

Keyed Windows
stream       .keyBy(...)               // keyedStream 上使用 window       .window(...)              // 必选: 指定窗口分配器( window assigner)      [.trigger(...)]            // 可选: 指定触发器(trigger),如果不指定,则使用默认值      [.evictor(...)]            // 可选: 指定清除器(evictor),如果不指定,则没有      [.allowedLateness(...)]    // 可选: 指定是否延迟处理数据,如果不指定,默认使用 0       [.sideOutputLateData(...)] // 可选: 配置 side output,如果不指定,则没有       .reduce/aggregate/fold/apply() // 必选: 指定窗口计算函数      [.getSideOutput(...)]      // 可选: 从 side output 中获取数据
Non-Keyed Windows
stream       .windowAll(...)           // 必选: 指定窗口分配器( window assigner)      [.trigger(...)]            // 可选: 指定触发器(trigger),如果不指定,则使用默认值      [.evictor(...)]            // 可选: 指定清除器(evictor),如果不指定,则没有      [.allowedLateness(...)]    // 可选: 指定是否延迟处理数据,如果不指定,默认使用 0      [.sideOutputLateData(...)] // 可选: 配置 side output,如果不指定,则没有       .reduce/aggregate/fold/apply() // 必选: 指定窗口计算函数      [.getSideOutput(...)]      // 可选: 从 side output 中获取数据
简写 window 操作

上面的代码片段中,要在 keyedStream 上使用 window(…)或者在 DataStream 上使用 windowAll(…),需要传入一个 window assigner 的参数,关于 window assigner 下文会进行详细解释。如下面代码片段:

// -------------------------------------------//  Keyed Windows// -------------------------------------------stream       .keyBy(id)                      .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5S 的滚动窗口       .reduce(MyReduceFunction)// -------------------------------------------//  Non-Keyed Windows// -------------------------------------------stream                      .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // 5S 的滚动窗口       .reduce(MyReduceFunction)

上面的代码可以简写为:

// -------------------------------------------//  Keyed Windows// -------------------------------------------stream       .keyBy(id)                      .timeWindow(Time.seconds(5)) // 5S 的滚动窗口       .reduce(MyReduceFunction)// -------------------------------------------//  Non-Keyed Windows// -------------------------------------------stream                      .timeWindowAll(Time.seconds(5)) // 5S 的滚动窗口       .reduce(MyReduceFunction)

关于上面的简写,以 KeyedStream 为例,对于看一下具体的 KeyedStream 源码片段,可以看出底层调用的还是非简写时的代码。关于 timeWindowAll()的代码也是一样的,可以参考 DataStream 源码,这里不再赘述。

// 会根据用户的使用的时间类型,调用不同的内置 window Assignerpublic WindowedStream timeWindow(Time size) {        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {            return window(TumblingProcessingTimeWindows.of(size));        } else {            return window(TumblingEventTimeWindows.of(size));        }    }
Window Assigners 分类

WindowAssigner 负责将输入的数据分配到一个或多个窗口,Flink 内置了许多 WindowAssigner,这些 WindowAssigner 可以满足大部分的使用场景。比如tumbling windows, sliding windows, session windows , global windows。如果这些内置的 WindowAssigner 不能满足你的需求,可以通过继承 WindowAssigner 类实现自定义的 WindowAssigner。

上面的 WindowAssigner 是基于时间的(time-based windows),除此之外,Flink 还提供了基于数量的窗口(count-based windows),即根据窗口的元素数量定义窗口大小,这种情况下,如果数据存在乱序,将导致窗口计算结果不确定。本文重点介绍基于时间的窗口使用,由于篇幅有限,关于基于数量的窗口将不做讨论。

在这里插入图片描述

使用介绍

下面将会对 Flink 内置的四种基于时间的 windowassigner,进行一一分析。

Tumbling Windows
  • 图解

Tumbling Windows(滚动窗口)是将数据分配到确定的窗口中,根据固定时间或大小进行切分,每个窗口有固定的大小且窗口之间不存在重叠(如下图所示)。这种比较简单,适用于按照周期统计某一指标的场景。

关于时间的选择,可以使用 Event Time 或者 Processing Time,分别对应的 window assigner 为:TumblingEventTimeWindows、TumblingProcessingTimeWindows。用户可以使用 window assigner 的 of(size)方法指定时间间隔,其中时间单位可以是 Time.milliseconds(x)、Time.seconds(x)或 Time.minutes(x)等。

在这里插入图片描述

  • 使用
// 使用 EventTimedatastream           .keyBy(id)           .window(TumblingEventTimeWindows.of(Time.seconds(10)))           .process(new MyProcessFunction())// 使用 processing-timedatastream           .keyBy(id)           .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))           .process(new MyProcessFunction())
Sliding Windows
  • 图解

Sliding Windows(滑动窗口)在滚动窗口之上加了一个滑动窗口的时间,这种类型的窗口是会存在窗口重叠的(如下图所示)。滚动窗口是按照窗口固定的时间大小向前滚动,而滑动窗口是根据设定的滑动时间向前滑动。窗口之间的重叠部分的大小取决于窗口大小与滑动的时间大小,当滑动时间小于窗口时间大小时便会出现重叠。当滑动时间大于窗口时间大小时,会出现窗口不连续的情况,导致数据可能不属于任何一个窗口。当两者相等时,其功能就和滚动窗口相同了。滑动窗口的使用场景是:用户根据设定的统计周期来计算指定窗口时间大小的指标,比如每隔 5 分钟输出最近一小时内点击量最多的前 N 个商品。

关于时间的选择,可以使用 Event Time 或者 Processing Time,分别对应的 window assigner 为:SlidingEventTimeWindows、SlidingProcessingTimeWindows。用户可以使用 window assigner 的 of(size)方法指定时间间隔,其中时间单位可以是 Time.milliseconds(x)、Time.seconds(x)或 Time.minutes(x)等。

在这里插入图片描述

  • 使用
// 使用 EventTimedatastream           .keyBy(id)           .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))           .process(new MyProcessFunction())// 使用 processing-timedatastream           .keyBy(id)           .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))           .process(new MyProcessFunction())
Session Windows
  • 图解

Session Windows(会话窗口)主要是将某段时间内活跃度较高的数据聚合成一个窗口进行计算,窗口的触发的条件是 Session Gap,是指在规定的时间内如果没有数据活跃接入,则认为窗口结束,然后触发窗口计算结果。需要注意的是如果数据一直不间断地进入窗口,也会导致窗口始终不触发的情况。与滑动窗口、滚动窗口不同的是,Session Windows 不需要有固定窗口大小(window size)和滑动时间(slide time),只需要定义 session gap,来规定不活跃数据的时间上限即可。如下图所示。Session Windows 窗口类型比较适合非连续型数据处理或周期性产生数据的场景,根据用户在线上某段时间内的活跃度对用户行为数据进行统计。

关于时间的选择,可以使用 Event Time 或者 Processing Time,分别对应的 window assigner 为:EventTimeSessionWindows 和 ProcessTimeSessionWindows。用户可以使用 window assigner 的 withGap()方法指定时间间隔,其中时间单位可以是 Time.milliseconds(x)、Time.seconds(x)或 Time.minutes(x)等。

在这里插入图片描述

  • 使用
// 使用 EventTimedatastream           .keyBy(id)           .window((EventTimeSessionWindows.withGap(Time.minutes(15)))           .process(new MyProcessFunction())// 使用 processing-timedatastream           .keyBy(id)           .window(ProcessingTimeSessionWindows.withGap(Time.minutes(15)))           .process(new MyProcessFunction())

注意:由于 session window 的开始时间与结束时间取决于接收的数据。windowassigner 不会立即分配所有的元素到正确的窗口,SessionWindow 会为每个接收的元素初始化一个以该元素的时间戳为开始时间的窗口,使用 session gap 作为窗口大小,然后再合并重叠部分的窗口。所以, session window 操作需要指定用于合并的 Trigger 和 Window Function,比如ReduceFunction, AggregateFunction, or ProcessWindowFunction

Global Windows
  • 图解

Global Windows(全局窗口)将所有相同的 key 的数据分配到单个窗口中计算结果,窗口没有起始和结束时间,窗口需要借助于 Triger 来触发计算,如果不对 Global Windows 指定 Triger,窗口是不会触发计算的。因此,使用 Global Windows 需要非常慎重,用户需要非常明确自己在整个窗口中统计出的结果是什么,并指定对应的触发器,同时还需要有指定相应的数据清理机制,否则数据将一直留在内存中。

在这里插入图片描述

  • 使用
datastream    .keyBy(id)    .window(GlobalWindows.create())    .process(new MyProcessFunction())
Window Functions 分类

Flink 提供了两大类窗口函数,分别为增量聚合函数和全量窗口函数。其中增量聚合函数的性能要比全量窗口函数高,因为增量聚合窗口是基于中间结果状态计算最终结果的,即窗口中只维护一个中间结果状态,不要缓存所有的窗口数据。相反,对于全量窗口函数而言,需要对所以进入该窗口的数据进行缓存,等到窗口触发时才会遍历窗口内所有数据,进行结果计算。如果窗口数据量比较大或者窗口时间较长,就会耗费很多的资源缓存数据,从而导致性能下降。

  • 增量聚合函数

    包括:ReduceFunction、AggregateFunction 和 FoldFunction

  • 全量窗口函数

    包括:ProcessWindowFunction

使用介绍 ReduceFunction

输入两个相同类型的数据元素按照指定的计算方法进行聚合,然后输出类型相同的一个结果元素。要求输入元素的数据类型与输出元素的数据类型必须一致。实现的效果是使用上一次的结果值与当前值进行聚合。具体使用案例如下:

public class ReduceFunctionExample {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // 模拟数据源        SingleOutputStreamOperator input = env.fromElements(                Tuple3.of(1L, 10, 1588491228L),                Tuple3.of(1L, 15, 1588491229L),                Tuple3.of(1L, 20, 1588491238L),                Tuple3.of(1L, 25, 1588491248L),                Tuple3.of(2L, 10, 1588491258L),                Tuple3.of(2L, 30, 1588491268L),                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {            @Override            public long extractAscendingTimestamp(Tuple3 element) {                return element.f2 * 1000;            }        });        input                .map(new MapFunction() {                    @Override                    public Tuple2 map(Tuple3 value) {                        return Tuple2.of(value.f0, value.f1);                    }                })                .keyBy(0)                .window(TumblingEventTimeWindows.of(Time.seconds(10)))                .reduce(new ReduceFunction() {                    @Override                    public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {                        // 根据第一个元素分组,求第二个元素的累计和                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);                    }                }).print();        env.execute("ReduceFunctionExample");    }}
AggregateFunction

与 ReduceFunction 相似,AggregateFunction 也是基于中间状态计算结果的增量计算函数,相比 ReduceFunction,AggregateFunction 在窗口计算上更加灵活,但是实现稍微复杂,需要实现 AggregateFunction 接口,重写四个方法。其最大的优势就是中间结果的数据类型和最终的结果类型不依赖于输入的数据类型。关于 AggregateFunction 的源码,如下所示:

/** *  @param   输入元素的数据类型 * @param  中间聚合结果的数据类型 * @param  最终聚合结果的数据类型 */@PublicEvolvingpublic interface AggregateFunction extends Function, Serializable {    /**     * 创建一个新的累加器     */    ACC createAccumulator();    /**     * 将新的数据与累加器进行聚合,返回一个新的累加器     */    ACC add(IN value, ACC accumulator);    /**     从累加器中计算最终结果并返回     */    OUT getResult(ACC accumulator);    /**     * 合并两个累加器并返回结果     */    ACC merge(ACC a, ACC b);}

具体使用代码案例如下:

public class AggregateFunctionExample {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // 模拟数据源        SingleOutputStreamOperator input = env.fromElements(                Tuple3.of(1L, 10, 1588491228L),                Tuple3.of(1L, 15, 1588491229L),                Tuple3.of(1L, 20, 1588491238L),                Tuple3.of(1L, 25, 1588491248L),                Tuple3.of(2L, 10, 1588491258L),                Tuple3.of(2L, 30, 1588491268L),                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {            @Override            public long extractAscendingTimestamp(Tuple3 element) {                return element.f2 * 1000;            }        });        input.keyBy(0)             .window(TumblingEventTimeWindows.of(Time.seconds(10)))             .aggregate(new MyAggregateFunction()).print();        env.execute("AggregateFunctionExample");    }    private static class MyAggregateFunction implements AggregateFunction {        /**         * 创建一个累加器,初始化值         * @return         */        @Override        public Tuple2 createAccumulator() {            return Tuple2.of(0L,0);        }        /**         *         * @param value 输入的元素值         * @param accumulator 中间结果值         * @return         */        @Override        public Tuple2 add(Tuple3 value, Tuple2 accumulator) {            return Tuple2.of(value.f0,value.f1 + accumulator.f1);        }        /**         * 获取计算结果值         * @param accumulator         * @return         */        @Override        public Tuple2 getResult(Tuple2 accumulator) {            return Tuple2.of(accumulator.f0,accumulator.f1);        }        /**         * 合并中间结果值         * @param a 中间结果值 a         * @param b 中间结果值 b         * @return         */        @Override        public Tuple2 merge(Tuple2 a, Tuple2 b) {            return Tuple2.of(a.f0,a.f1 + b.f1);        }    }}
FoldFunction

FoldFunction 定义了如何将窗口中的输入元素与外部的元素合并的逻辑,该接口已标记过时,建议用户使用 AggregateFunction 来替换使用 FoldFunction。

public class FoldFunctionExample {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // 模拟数据源        SingleOutputStreamOperator input = env.fromElements(                Tuple3.of(1L, 10, 1588491228L),                Tuple3.of(1L, 15, 1588491229L),                Tuple3.of(1L, 20, 1588491238L),                Tuple3.of(1L, 25, 1588491248L),                Tuple3.of(2L, 10, 1588491258L),                Tuple3.of(2L, 30, 1588491268L),                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {            @Override            public long extractAscendingTimestamp(Tuple3 element) {                return element.f2 * 1000;            }        });        input.keyBy(0)             .window(TumblingEventTimeWindows.of(Time.seconds(10)))             .fold("用户",new FoldFunction() {                 @Override                 public String fold(String accumulator, Tuple3 value) throws Exception {                    // 为第一个元素的值拼接一个"用户"字符串,进行输出                     return accumulator + value.f0 ;                 }             }).print();        env.execute("FoldFunctionExample");    }}
ProcessWindowFunction

前面提到的 ReduceFunction 和 AggregateFunction 都是基于中间状态实现增量计算的窗口函数。有些时候需要使用整个窗口的所有数据进行计算,比如求中位数和众数。另外,ProcessWindowFunction 的 Context 对象可以访问窗口的一些元数据信息,比如窗口结束时间、水位线等。ProcessWindowsFunction 能够更加灵活地支持基于窗口全部数据元素的结果计算。

在系统内部,由 ProcessWindowFunction 处理的窗口会将所有已分配的数据存储到 ListState 中,通过将数据收集起来且提供对于窗口的元数据及其他一些特性的访问和使用,应用场景比 ReduceFunction 和 AggregateFunction 更加广泛。关于 ProcessWindowFunction 抽象类的源码,如下所示:

/** * @param  输入的数据类型 * @param  输出的数据类型 * @param  key 的数据类型 * @param  window 的类型 */@PublicEvolvingpublic abstract class ProcessWindowFunction extends AbstractRichFunction {    private static final long serialVersionUID = 1L;    /**     * 计算窗口数据,输出 0 个或多个元素     * @param key 窗口的 key     * @param context 窗口的上下文     * @param elements 窗口内的所有元素     * @param out 输出元素的 collector 对象     * @throws Exception     */    public abstract void process(KEY key, Context context, Iterable elements, Collector out) throws Exception;    /**     * 当窗口被销毁时,删除状态     * @param context     * @throws Exception     */    public void clear(Context context) throws Exception {}    //context 可以访问窗口的元数据信息.    public abstract class Context implements java.io.Serializable {    //返回当前被计算的窗口        public abstract W window();    // 返回当前 processing time.         public abstract long currentProcessingTime();    // 返回当前 event-time 水位线.        public abstract long currentWatermark();    //每个 key 和每个 window 的状态访问器        public abstract KeyedStateStore windowState();    // 每个 key 的 global state 的状态访问器.        public abstract KeyedStateStore globalState();        /**         * 向 side output 输出数据         * @param outputTag the {@code OutputTag}  side output 输出的标识.         * @param value 输出的数据.         */        public abstract  void output(OutputTag outputTag, X value);    }}

具体的使用案例如下:

public class ProcessWindowFunctionExample {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // 模拟数据源        SingleOutputStreamOperator input = env.fromElements(                Tuple3.of(1L, 10, 1588491228L),                Tuple3.of(1L, 15, 1588491229L),                Tuple3.of(1L, 20, 1588491238L),                Tuple3.of(1L, 25, 1588491248L),                Tuple3.of(2L, 10, 1588491258L),                Tuple3.of(2L, 30, 1588491268L),                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {            @Override            public long extractAscendingTimestamp(Tuple3 element) {                return element.f2 * 1000;            }        });        input.keyBy(t -> t.f0)             .window(TumblingEventTimeWindows.of(Time.seconds(10)))             .process(new MyProcessWindowFunction())             .print();    }    private static class MyProcessWindowFunction extends ProcessWindowFunction {        @Override        public void process(                Long aLong,                Context context,                Iterable elements,                Collector out) throws Exception {            int count = 0;            for (Tuple3 in: elements) {                count++;            }            // 统计每个窗口数据个数,加上窗口输出            out.collect(Tuple3.of(aLong,"" + context.window(),count));        }    }}
增量聚合函数和 ProcessWindowFunction 整合

ProcessWindowFunction 提供了很强大的功能,但是唯一的缺点就是需要更大的状态存储数据。在很多时候,增量聚合的使用是非常频繁的,那么如何实现既支持增量聚合又支持访问窗口元数据的操作呢?可以将 ReduceFunction 和 AggregateFunction 与 ProcessWindowFunction 整合在一起使用。通过这种组合方式,分配给窗口的元素会立即被执行计算,当窗口触发时,会把聚合的结果传给 ProcessWindowFunction,这样 ProcessWindowFunction 的 process 方法的 Iterable 参数被就只有一个值,即增量聚合的结果。

  • ReduceFunction 与 ProcessWindowFunction 组合
public class ReduceProcessWindowFunction {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // 模拟数据源        SingleOutputStreamOperator input = env.fromElements(                Tuple3.of(1L, 10, 1588491228L),                Tuple3.of(1L, 15, 1588491229L),                Tuple3.of(1L, 20, 1588491238L),                Tuple3.of(1L, 25, 1588491248L),                Tuple3.of(2L, 10, 1588491258L),                Tuple3.of(2L, 30, 1588491268L),                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {            @Override            public long extractAscendingTimestamp(Tuple3 element) {                return element.f2 * 1000;            }        });        input.map(new MapFunction() {            @Override            public Tuple2 map(Tuple3 value) {                return Tuple2.of(value.f0, value.f1);            }        })             .keyBy(t -> t.f0)             .window(TumblingEventTimeWindows.of(Time.seconds(10)))             .reduce(new MyReduceFunction(),new MyProcessWindowFunction())             .print();        env.execute("ProcessWindowFunctionExample");    }    private static class MyReduceFunction implements ReduceFunction {        @Override        public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {            //增量求和            return Tuple2.of(value1.f0,value1.f1 + value2.f1);        }    }    private static class MyProcessWindowFunction extends ProcessWindowFunction {        @Override        public void process(Long aLong, Context ctx, Iterable elements, Collector out) throws Exception {            // 将求和之后的结果附带窗口结束时间一起输出            out.collect(Tuple3.of(aLong,elements.iterator().next().f1,"window_end" + ctx.window().getEnd()));        }    }}
  • AggregateFunction 与 ProcessWindowFunction 组合
public class AggregateProcessWindowFunction {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // 模拟数据源        SingleOutputStreamOperator input = env.fromElements(                Tuple3.of(1L, 10, 1588491228L),                Tuple3.of(1L, 15, 1588491229L),                Tuple3.of(1L, 20, 1588491238L),                Tuple3.of(1L, 25, 1588491248L),                Tuple3.of(2L, 10, 1588491258L),                Tuple3.of(2L, 30, 1588491268L),                Tuple3.of(2L, 20, 1588491278L))                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {                    @Override                    public long extractAscendingTimestamp(Tuple3 element) {                        return element.f2 * 1000;                    }                });        input.keyBy(t -> t.f0)                .window(TumblingEventTimeWindows.of(Time.seconds(10)))                .aggregate(new MyAggregateFunction(),new MyProcessWindowFunction())                .print();        env.execute("AggregateFunctionExample");    }    private static class MyAggregateFunction implements AggregateFunction {        /**         * 创建一个累加器,初始化值         *         * @return         */        @Override        public Tuple2 createAccumulator() {            return Tuple2.of(0L, 0);        }        /**         * @param value       输入的元素值         * @param accumulator 中间结果值         * @return         */        @Override        public Tuple2 add(Tuple3 value, Tuple2 accumulator) {            return Tuple2.of(value.f0, value.f1 + accumulator.f1);        }        /**         * 获取计算结果值         *         * @param accumulator         * @return         */        @Override        public Tuple2 getResult(Tuple2 accumulator) {            return Tuple2.of(accumulator.f0, accumulator.f1);        }        /**         * 合并中间结果值         *         * @param a 中间结果值 a         * @param b 中间结果值 b         * @return         */        @Override        public Tuple2 merge(Tuple2 a, Tuple2 b) {            return Tuple2.of(a.f0, a.f1 + b.f1);        }    }    private static class MyProcessWindowFunction extends ProcessWindowFunction {        @Override        public void process(Long aLong, Context ctx, Iterable elements, Collector out) throws Exception {            // 将求和之后的结果附带窗口结束时间一起输出            out.collect(Tuple3.of(aLong,elements.iterator().next().f1,"window_end" + ctx.window().getEnd()));        }    }}
window 生命周期解读 生命周期图解

窗口从创建到执行窗口计算再到被清除,需要经过一系列的过程,这个过程就是窗口的生命周期。

首先,当一个元素进入窗口算子之前,会由 WindowAssigner 分配该元素进入哪个或哪几个窗口,如果窗口不存在,则创建窗口。

其次,数据进入了窗口,这时要看有没有使用增量聚合函数,如果使用了增量聚合函数 ReduceFunction 或 AggregateFunction,新加入窗口的元素会立即触发增量计算,计算的结果作为窗口的内容。如果没有使用增量聚合函数,则会将进入窗口的数据存储到 ListState 状态中,进一步等待窗口触发时,遍历窗口元素进行聚合计算。

然后,每个元素在进入窗口之后会传递至该窗口的触发器,触发器决定了窗口何时被执行计算及何时需要清除自身和保存的内容。触发器可以根据已分配的元素或注册的计时器来决定某些特定时刻执行窗口计算或清除窗口内容。

最后,触发器成功触发之后的操作取决于使用的窗口函数,如果使用的是增量聚合函数,如 ReduceFunction 或 AggregateFunction,则会直接输出聚合的结果。如果只包含一个全量窗口函数,如 ProcessWindowFunction,则会作用窗口的所有元素,执行计算,输出结果。如果组合使用了 ReduceFunction 和 ProcessWindowFunction,即组合使用了增量聚合窗口函数和全量窗口函数,全量窗口函数会作用于增量聚合函数的聚合值,然后再输出最终的结果。

  • 情况 1:仅使用增量聚合窗口函数

在这里插入图片描述

  • 情况 2:仅使用全量窗口函数

在这里插入图片描述

  • 情况 3:组合使用增量聚合窗口函数与全量窗口函数

在这里插入图片描述

分配器(Window Assigners)

WindowAssigner 的作用是将输入的元素分配到一个或多个窗口,当 WindowAssigner 将第一个元素分配到窗口时,就会创建该窗口,所以一个窗口一旦被创建,窗口中必然至少有一个元素。Flink 内置了很多 WindowAssigners,本文主要讨论基于时间的 WindowAssigners,这些分配器都继承了 WindowAssigner 抽象类。关于常用的分配器,上文已经做了详细解释。下面先来看一下继承关系图:

在这里插入图片描述

接下来,将会对 WindowAssigner 抽象类的源码进行分析,具体代码如下:

/** * WindowAssigner 分配一个元素到 0 个或多个窗口 * 在一个窗口算子内部,元素是按照 key 进行分组的(使用 KeyedStream), * 相同 key 和 window 的元素集合称之为一个 pane(格子) * @param  要分配元素的数据类型 * @param  window 的类型:TimeWindow、GlobalWindow */@PublicEvolvingpublic abstract class WindowAssigner implements Serializable {    private static final long serialVersionUID = 1L;    /**     * 返回一个向其分配元素的窗口集合     * @param element 待分配的元素     * @param timestamp 元素的时间戳     * @param context WindowAssignerContext 对象     * @return     */    public abstract Collection assignWindows(T element, long timestamp, WindowAssignerContext context);    /**     * 返回一个与该 WindowAssigner 相关的默认 trigger(触发器)     * @param env 执行环境     * @return     */    public abstract Trigger getDefaultTrigger(StreamExecutionEnvironment env);    /**     * 返回一个窗口序列化器     * @param executionConfig     * @return     */    public abstract TypeSerializer getWindowSerializer(ExecutionConfig executionConfig);    /**     * 如果元素是基于 event time 分配到窗口的,则返回 true     * @return     */    public abstract boolean isEventTime();    /**     * 该 Context 允许访问当前的处理时间 processing time     */    public abstract static class WindowAssignerContext {        /**         * 返回当前的处理时间         */        public abstract long getCurrentProcessingTime();    }}
触发器(Triggers)

数据接入窗口后,窗口是否触发 WindowFunciton 计算,取决于窗口是否满足触发条件。Triggers 就是决定窗口何时触发计算并输出结果的条件,Triggers 可以根据时间或者具体的数据条件进行触发,比如进入窗口元素的个数或者进入窗口的某些特定的元素值等。前面讨论的内置 WindowAssigner 都有各自默认的触发器,当使用的是 Processing Time 时,则当处理时间超过窗口结束时间时会被触发。当使用 Event Time 时,当水位线超过窗口结束时间时会被触发。

Flink 在内部提供很多内置的触发器,常用的主要有 EventTimeTrigger、ProcessTimeTrigger 以及 CountTrigger 等。每种每种触发器都对应于不同的 Window Assigner,例如 Event Time 类型的 Windows 对应的触发器是 EventTimeTrigger,其基本原理是判断当前的 Watermark 是否超过窗口的 EndTime,如果超过则触发对窗口内数据的计算,反之不触发计算。关于上面分析的内置 WindowAssigner 的默认 trigger,可以从各自的源码中看到,具体罗列如下:

分配器对应的源码默认触发器TumblingEventTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }EventTimeTriggerTumblingProcessingTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }ProcessingTimeTriggerSlidingEventTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }EventTimeTriggerSlidingProcessingTimeWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }ProcessingTimeTriggerEventTimeSessionWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }EventTimeTriggerProcessingTimeSessionWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }ProcessingTimeTriggerGlobalWindowspublic Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return new NeverTrigger(); }NeverTrigger

这些 Trigger 都继承了 Trigger 抽象类,具体的继承关系,如下图:

在这里插入图片描述

关于这些内置的 Trigger 的具体解释如下:

Trigger解释EventTimeTrigger当前的 Watermark 是否超过窗口的 EndTime,如果超过则触发对窗口内数据的计算,反之不触发计算;ProcessTimeTrigger当前的 Processing Time 是否超过窗口的 EndTime,如果超过则触发对窗口内数据的计算,反之不触发计算;ContinuousEventTimeTrigger根据间隔时间周期性触发窗口或者 Window 的结束时间小于当前 EventTime,触发窗口计算;ContinuousProcessingTimeTrigger根据间隔时间周期性触发窗口或者 Window 的结束时间小于当前 ProcessTime,触发窗口计算;CountTrigger根据窗口的数据条数是否超过设定的阈值确定是否触发窗口计算;DeltaTrigger根据窗口的数据计算出来的 Delta 指标是否超过指定的阈值,判断是否触发窗口计算PurgingTrigger可以将任意触发器作为参数转换为 Purge 类型触发器,计算完成后数据将被清理。

关于抽象类 Trigger 的源码解释如下:

/** * @param  元素的数据类型 * @param  Window 的类型 */@PublicEvolvingpublic abstract class Trigger implements Serializable {    private static final long serialVersionUID = -4104633972991191369L;    /**     * 每个元素被分配到窗口时都会调用该方法,返回一个 TriggerResult 枚举     * 该枚举包含很多触发的类型:CONTINUE、FIRE_AND_PURGE、FIRE、PURGE     *     * @param element   进入窗口的元素     * @param timestamp 进入窗口元素的时间戳     * @param window    窗口     * @param ctx       上下文对象,可以注册计时器(timer)回调函数     * @return     * @throws Exception     */    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;    /**     * 当使用 TriggerContext 注册的 processing-time 计时器被触发时,会调用该方法     *     * @param time   触发计时器的时间戳     * @param window 计时器触发的 window     * @param ctx    上下文对象,可以注册计时器(timer)回调函数     * @return     * @throws Exception     */    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;    /**     * 当使用 TriggerContext 注册的 event-time 计时器被触发时,会调用该方法     *     * @param time   触发计时器的时间戳     * @param window 计时器触发的 window     * @param ctx    上下文对象,可以注册计时器(timer)回调函数     * @return     * @throws Exception     */    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;    /**     * 如果触发器支持合并触发器状态,将返回 true     *     * @return     */    public boolean canMerge() {        return false;    }    /**     * 当多个窗口被合并成一个窗口时,会调用该方法     *     * @param window 合并之后的 window     * @param ctx    上下文对象,可以注册计时器回调函数,也可以访问状态     * @throws Exception     */    public void onMerge(W window, OnMergeContext ctx) throws Exception {        throw new UnsupportedOperationException("This trigger does not support merging.");    }    /**     * 清除所有 Trigger 持有的窗口状态     * 当窗口被销毁时,调用该方法     *     * @param window     * @param ctx     * @throws Exception     */    public abstract void clear(W window, TriggerContext ctx) throws Exception;    /**     * Context 对象,传给 Trigger 的方法参数中,用于注册计时器回调函数和处理状态     */    public interface TriggerContext {        // 返回当前处理时间        long getCurrentProcessingTime();        MetricGroup getMetricGroup();        // 返回当前水位线时间戳        long getCurrentWatermark();        // 注册一个 processing-time 的计时器        void registerProcessingTimeTimer(long time);        // 注册一个 EventTime 计时器        void registerEventTimeTimer(long time);        //  删除一个 processing-time 的计时器        void deleteProcessingTimeTimer(long time);        // 删除一个 EventTime 计时器        void deleteEventTimeTimer(long time);        /**         * 提取状态当前 Trigger 的窗口和 Key 的状态         */         S getPartitionedState(StateDescriptor stateDescriptor);        // 与 getPartitionedState 功能相同,该方法已被标记过时        @Deprecated         ValueState getKeyValueState(String name, Class stateType, S defaultState);        // 同 getPartitionedState 功能,该方法已被标记过时        @Deprecated         ValueState getKeyValueState(String name, TypeInformation stateType, S defaultState);    }    // TriggerContext 的扩展    public interface OnMergeContext extends TriggerContext {        // 合并每个 window 的状态,状态必须支持合并                    
关注
打赏
1560489824
查看更多评论
0.0939s