通俗的将Flink的状态即为存储算子中的子任务的中间值,相当于我们web的session.这里需要注意的是子任务是个线程,且这个线程在不断地处理数据,那flink的state就是存储线程中间变量的一个解决方案cuiyaonan2000@163.com
参考版本为: v1.13.2
参考地址:
- Working with State | Apache Flink
- 概览 | Apache Flink
随着我们自定的算子实现,我们不得不考虑一个问题.即数据的状态,流式数据处理其实也是批量的处理,那我们的计算就会有依赖,即依赖上一步的结果或者相关的处理结果才能继续.所以这里就必须要了解Flink的状态管理
流计算一般分为有状态和无状态两种(这里的解决方案用大家都懂的意思就是,使用一个全局的变量来存储每次的计算结果.这里的全局变量是跨机器,跨线程的,因为我们的算子是有并行度的概念的.cuiyaonan2000@163.com)
- 无状态计算指的是处理过程中不依赖于之前的数据处理结果或其他中间数据;
- 而有状态的计算会维护状态,并基于最新数据和当前状态生成输出结果。
Flink 保证 exactly-once 主要是通过他的 checkpoint 和 savepoint 机制.
Flink 状态Flink 应用程序的状态访问都在本地进行,因为这有助于其提高吞吐量和降低延迟。通常情况下 Flink 应用程序都是将状态存储在 JVM 堆上,但如果状态太大,我们也可以选择将其以结构化数据格式存储在高速磁盘中。(这里注意不同的算子,相同算子的并行子任务是无法共享状态的cuiyaonan2000@163.com)
如下为一个并行度为3的算子的状态示意图.
-
算子状态(Operator State): 算子状态的作用范围限定为算子任务
-
键控状态(keyed State):生产中应用案例较多,根据输入数据流中定义的key来维护和访问
keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 KeyedStream
上使用,在Java/Scala API上可以通过 stream.keyBy(...)
得到 KeyedStream
所有类型的状态还有一个clear()
方法,清除当前 key 下的状态数据,也就是当前输入元素的 key。这里注意不同的算子,相同算子的并行子任务是无法共享状态的cuiyaonan2000@163.com
-
ValueState
: 保存一个可以更新和检索的值(如上所述,每个值都对应到当前的输入数据的 key,因此算子接收到的每个 key 都可能对应一个值)。 这个值可以通过update(T)
进行更新,通过T value()
进行检索。 -
ListState
: 保存一个元素的列表。可以往这个列表中追加数据,并在当前的列表上进行检索。可以通过add(T)
或者addAll(List)
进行添加元素,通过Iterable get()
获得整个列表。还可以通过update(List)
覆盖当前的列表。 -
ReducingState
: 保存一个单值,表示添加到状态的所有值的聚合。接口与ListState
类似,但使用add(T)
增加元素,会使用提供的ReduceFunction
进行聚合。 -
AggregatingState
: 保留一个单值,表示添加到状态的所有值的聚合。和ReducingState
相反的是, 聚合类型可能与 添加到状态的元素的类型不同。 接口与ListState
类似,但使用add(IN)
添加的元素会用指定的AggregateFunction
进行聚合。 -
MapState
: 维护了一个映射列表。 你可以添加键值对到状态中,也可以获得反映当前所有映射的迭代器。使用put(UK,UV)
或者putAll(Map)
添加映射。 使用get(UK)
检索特定 key。 使用entries()
,keys()
和values()
分别检索映射、键和值的可迭代视图。你还可以通过isEmpty()
来判断是否包含任何键值对。
创建状态必须使用StateDescriptor,根据不同的状态类型可以创建如下的
ValueStateDescriptor
,ListStateDescriptor
,AggregatingStateDescriptor
,ReducingStateDescriptor
-
MapStateDescriptor
。
状态通过 RuntimeContext
进行访问,因此只能在 rich functions 中使用。请参阅这里获取相关信息, 但是我们很快也会看到一个例子。RichFunction
中 RuntimeContext
提供如下方法:
ValueState getState(ValueStateDescriptor)
ReducingState getReducingState(ReducingStateDescriptor)
ListState getListState(ListStateDescriptor)
AggregatingState getAggregatingState(AggregatingStateDescriptor)
MapState getMapState(MapStateDescriptor)
示例:
package cui.yao.nan.flink.string;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import cui.yao.nan.flink.MyKafkaDeserializationSchema;
import cui.yao.nan.flink.MyKafkaSerializationSchema;
import cui.yao.nan.flink.domain.User;
import cui.yao.nan.pojo.Person;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
@Component
@Order(2)
//CommandLineRunner会在服务启动之后,且所有的bean都实例化后,会立即执行 run 方法
//Order 注解的执行优先级是按value值从小到大顺序。所以 producer比consumer先执行
public class KafkaConsumer implements CommandLineRunner {
protected static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
public static void main(String[] args) {
new KafkaConsumer().run(null);
}
public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void run(String... args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer("topic-name-cui2",
new MyKafkaDeserializationSchema(User.class),
getProperties());
//从 Kafka brokers 中的 consumer 组(consumer 属性中的 group.id 设置)提交的偏移量中开始读取分区。
//如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置。
kafkaConsumer.setStartFromGroupOffsets();
//如果启用了 checkpointing,那么当 checkpointing 完成时,Flink Kafka Consumer 将提交的 offset 存储在 checkpoint 状态中。
// 这确保 Kafka broker 中提交的 offset 与 checkpoint 状态中的 offset 一致
//默认是true
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
//没有设置watermarket
// kafkaConsumer.assignTimestampsAndWatermarks()
DataStream dataStream = env.addSource(kafkaConsumer);
dataStream.rebalance();
FlinkKafkaProducer kafkaProducer = new
FlinkKafkaProducer("topic-name-cui"
, new MyKafkaSerializationSchema() , getProperties());
dataStream.map(new RichMapFunction() {
@Override
public void open(Configuration parameters) throws Exception {
String name = Thread.currentThread().getName();
}
@Override
public User map(User s) throws Exception {
String name = Thread.currentThread().getName();
return s;
}
})
.keyBy(value -> value.getId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.apply(new RichWindowFunction() {
//不序列化keyList;
private transient ListState keyList;
@Override
public void open(Configuration parameters) throws Exception {
ListStateDescriptor outputStateDescriptor
= new ListStateDescriptor(
"outPutState",
String.class);
keyList = getRuntimeContext().getListState(outputStateDescriptor);
log.info("初始化apply,时间是{},线程名称{}" ,sdf.format(new Date()),Thread.currentThread().getName());
}
@Override
public void apply(Integer key, TimeWindow timeWindow, Iterable iterable, Collector collector) throws Exception {
Tuple1 result = new Tuple1();
int num = 0 ;
for (User user : iterable) {
keyList.add(user.getId()+"");
num++;
}
result.f0 = (num+"");
collector.collect(result);
log.info("处理apply结束时间是:{},处理的key是{},线程名称:{}",sdf.format(new Date()),keyList.get().toString(),Thread.currentThread().getName());
}
}).addSink(kafkaProducer);
//如下的废弃了
// .flatMap(
// new RichFlatMapFunction(){
// @Override
// public void flatMap(User s, Collector collector) throws Exception {
// String name = Thread.currentThread().getName();
// log.info("keyby 后的map的线程name:{},消费的数据时间:{}",name,s.getId());
// collector.collect(s);
// }
//
// public User map(User s) throws Exception {
// String name = Thread.currentThread().getName();
// log.info("keyby 后的map的线程name:{},消费的数据时间:{}",name,s.getAge());
// return s;
// }
// }
// ).setParallelism(2);
//打印也是一种sink
// dataStream.print();
try {
env.execute("start kafkaconsume");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public class AA implements KeySelector{
@Override
public String getKey(Person person) throws Exception {
return person.getName();
}
}
private Properties getProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "172.17.12.124:9092");
properties.setProperty("zookeeper.connect", "172.17.12.124:2181");
properties.setProperty("group.id", "testKafka");
// properties.setProperty("client.id", "cuiyaonan-client");
// properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
}
状态有效期 (TTL)
即设置状态什么时候过期,过期了怎么处理的设置.
在使用状态 TTL 前,需要先构建一个StateTtlConfig
配置对象。 然后把配置传递到 state descriptor 中启用 TTL 功能:
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
如上的代码详解:
TTL 配置有以下几个选项: newBuilder
的第一个参数表示数据的有效期,是必选项(过期时间设置)。
TTL 的更新策略(默认是 OnCreateAndWrite
):----即刷过期时间从什么时候开始
StateTtlConfig.UpdateType.OnCreateAndWrite
- 仅在创建和写入时更新StateTtlConfig.UpdateType.OnReadAndWrite
- 读取时也更新
数据在过期但还未被清理时的可见性配置如下(默认为 NeverReturnExpired
):
StateTtlConfig.StateVisibility.NeverReturnExpired
- 不返回过期数据StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp
- 会返回过期但未清理的数据
NeverReturnExpired
情况下,过期数据就像不存在一样,不管是否被物理删除。这对于不能访问过期数据的场景下非常有用,比如敏感数据。 ReturnExpiredIfNotCleanedUp
在数据被物理删除前都会返回。
默认情况下,过期数据会在读取的时候被删除,例如 ValueState#value
,同时会有后台线程定期清理(如果 StateBackend 支持的话)。可以通过 StateTtlConfig
配置关闭后台清理():
import org.apache.flink.api.common.state.StateTtlConfig;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.disableCleanupInBackground()
.build();
关于清理策略和优化官网还有很多设置,具体参考官网.
Operator State这里其实需要注意有的算子本身就有状态,这种的更难以管理和使用.
算子状态(或者非 keyed 状态)是绑定到一个并行算子实例的状态。Kafka Connector 是 Flink 中使用算子状态一个很具有启发性的例子。Kafka consumer 每个并行实例维护了 topic partitions 和偏移量的 map 作为它的算子状态。
当并行度改变的时候,算子状态支持将状态重新分发给各并行算子实例。处理重分发过程有多种不同的方案。
在典型的有状态 Flink 应用中你无需使用算子状态。它大都作为一种特殊类型的状态使用。用于实现 source/sink,以及无法对 state 进行分区而没有主键的这类场景中。
注意: Python DataStream API 仍无法支持算子状态。
具体参考官网: Working with State | Apache Flink