- 还有视频讲解在我的B站-宝哥chbxw, 希望大家可以支持一下,谢谢。
- 一、 State
- 1.1、Keyed State
- 1.1.1、Keyed State案例一
- 1.1.2、Keyed State案例二
- 1.2、Operator State
- 二、CheckPoint
- 2.1、CheckPoint 原理
- 2.2、CheckPoint 参数和设置 `streamEnv.getCheckpointConfig`
- 2.2.1、CheckPoint的开启和时间间隔指定
- 2.2.2、exactly-ance 和 at-least-once 语义选择
- 2.2.3、超时时间
- 2.2.4、检查点之间最小时间间隔:
- 2.2.5、通过 setMaxConcurrentCheckpoints()方法设定能够最大同时执行的 Checkpoint 数量。
- 2.2.6、是否删除 Checkpoint 中保存的数据:
- 2.2.7、TolerableCheckpointFailureNumber:
- 三、保存机制 StateBackend(状态后端)
- 3.1、MemoryStateBackend
- 3.2、FsStateBackend
- 3.3、RocksDBStateBackend
- 4.4、全局配置 StateBackend
- 四、Checkpoint案例
- 五、SavePoint
- 5.1、配置 Savepoints 的存储路径
- 5.2、在代码中设置算子ID
- 5.2.1、通过uid设置ID
- 5.3、触发SavePoint
- 5.3.1、启动Job
- 5.3.2、触发SavePoint
- 5.3.3、通过SvaePoint,启动Job
- 返回总目录
- 关注我的公众号【宝哥大数据】,更多干货
Flink 是一个默认就有状态的分析引擎,前面的 WordCount 案例可以做到单词的数量的累加,其实是因为在内存中保存了每个单词的出现的次数,这些数据其实就是状态数据。但是如果一个 Task 在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义(At -least-once 和 Exactly-once)上来说,Flink引入了 State 和 CheckPoint。
还有视频讲解在我的B站-宝哥chbxw, 希望大家可以支持一下,谢谢。 一、 StateState 一般指一个具体的 Task/Operator 的状态,State 数据默认保存在 Java 的堆内存
Flink 中有两种基本的状态:Keyed State 和 Operator State。
1.1、Keyed State基于KeyedStream 上的状态,这个状态是跟特定的 Key 绑定的。KeyedStream 流上的每一个 Key,都对应一个 State。Flink 针对 Keyed State 提供了 以下可以保存 State 的数据结构:
- ValueState: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通update(T) 进行更新,通过 T value() 进行检索。
- ListState: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过 add(T) 或者 addAll(List) 进行添加元素,通过 Iterableget() 获得整个列表。还可以通过 update(List) 覆盖当前的列表。
- ReducingState: 保存一个单值,表示添加到状态的所有值的聚合。接口与ListState 类似,但使用 add(T) 增加元素,会使用提供的 ReduceFunction 进行聚合。
- AggregatingState: 保留一个单值,表示添加到状态的所有值的聚合。和ReducingState 相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与ListState 类似,但使用 add(IN) 添加的元素会用指定的AggregateFunction 进行聚合。
- FoldingState: 保留一个单值,表示添加到状态的所有值的聚合。 与ReducingState 相反,聚合类型可能与添加到状态的元素类型不同。接口与 ListState类似,但使用 add(T)添加的元素会用指定的 FoldFunction 折叠成聚合值。
- MapState: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用 put(UK,UV) 或者 putAll(Map) 添加映射。使用 get(UK) 检索特定 key。 使用 entries(),keys() 和 values() 分别检索映射、键和值的可迭代视图。
package com.chb.flink.state
import com.chb.flink.source.StationLog
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
/**
* 案例需求:计算每个手机的呼叫间隔时间,单位是毫秒
* 第一种方法
*/
object TestKeyState1 {
def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取文件数据
val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
.map(line => {
var arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
data.keyBy(_.callOut) // 按照主键分组
.flatMap(new CallIntervalFunc()) // 定义一个富函数
.print()
streamEnv.execute()
}
/**
* 定义一个富函数:
*
* 输出时一个二元组(手机号码,时间间隔)
*/
class CallIntervalFunc extends RichFlatMapFunction[StationLog, (String, Long)] {
// 定义一个状态,用于保存前一次呼叫的的时间
var preCallTimeState: ValueState[Long] = _
override def open(parameters: Configuration): Unit = {
// 状态描述
val stateDescriptor = new ValueStateDescriptor[Long]("pre", classOf[Long])
// 通过上下文,创建一个状态
preCallTimeState = getRuntimeContext.getState(stateDescriptor)
}
override def flatMap(in: StationLog, collector: Collector[(String, Long)]): Unit = {
var pre = preCallTimeState.value()
if (pre == null || pre == 0) { // 第一次呼叫
preCallTimeState.update(in.callTime)
} else {
val interval = Math.abs(in.callTime - pre)
collector.collect((in.callOut + " " + in.callIn, interval))
// preCallTimeState.update(in.callTime)
}
}
}
}
1.1.2、Keyed State案例二
package com.chb.flink.state
import com.chb.flink.source.StationLog
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
/**
* 案例需求:计算每个手机的呼叫间隔时间,单位是毫秒
* 第一种方法
*/
object TestKeyState2 {
def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.streaming.api.scala._
//读取文件数据
val data = streamEnv.readTextFile(getClass.getResource("/station.log").getPath)
.map(line => {
var arr = line.split(",")
new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
})
// data.keyBy(_.callOut) // 按照主叫分组
// //有两种情况1、状态中有上一次的通话时间,2、没有。采用scala中的模式匹配
// .mapWithState[(String, Long), StationLog] {
// case (in: StationLog, None) => ((in.callOut, 0), Some(in)) //状态中没有值 是第一次呼叫
// case (in: StationLog, pre: Some[StationLog]) => { //状态中有值,是第二次呼叫
// var interval = Math.abs(in.callTime - pre.get.callTime)
// ((in.callOut, interval), Some(in))
// }
// }
// .filter(_._2 != 0)
// .print()
data.keyBy(_.callOut) //按照呼叫手机号分组
.flatMapWithState[(String, Long), StationLog] {
case (in: StationLog, None) => (List.empty, Some(in)) //如果状态中没有,则存入
case (in: StationLog, pre: Some[StationLog]) => { //如果状态中有值则计算时间间隔
var interval = Math.abs(in.callTime - pre.get.callTime)
(List((in.callIn, interval)), Some(in))
}
}
.filter(_._2 != 0)
.print()
streamEnv.execute()
}
}
1.2、Operator State
Operator State 与 Key 无关,而是与 Operator 绑定,整个 Operator 只对应一个 State。
比如: Flink 中的 Kafka Connector 就使用了 Operator State,它会在每个 Connector 实例中,保存该实例消费 Topic 的所有(partition, offset)映射。
二、CheckPointCheckPoint(可以理解为 CheckPoint 是把 State 数据持久化存储了)则表示了一个Flink Job 在一个特定时刻的一份全局状态快照,即包含了所有 Task/Operator 的状态。
当程序出现问题需要恢复 Sate 数据的时候,只有程序提供支持才可以实现 State 的容错。State 的容错需要依靠 CheckPoint 机制,这样才可以保证 Exactly-once 这种语义,但是注意,它只能保证 Flink 系统内的 Exactly-once,比如 Flink 内置支持的算子。针对 Source和 Sink 组件,如果想要保证 Exactly-once 的话,则这些组件本身应支持这种语义
2.1、CheckPoint 原理 Flink 中基于异步轻量级的分布式快照技术提供了 Checkpoints 容错机制,分布式快照可以将同一时间点 Task/Operator 的状态数据全局统一快照处理,包括前面提到的 KeyedState 和 Operator State。Flink 会在输入的数据集上间隔性地生成 checkpoint barrier,通过栅栏(barrier)将间隔时间段内的数据划分到相应的 checkpoint 中。 如下图:
从检查点恢复如下图:
- 做1、2、3、4…偶数和奇数累加操作, 在计算累加7的时候失败
- 重新启动程序
- 从checkpoint开始恢复
- 5是索引,从第6个数据开始读,即读入6
- 6是偶数的和, 6+6=12,恢复
- 9是奇数的和
- 接下来就是读取新的数据7,继续开始执行
streamEnv.getCheckpointConfig
2.2.1、CheckPoint的开启和时间间隔指定
默认是不开启
streamEnv.enableCheckpointing(1000); // 1000ms
2.2.2、exactly-ance 和 at-least-once 语义选择
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
2.2.3、超时时间
超时时间指定了每次 Checkpoint 执行过程中的上限时间范围,一旦 Checkpoint 执行时间超过该阈值,Flink 将会中断 Checkpoint 过程,并按照超时处理。该指标可以通过setCheckpointTimeout 方法设定,默认为 10 分钟。
streamEnv.getCheckpointConfig.setCheckpointTimeout(50000)
2.2.4、检查点之间最小时间间隔:
该参数主要目的是设定两个 Checkpoint 之间的最小时间间隔,防止出现例如状态数据过大而导致 Checkpoint 执行时间过长,从而导致 Checkpoint 积压过多,最终 Flink 应用密集地触发 Checkpoint 操作,会占用了大量计算资源而影响到整个应用的性能。
streamEnv.getCheckpointConfig.setMinPauseBetweenCheckpoints(600)
2.2.5、通过 setMaxConcurrentCheckpoints()方法设定能够最大同时执行的 Checkpoint 数量。
在默认情况下只有一个检查点可以运行,根据用户指定的数量可以同时触发多个 Checkpoint,进而提升 Checkpoint 整体的效率。
streamEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
2.2.6、是否删除 Checkpoint 中保存的数据:
设置为 RETAIN_ON_CANCELLATION:表示一旦 Flink 处理程序被 cancel 后,会保留CheckPoint 数据,以便根据实际需要恢复到指定的 CheckPoint。 设置为 DELETE_ON_CANCELLATION:表示一旦 Flink 处理程序被 cancel 后,会删除CheckPoint 数据,只有 Job 执行失败的时候才会保存 CheckPoint
//删除
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)
//保留
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
2.2.7、TolerableCheckpointFailureNumber:
设置可以容忍的检查的失败数,超过这个数量则系统自动关闭和停止任务。
streamEnv.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)
三、保存机制 StateBackend(状态后端)
默认情况下,State 会保存在 TaskManager 的内存中,CheckPoint 会存储在 JobManager的内存中。State 和 CheckPoint 的存储位置取决于 StateBackend 的配置。
Flink 一共提供了 3 种 StateBackend 。 包 括 基 于 内 存 的MemoryStateBackend 、 基 于 文 件 系 统 的FsStateBackend,以及基于 RockDB 作为存储介质的 RocksDBState-Backend。
3.1、MemoryStateBackend基于内存的状态管理具有非常快速和高效的特点,但也具有非常多的限制,最主要的就是内存的容量限制,一旦存储的状态数据过多就会导致系统内存溢出等问题,从而影响整个应用的正常运行。同时如果机器出现问题,整个主机内存中的状态数据都会丢失,进而无法恢复任务中的状态数据。因此从数据安全的角度建议用户尽可能地避免在生产环境中使用MemoryStateBackend。
streamEnv.setStateBackend(new MemoryStateBackend(10*1024*1024))
3.2、FsStateBackend
FsStateBackend 是基于文件系统的一种状态管理器,这里的文件系统可以是本地文件系统,也可以是 HDFS 分布式文件系统。FsStateBackend 更适合任务状态非常大的情况,例如应用中含有时间范围非常长的窗口计算,或 Key/valueState 状态数据量非常大的场景。
streamEnv.setStateBackend(new FsStateBackend("hdfs://chb01:9000/checkpoint/cp1"))
3.3、RocksDBStateBackend
是 Flink 中内置的第三方状态管理器,需要单独引入相关的依赖包到工程中。
org.apache.flink
flink-statebackend-rocksdb_2.11
1.10.1
RocksDBStateBackend 采用异步的方式进行状态数据的 Snapshot,任务中的状态数据首先被写入本地 RockDB 中,这样在 RockDB 仅会存储正在进行计算的热数据,而需要进行CheckPoint 的时候,会把本地的数据直接复制到远端的 FileSystem 中。
与 FsStateBackend 相比,RocksDBStateBackend 在性能上要比 FsStateBackend 高一些,主要是因为借助于 RocksDB 在本地存储了最新热数据,然后通过异步的方式再同步到文件系统中,但 RocksDBStateBackend 和 MemoryStateBackend 相比性能就会较弱一些。RocksDB克服了 State 受内存限制的缺点,同时又能够持久化到远端文件系统中,推荐在生产中使用。
streamEnv.setStateBackend(new RocksDBStateBackend ("hdfs://chb01:9000/checkpoint/cp2"))
4.4、全局配置 StateBackend
以上的代码都是单 job 配置状态后端,也可以全局配置状态后端,需要修改flink-conf.yaml 配置文件:
state.backend: filesystem
其中:
- filesystem 表示使用 FsStateBackend,
- jobmanager 表示使用 MemoryStateBackend
- rocksdb 表示使用 RocksDBStateBackend。
state.checkpoints.dir: hdfs://chb01:9000/checkpoints
默认情况下,如果设置了 CheckPoint 选项,则 Flink 只保留最近成功生成的 1 个CheckPoint,而当 Flink 程序失败时,可以通过最近的 CheckPoint 来进行恢复。但是,如果希望保留多个 CheckPoint,并能够根据实际需要选择其中一个进行恢复,就会更加灵活。 添加如下配置,指定最多可以保存的 CheckPoint 的个数。
state.checkpoints.num-retained: 2
四、Checkpoint案例
package com.chb.flink.state
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TestCheckPointByHDFS {
//使用WordCount案例来测试一下HDFS的状态后端,先运行一段时间Job,然后cansol,在重新启动,看看状态是否是连续的
def main(args: Array[String]): Unit = {
//1、初始化Flink流计算的环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//开启CheckPoint并且设置一些参数
streamEnv.enableCheckpointing(5000) //每隔5秒开启一次CheckPoint
streamEnv.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/checkpoint/cp1")) //存放检查点数据
streamEnv.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
streamEnv.getCheckpointConfig.setCheckpointTimeout(5000)
streamEnv.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
streamEnv.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //终止job保留检查的数据
//修改并行度
streamEnv.setParallelism(1) //默认所有算子的并行度为1
//2、导入隐式转换
import org.apache.flink.streaming.api.scala._
//3、读取数据,读取sock流中的数据
val stream: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888) //DataStream ==> spark 中Dstream
//4、转换和处理数据
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.map((_, 1)).setParallelism(2)
.keyBy(0) //分组算子 : 0 或者 1 代表下标。前面的DataStream[二元组] , 0代表单词 ,1代表单词出现的次数
.sum(1).setParallelism(2) //聚会累加算子
//5、打印结果
result.print("结果").setParallelism(1)
//6、启动流计算程序
streamEnv.execute("wordcount")
}
}
- 取消Job
- 查看checkpoint的状态文件
- 通过命令启动Job
[chengbao@ShServer bin]$ ./flink run -d \
-s hdfs://hadoop01:9000/checkpoint/cp1/2e5c9efcd0154918b3a25327e168876f/chk-61 \
-c com.chb.flink.state.TestCheckPointByHDFS ~/FlinkProject-1.0-SNAPSHOT.jar
Job has been submitted with JobID 640aad7d832a4166cd47451e545515a0
[chengbao@ShServer bin]$
出错: Caused by: java.io.FileNotFoundException: Cannot find meta data file '_metadata' in directory 'hdfs://hadoop01:9000/checkpoint/cp1/60c679c11cca5d36b14e77aaad3ccc5a'.
Please try to load the checkpoint/savepoint directly from the metadata file instead of the directory.
失败由于少写chk-7
4. 启动完成,输入新数据,查看结果
Savepoints 是检查点的一种特殊实现,底层实现其实也是使用 Checkpoint 的机制。Savepoint 是用户以手动命令的方式触发Checkpoint,并将结果持久化到指定的存储路径中,其主要目的是帮助用户在升级和维护集群过程中保存系统中的状态数据,避免因为停机运维或者升级应用等正常终止应用的操作而导致系统无法恢复到原有的计算状态的情况,从而无法实现从端到端的 Excatly-Once 语义保证。
5.1、配置 Savepoints 的存储路径在 flink-conf.yaml 中配置 SavePoint 存储的位置,设置后,如果要创建指定 Job 的SavePoint,可以不用在手动执行命令时指定 SavePoint 的位置。
state.savepoints.dir: hdfs:/hadoop101:9000/savepoint
5.2、在代码中设置算子ID
为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过手动给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动设置 ID。
5.2.1、通过uid设置ID //3、读取数据,读取sock流中的数据
val stream: DataStream[String] = streamEnv.socketTextStream("hadoop01", 8888) //DataStream ==> spark 中Dstream
.uid("socket001")
//4、转换和处理数据
val result: DataStream[(String, Int)] = stream.flatMap(_.split(" "))
.uid("flatmap001")
.map((_, 1)).setParallelism(2)
.uid("map001")
.keyBy(0) //分组算子 : 0 或者 1 代表下标。前面的DataStream[二元组] , 0代表单词 ,1代表单词出现的次数
.sum(1)
.uid("sum001")
5.3、触发SavePoint
5.3.1、启动Job
[chengbao@ShServer bin]$ ./flink run -c com.chb.flink.state.TestSavePoints -d ~/FlinkProject-1.0-SNAPSHOT.jar
Job has been submitted with JobID 359508b9669cb752dcc9f2709ef30968
[chengbao@ShServer bin]$
结果> (chb,1)
结果> (ling,1)
结果> (love,1)
5.3.2、触发SavePoint
# 触发SavePoint, 359508b9669cb752dcc9f2709ef30968就是上面的JobID
[chengbao@ShServer bin]$ ./flink savepoint 359508b9669cb752dcc9f2709ef30968
Triggering savepoint for job 359508b9669cb752dcc9f2709ef30968.
Waiting for response...
Savepoint completed. Path: hdfs://hadoop01:9000/savepoint/savepoint-359508-333ecb216903
You can resume your program from this savepoint with the run command.
[chengbao@ShServer bin]$
# 然后 cancel Job
[chengbao@ShServer bin]$ ./flink cancel 359508b9669cb752dcc9f2709ef30968
Cancelling job 359508b9669cb752dcc9f2709ef30968.
Cancelled job 359508b9669cb752dcc9f2709ef30968.
[chengbao@ShServer bin]$
5.3.3、通过SvaePoint,启动Job
[chengbao@ShServer bin]$ ./flink run -s hdfs://hadoop01:9000/savepoint/savepoint-359508-333ecb216903 -c com.chb.flink.state.TestSavePoints -d ~/FlinkProject-1.0-SNAPSHOT.jar
Job has been submitted with JobID b82b90c643114dbf956d717e2e6d8fd4
[chengbao@ShServer bin]$
# 输入新数据
结果> (chb,2)
结果> (ling,2)
结果> (love,2)
返回总目录
关注我的公众号【宝哥大数据】,更多干货