在使用flink的时候,我们在整合Kafka作为Source的时候需要设置watermarket.否则就不能持续行的去消费.举一反三,watermarket在其它技术整合的时候也许要进行设置.cuiyaonan2000@163.com.
- window:是Flink将流数据或者批量数据根据时间(开始时间,结束时间)划分的多个段叫做bucket.(批量数据对于flink是特殊的流式数据,至于为什么将数据流划分成段可以自己百度)
- watermark:中文译名是水位线,它的存在是为了解决flink将数据划分成段所带来的的问题.水位线其实就是时间戳,用于表示flink接收到的数据属于哪个一个窗口.
- allowLateNess: 的作用是将窗口的结束时间在延迟指定的时间.
- sideOutPut: 的作用是是将在窗口关闭后,到达的时间范围内的数据导出到指定的地方
参考资料:
- [白话解析] Flink的Watermark机制 - 罗西的思考 - 博客园
- Kafka | Apache Flink
- 生成 Watermark | Apache Flink -----------最新版本的flink 在代码使用上改动较大,但是原理思路是一样的
对于Flink,如果来一条消息计算一条,这样是可以的,但是这样计算是非常频繁而且消耗资源,如果想做一些统计这是不可能的。所以对于Spark和Flink都产生了窗口计算。
举例(注意如果watermark超过了窗口的结束时间,上一个窗口就会被删除cuiyaonan2000@163.com)
使用基于事件时间的窗口策略
每5分钟创建一个不重叠(或翻滚)的窗口并允许延迟1分钟。
假定目前是12:00。 当具有落入该间隔的时间戳的第一个元素到达时,Flink将为12:00到12:05之间的间隔创建一个新窗口,当水位线(watermark)到12:06时间戳时将删除它
关于window的一些设置如下:
- Window Assigner:用来决定某个元素被分配到哪个/哪些窗口中去。
- Trigger:触发器。决定了一个窗口何时能够被计算或清除。触发策略可能类似于“当窗口中的元素数量大于4”时,或“当水位线通过窗口结束时”。
- Evictor:它可以在 触发器触发后 & 应用函数之前和/或之后 从窗口中删除元素。
- 窗口还拥有函数,比如 ProcessWindowFunction,ReduceFunction,AggregateFunction或FoldFunction。该函数将包含要应用于窗口内容的计算,而触发器指定窗口被认为准备好应用该函数的条件。
创建窗口的意思是将元数据按照某个字段进行分组,分组与不分组的差别很大.分组可以并发处理,不分组则不能并发处理cuiyaonan2000@163.com ---忘接了当时为什么这么写,不分组也可以设置并行度,当时可能是想说 分布后可以拆分多个流的意思????????
-
对于Keyed流,可以将传入事件的任何属性用作key。 拥有Keyed stream将允许窗口计算由多个任务并行执行,因为每个逻辑Keyed流可以独立于其余任务进行处理。 相同Key的所有元素将被发送到同一个任务。
-
在Non-Keyed流的情况下,原始流将不会被分成多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行性为1。
参考代码如下:将传递过来的Person的Name作为任务分组的条件.这样在同一个窗口期内相同姓名的人就是一个单独的任务处理
滚动窗口分配器将每个元素分配给固定窗口大小的窗口。滚动窗口大小固定的并且不重叠。例如,如果指定大小为5分钟的滚动窗口,则将执行当前窗口,并且每五分钟将启动一个新窗口。
滑动窗口滑动窗口与滚动窗口的区别就是滑动窗口有重复的计算部分。
例如,你可以使用窗口大小为10分钟的窗口,滑动大小为5分钟。这样,每5分钟会生成一个窗口,包含最后10分钟内到达的事件。(故此元素会被重复消费cuiyaonan2000@163.com)
会话窗口(感觉更有效)会话窗口分配器通过活动会话分组元素。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭。
例如,不活动的间隙时。会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defines how long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。
TimeFilnk中的时间分类主要有如下三种
- Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述(包含在原始数据中的时间戳)。
- Ingestion Time 是数据进入Apache Flink流处理系统的时间,也就是Flink读取数据源时间(Flink 接收到数据的时间)。
- Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是Flink程序处理该事件时当前系统时间(算子处理数据的时间)
是数据流入到具体某个算子时候相应的系统时间。
这个系统时间指的是执行相应操作的机器的系统时间。当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。
ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境或者异步环境中,ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。因为它容易受到从记录到达系统的速度(例如从消息队列)到记录在系统内的operator之间流动的速度的影响(停电,调度或其他)。
Ingestion TimeIngestionTime是数据进入Apache Flink框架的时间,是在Source Operator中设置的。每个记录将当前时间作为时间戳,并且后续基于时间的操作(如时间窗口)引用该时间戳。
提取时间在概念上位于事件时间和处理时间之间。与处理时间相比,它稍早一些。IngestionTime与ProcessingTime相比可以提供更可预测的结果,因为IngestionTime的时间戳比较稳定(在源处只记录一次),所以同一数据在流经不同窗口操作时将使用相同的时间戳,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。
与事件时间相比,提取时间程序无法处理任何无序事件或后期数据,但程序不必指定如何生成水位线。
在内部,提取时间与事件时间非常相似,但具有自动时间戳分配和自动水位线生成功能。-----此类型的时间戳不用配置watermark
Event Time事件时间就是事件在真实世界的发生时间,即每个事件在产生它的设备上发生的时间(当地时间)。
在进入Apache Flink框架之前EventTime通常要嵌入到记录中(原始数据需要包含时间戳),并且EventTime也可以从记录中提取出来。
事件时间程序必须指定如何生成事件时间的Watermarks
,这是表示事件时间进度的机制。----即告诉Filnk每条数据的时间戳cuiyaonan2000@163.com
现在假设我们正在创建一个排序的数据流。这意味着应用程序处理流中的乱序到达的事件,并生成同样事件但按时间戳(事件时间)排序的新数据流。
比如:
有1~10个事件。
乱序到达的序列是:1,2,4,5,6,3,8,9,10,7
经过按 事件时间 处理后的序列是:1,2,3,4,5,6,7,8,9,10
为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每条数据都需要分配其事件时间戳。这通常通过提取每条数据中的固定字段来完成时间戳的获取。
如何设置时间类型该时间的设置在1.12中已经被废弃了,默认不需要设置.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
Watermark(触发window内数据被处理的关键)
Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件或延迟数据的,这通常用watermark机制结合window来实现(Watermarks用来触发window窗口计算)。
比如对于late element,我们不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。 可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。
watermark其实就是Filnk存储每条数据中时间戳的地方,当然watermark只能配置Event time使用cuiyaonan2000@163.com
再次说明watermark仅限eventtime使用的时候才需要设置,通过官网的管理平台也可以确认cuiyaonan2000@163.com
WaterMark相当于一个EndLine,一旦Watermarks大于了某个window的end_time,就意味着windows_end_time时间和WaterMark时间相同的窗口开始计算执行了。
就是说,我们根据一定规则,计算出Watermarks,并且设置一些延迟,给迟到的数据一些机会,也就是说正常来讲,对于迟到的数据,我只等你一段时间,再不来就没有机会了。
WaterMark时间可以用Flink系统现实时间,也可以用处理数据所携带的Event time。
使用Flink系统现实时间,在并行和多线程中需要注意的问题较少,因为都是以现实时间为标准。
如果使用处理数据所携带的Event time作为WaterMark时间,需要注意两点:
- 因为数据到达并不是循序的,注意保存一个当前最大时间戳作为WaterMark时间
- 并行同步问题
标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。
在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
定期水位线(Periodic Watermark)周期性的(允许一定时间间隔或者达到一定的记录条数)产生一个Watermark。水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。
在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。
延迟数据的处理虽说水位线表明着早于它的事件不应该再出现,但是上如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。
迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:
- 重新激活已经关闭的窗口并重新计算以修正结果。
- 将迟到事件收集起来另外处理。
- 将迟到事件视为错误消息并丢弃。
Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side Output
和Allowed Lateness
。
Side Output
机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。Allowed Lateness
机制允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了ProcessWindowFunction
API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。
这里总结机制为:
-
窗口window 的作用是为了周期性的获取数据。
-
watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
-
allowLateNess是将窗口关闭时间再延迟一段时间。
-
sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到特殊流。
这个是kafka的测试数据,这里是有意乱序了.
如下是Flink的处理窗口信息,可以看到是有序的,证明watermark的设置以及allowedLateness都生效了
Thread.sleep(10000);
long time = System.currentTimeMillis();
kafkaTemplate.send("topic-name-cui",
gson.toJson(new User(1, "name" + (i), time + 3000)));
kafkaTemplate.send("topic-name-cui",
gson.toJson(new User(2, "name" + (i + 2), time + 3000)));
kafkaTemplate.send("topic-name-cui",
gson.toJson(new User(2, "name" + (i + 2), time + 1000)));
kafkaTemplate.send("topic-name-cui",
gson.toJson(new User(1, "name" + (i), time + 1000)));
kafkaTemplate.send("topic-name-cui",
gson.toJson(new User(2, "name" + (i + 2), time + 4000)));
kafkaTemplate.send("topic-name-cui",
gson.toJson(new User(1, "name" + (i), time + 4000 )));
kafkaTemplate.send("topic-name-cui",
gson.toJson(new User(1, "name" + (i), time)));
kafkaTemplate.send("topic-name-cui",
gson.toJson(new User(2, "name" + (i + 2), time)));
从上可以证明单个窗口内的数据不一定是有序的,让你循环处理的,但是窗口和窗口之间内的数据一定是有序的.且注意通过keyby的数据之间的水位线是不会相互影响的.cuiyaonan2000@163.com