您当前的位置: 首页 >  kafka

蔚1

暂无认证

  • 0浏览

    0关注

    4753博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

你真的了解Flink Kafka source吗?

蔚1 发布时间:2020-05-05 23:31:35 ,浏览量:0

当我们在使用 Flink 进行数据实时处理时,使用 Kafka 作为一款发布与订阅的消息系统成为了标配。Flink 提供了相对应的 Kafka Consumer,使用起来非常的方便,只需要设置一下 Kafka 的参数,然后添加 Kafka 的 Source 就万事大吉了。如果你真的觉得事情就是如此的 So Easy,感觉妈妈再也不用担心你的学习了,那就真的是 Too Young Too Simple Sometimes Naive 了。

在本场 Chat 中,会讲到如下内容:

  • Flink Kafka Consumer 介绍
  • Flink Kafka Consumer 使用与参数配置
  • Flink Kafka Consumer 源码解读
  • 偏移量提交模式分析

适合人群: 对 Flink 有兴趣的技术人员

Flink 提供了专门的 Kafka 连接器,向 Kafka topic 中读取或者写入数据。Flink Kafka Consumer 集成了 Flink 的 Checkpoint 机制,可提供 exactly-once 的处理语义。为此,Flink 并不完全依赖于跟踪 Kafka 消费组的偏移量,而是在内部跟踪和检查偏移量。

引言

当我们在使用 Spark Streaming、Flink 等计算框架进行数据实时处理时,使用 Kafka 作为一款发布与订阅的消息系统成为了标配。Spark Streaming 与 Flink 都提供了相对应的 Kafka Consumer,使用起来非常的方便,只需要设置一下 Kafka 的参数,然后添加 kafka 的 source 就万事大吉了。如果你真的觉得事情就是如此的 so easy,感觉妈妈再也不用担心你的学习了,那就真的是 too young too simple sometimes naive 了。本文以 Flink 的 Kafka Source 为讨论对象,首先从基本的使用入手,然后深入源码逐一剖析,一并为你拨开 Flink Kafka connector 的神秘面纱。值得注意的是,本文假定读者具备了 Kafka 的相关知识,关于 Kafka 的相关细节问题,不在本文的讨论范围之内。

Flink Kafka Consumer 介绍

Flink Kafka Connector 有很多个版本,可以根据你的 kafka 和 Flink 的版本选择相应的包(maven artifact id)和类名。本文所涉及的 Flink 版本为 1.10,Kafka 的版本为 2.3.4。Flink 所提供的 Maven 依赖于类名如下表所示:

Maven 依赖自从哪个版本 开始支持类名Kafka 版本注意flink-connector-kafka-0.8_2.111.0.0FlinkKafkaConsumer08 FlinkKafkaProducer080.8.x这个连接器在内部使用 Kafka 的 SimpleConsumer API。偏移量由 Flink 提交给 ZK。flink-connector-kafka-0.9_2.111.0.0FlinkKafkaConsumer09 FlinkKafkaProducer090.9.x这个连接器使用新的 Kafka Consumer APIflink-connector-kafka-0.10_2.111.2.0FlinkKafkaConsumer010 FlinkKafkaProducer0100.10.x这个连接器支持 带有时间戳的 Kafka 消息,用于生产和消费。flink-connector-kafka-0.11_2.111.4.0FlinkKafkaConsumer011 FlinkKafkaProducer011>= 0.11.xKafka 从 0.11.x 版本开始不支持 Scala 2.10。此连接器支持了 Kafka 事务性的消息传递来为生产者提供 Exactly once 语义。flink-connector-kafka_2.111.7.0FlinkKafkaConsumer FlinkKafkaProducer>= 1.0.0这个通用的 Kafka 连接器尽力与 Kafka client 的最新版本保持同步。该连接器使用的 Kafka client 版本可能会在 Flink 版本之间发生变化。从 Flink 1.9 版本开始,它使用 Kafka 2.2.0 client。当前 Kafka 客户端向后兼容 0.10.0 或更高版本的 Kafka broker。 但是对于 Kafka 0.11.x 和 0.10.x 版本,我们建议你分别使用专用的 flink-connector-kafka-0.112.11 和 flink-connector-kafka-0.102.11 连接器。 Demo 示例 添加 Maven 依赖
  org.apache.flink  flink-connector-kafka_2.11  1.10.0
简单代码案例
public class KafkaConnector {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();        // 开启 checkpoint,时间间隔为毫秒        senv.enableCheckpointing(5000L);        // 选择状态后端        senv.setStateBackend((StateBackend) new FsStateBackend("file:///E://checkpoint"));        //senv.setStateBackend((StateBackend) new FsStateBackend("hdfs://kms-1:8020/checkpoint"));        Properties props = new Properties();        // kafka broker 地址        props.put("bootstrap.servers", "kms-2:9092,kms-3:9092,kms-4:9092");        // 仅 kafka0.8 版本需要配置        props.put("zookeeper.connect", "kms-2:2181,kms-3:2181,kms-4:2181");        // 消费者组        props.put("group.id", "test");        // 自动偏移量提交        props.put("enable.auto.commit", true);        // 偏移量提交的时间间隔,毫秒        props.put("auto.commit.interval.ms", 5000);        // kafka 消息的 key 序列化器        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        // kafka 消息的 value 序列化器        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        // 指定 kafka 的消费者从哪里开始消费数据        // 共有三种方式,        // #earliest        // 当各分区下有已提交的 offset 时,从提交的 offset 开始消费;        // 无提交的 offset 时,从头开始消费        // #latest        // 当各分区下有已提交的 offset 时,从提交的 offset 开始消费;        // 无提交的 offset 时,消费新产生的该分区下的数据        // #none        // topic 各分区都存在已提交的 offset 时,        // 从 offset 后开始消费;        // 只要有一个分区不存在已提交的 offset,则抛出异常        props.put("auto.offset.reset", "latest");        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer(                "qfbap_ods.code_city",                new SimpleStringSchema(),                props);        //设置 checkpoint 后在提交 offset,即 oncheckpoint 模式        // 该值默认为 true,        consumer.setCommitOffsetsOnCheckpoints(true);        // 最早的数据开始消费        // 该模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。        //consumer.setStartFromEarliest();        // 消费者组最近一次提交的偏移量,默认。        // 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置        //consumer.setStartFromGroupOffsets();        // 最新的数据开始消费        // 该模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。        //consumer.setStartFromLatest();        // 指定具体的偏移量时间戳,毫秒        // 对于每个分区,其时间戳大于或等于指定时间戳的记录将用作起始位置。        // 如果一个分区的最新记录早于指定的时间戳,则只从最新记录读取该分区数据。        // 在这种模式下,Kafka 中的已提交 offset 将被忽略,不会用作起始位置。        //consumer.setStartFromTimestamp(1585047859000L);        // 为每个分区指定偏移量        /*Map specificStartOffsets = new HashMap();        specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 0), 23L);        specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 1), 31L);        specificStartOffsets.put(new KafkaTopicPartition("qfbap_ods.code_city", 2), 43L);        consumer1.setStartFromSpecificOffsets(specificStartOffsets);*/        /**         *         * 请注意:当 Job 从故障中自动恢复或使用 savepoint 手动恢复时,         * 这些起始位置配置方法不会影响消费的起始位置。         * 在恢复时,每个 Kafka 分区的起始位置由存储在 savepoint 或 checkpoint 中的 offset 确定         *         */        DataStreamSource source = senv.addSource(consumer);        // TODO        source.print();        senv.execute("test kafka connector");    }}
参数配置解读

在 Demo 示例中,给出了详细的配置信息,下面将对上面的参数配置进行逐一分析。

kakfa 的 properties 参数配置
  • bootstrap.servers:kafka broker 地址

  • zookeeper.connect:仅 kafka0.8 版本需要配置

  • group.id:消费者组

  • enable.auto.commit:

    自动偏移量提交,该值的配置不是最终的偏移量提交模式,需要考虑用户是否开启了 checkpoint,

    在下面的源码分析中会进行解读

  • auto.commit.interval.ms:偏移量提交的时间间隔,毫秒

  • key.deserializer:

    kafka 消息的 key 序列化器,如果不指定会使用 ByteArrayDeserializer 序列化器

  • value.deserializer:

kafka 消息的 value 序列化器,如果不指定会使用 ByteArrayDeserializer 序列化器

  • auto.offset.reset:

    指定 kafka 的消费者从哪里开始消费数据,共有三种方式,

  • 第一种:earliest当各分区下有已提交的 offset 时,从提交的 offset 开始消费; 无提交的 offset 时,从头开始消费

  • 第二种:latest当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据

  • 第三种:nonetopic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset,则抛出异常

    注意:上面的指定消费模式并不是最终的消费模式,取决于用户在 Flink 程序中配置的消费模式

Flink 程序用户配置的参数
  • consumer.setCommitOffsetsOnCheckpoints(true)

​ 解释:设置 checkpoint 后在提交 offset,即 oncheckpoint 模式,该值默认为 true,该参数会影响偏移量的提交方式,下面的源码中会进行分析

  • consumer.setStartFromEarliest()

    解释: 最早的数据开始消费 ,该模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。该方法为继承父类 FlinkKafkaConsumerBase 的方法。

  • consumer.setStartFromGroupOffsets()

    解释:消费者组最近一次提交的偏移量,默认。 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置,该方法为继承父类 FlinkKafkaConsumerBase 的方法。

  • consumer.setStartFromLatest()

    解释:最新的数据开始消费,该模式下,Kafka 中的 committed offset 将被忽略,不会用作起始位置。该方法为继承父类 FlinkKafkaConsumerBase 的方法。

  • consumer.setStartFromTimestamp(1585047859000L)

    解释:指定具体的偏移量时间戳,毫秒。对于每个分区,其时间戳大于或等于指定时间戳的记录将用作起始位置。 如果一个分区的最新记录早于指定的时间戳,则只从最新记录读取该分区数据。在这种模式下,Kafka 中的已提交 offset 将被忽略,不会用作起始位置。

  • consumer.setStartFromSpecificOffsets(specificStartOffsets)

解释:为每个分区指定偏移量,该方法为继承父类 FlinkKafkaConsumerBase 的方法。

请注意:当 Job 从故障中自动恢复或使用 savepoint 手动恢复时,这些起始位置配置方法不会影响消费的起始位置。在恢复时,每个 Kafka 分区的起始位置由存储在 savepoint 或 checkpoint 中的 offset 确定。

Flink Kafka Consumer 源码解读 继承关系

Flink Kafka Consumer 继承了 FlinkKafkaConsumerBase 抽象类,而 FlinkKafkaConsumerBase 抽象类又继承了 RichParallelSourceFunction,所以要实现一个自定义的 source 时,有两种实现方式:一种是通过实现 SourceFunction 接口来自定义并行度为 1 的数据源;另一种是通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来自定义具有并行度的数据源。FlinkKafkaConsumer 的继承关系如下图所示。

源码解读 FlinkKafkaConsumer 源码

先看一下 FlinkKafkaConsumer 的源码,为了方面阅读,本文将尽量给出本比较完整的源代码片段,具体如下所示:代码较长,在这里可以先有有一个总体的印象,下面会对重要的代码片段详细进行分析。

public class FlinkKafkaConsumer extends FlinkKafkaConsumerBase {    // 配置轮询超时超时时间,使用 flink.poll-timeout 参数在 properties 进行配置    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";    // 如果没有可用数据,则等待轮询所需的时间(以毫秒为单位)。 如果为 0,则立即返回所有可用的记录    //默认轮询超时时间    public static final long DEFAULT_POLL_TIMEOUT = 100L;    // 用户提供的 kafka 参数配置    protected final Properties properties;    // 如果没有可用数据,则等待轮询所需的时间(以毫秒为单位)。 如果为 0,则立即返回所有可用的记录    protected final long pollTimeout;    /**     * 创建一个 kafka 的 consumer source     * @param topic                   消费的主题名称     * @param valueDeserializer       反序列化类型,用于将 kafka 的字节消息转换为 Flink 的对象     * @param props                   用户传入的 kafka 参数     */    public FlinkKafkaConsumer(String topic, DeserializationSchema valueDeserializer, Properties props) {        this(Collections.singletonList(topic), valueDeserializer, props);    }    /**     * 创建一个 kafka 的 consumer source     * 该构造方法允许传入 KafkaDeserializationSchema,该反序列化类支持访问 kafka 消费的额外信息     * 比如:key/value 对,offsets(偏移量),topic(主题名称)     * @param topic                消费的主题名称     * @param deserializer         反序列化类型,用于将 kafka 的字节消息转换为 Flink 的对象     * @param props                用户传入的 kafka 参数     */    public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema deserializer, Properties props) {        this(Collections.singletonList(topic), deserializer, props);    }    /**     * 创建一个 kafka 的 consumer source     * 该构造方法允许传入多个 topic(主题),支持消费多个主题     * @param topics          消费的主题名称,多个主题为 List 集合     * @param deserializer    反序列化类型,用于将 kafka 的字节消息转换为 Flink 的对象     * @param props           用户传入的 kafka 参数     */    public FlinkKafkaConsumer(List topics, DeserializationSchema deserializer, Properties props) {        this(topics, new KafkaDeserializationSchemaWrapper(deserializer), props);    }    /**     * 创建一个 kafka 的 consumer source     * 该构造方法允许传入多个 topic(主题),支持消费多个主题,     * @param topics         消费的主题名称,多个主题为 List 集合     * @param deserializer   反序列化类型,用于将 kafka 的字节消息转换为 Flink 的对象,支持获取额外信息     * @param props          用户传入的 kafka 参数     */    public FlinkKafkaConsumer(List topics, KafkaDeserializationSchema deserializer, Properties props) {        this(topics, null, deserializer, props);    }    /**     * 基于正则表达式订阅多个 topic     * 如果开启了分区发现,即 FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 值为非负数     * 只要是能够正则匹配上,主题一旦被创建就会立即被订阅     * @param subscriptionPattern   主题的正则表达式     * @param valueDeserializer   反序列化类型,用于将 kafka 的字节消息转换为 Flink 的对象,支持获取额外信息     * @param props               用户传入的 kafka 参数     */    public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema valueDeserializer, Properties props) {        this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper(valueDeserializer), props);    }    /**     * 基于正则表达式订阅多个 topic     * 如果开启了分区发现,即 FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 值为非负数     * 只要是能够正则匹配上,主题一旦被创建就会立即被订阅     * @param subscriptionPattern   主题的正则表达式     * @param deserializer          该反序列化类支持访问 kafka 消费的额外信息,比如:key/value 对,offsets(偏移量),topic(主题名称)     * @param props                 用户传入的 kafka 参数     */    public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema deserializer, Properties props) {        this(null, subscriptionPattern, deserializer, props);    }    private FlinkKafkaConsumer(        List topics,        Pattern subscriptionPattern,        KafkaDeserializationSchema deserializer,        Properties props) {        // 调用父类(FlinkKafkaConsumerBase)构造方法,PropertiesUtil.getLong 方法第一个参数为 Properties,第二个参数为 key,第三个参数为 value 默认值        super(            topics,            subscriptionPattern,            deserializer,            getLong(                checkNotNull(props, "props"),                KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),            !getBoolean(props, KEY_DISABLE_METRICS, false));        this.properties = props;        setDeserializer(this.properties);        // 配置轮询超时时间,如果在 properties 中配置了 KEY_POLL_TIMEOUT 参数,则返回具体的配置值,否则返回默认值 DEFAULT_POLL_TIMEOUT        try {            if (properties.containsKey(KEY_POLL_TIMEOUT)) {                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));            } else {                this.pollTimeout = DEFAULT_POLL_TIMEOUT;            }        }        catch (Exception e) {            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);        }    }   // 父类(FlinkKafkaConsumerBase)方法重写,该方法的作用是返回一个 fetcher 实例,    // fetcher 的作用是连接 kafka 的 broker,拉去数据并进行反序列化,然后将数据输出为数据流(data stream)    @Override    protected AbstractFetcher createFetcher(        SourceContext sourceContext,        Map assignedPartitionsWithInitialOffsets,        SerializedValue watermarksPeriodic,        SerializedValue watermarksPunctuated,        StreamingRuntimeContext runtimeContext,        OffsetCommitMode offsetCommitMode,        MetricGroup consumerMetricGroup,        boolean useMetrics) throws Exception {        // 确保当偏移量的提交模式为 ON_CHECKPOINTS(条件 1:开启 checkpoint,条件 2:consumer.setCommitOffsetsOnCheckpoints(true))时,禁用自动提交        // 该方法为父类(FlinkKafkaConsumerBase)的静态方法        // 这将覆盖用户在 properties 中配置的任何设置        // 当 offset 的模式为 ON_CHECKPOINTS,或者为 DISABLED 时,会将用户配置的 properties 属性进行覆盖        // 具体是将 ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置为"false        // 可以理解为:如果开启了 checkpoint,并且设置了 consumer.setCommitOffsetsOnCheckpoints(true),默认为 true,        // 就会将 kafka properties 的 enable.auto.commit 强制置为 false        adjustAutoCommitConfig(properties, offsetCommitMode);        return new KafkaFetcher(            sourceContext,            assignedPartitionsWithInitialOffsets,            watermarksPeriodic,            watermarksPunctuated,            runtimeContext.getProcessingTimeService(),            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),            runtimeContext.getUserCodeClassLoader(),            runtimeContext.getTaskNameWithSubtasks(),            deserializer,            properties,            pollTimeout,            runtimeContext.getMetricGroup(),            consumerMetricGroup,            useMetrics);    }    //父类(FlinkKafkaConsumerBase)方法重写    // 返回一个分区发现类,分区发现可以使用 kafka broker 的高级 consumer API 发现 topic 和 partition 的元数据    @Override    protected AbstractPartitionDiscoverer createPartitionDiscoverer(        KafkaTopicsDescriptor topicsDescriptor,        int indexOfThisSubtask,        int numParallelSubtasks) {        return new KafkaPartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, properties);    }    /**     *判断是否在 kafka 的参数开启了自动提交,即 enable.auto.commit=true,     * 并且 auto.commit.interval.ms>0,     * 注意:如果没有没有设置 enable.auto.commit 的参数,则默认为 true     *       如果没有设置 auto.commit.interval.ms 的参数,则默认为 5000 毫秒     * @return     */    @Override    protected boolean getIsAutoCommitEnabled() {        //        return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&            PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;    }    /**     * 确保配置了 kafka 消息的 key 与 value 的反序列化方式,     * 如果没有配置,则使用 ByteArrayDeserializer 序列化器,     * 该类的 deserialize 方法是直接将数据进行 return,未做任何处理     * @param props     */    private static void setDeserializer(Properties props) {        final String deSerName = ByteArrayDeserializer.class.getName();        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {            LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);        }        if (valDeSer != null && !valDeSer.equals(deSerName)) {            LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);        }        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);    }}
分析

上面的代码已经给出了非常详细的注释,下面将对比较关键的部分进行分析。

  • 构造方法分析

FlinkKakfaConsumer 提供了 7 种构造方法,如上图所示。不同的构造方法分别具有不同的功能,通过传递的参数也可以大致分析出每种构造方法特有的功能,为了方便理解,本文将对其进行分组讨论,具体如下:

单 topic

/**     * 创建一个 kafka 的 consumer source     * @param topic                   消费的主题名称     * @param valueDeserializer       反序列化类型,用于将 kafka 的字节消息转换为 Flink 的对象     * @param props                   用户传入的 kafka 参数     */    public FlinkKafkaConsumer(String topic, DeserializationSchema valueDeserializer, Properties props) {        this(Collections.singletonList(topic), valueDeserializer, props);    }/**     * 创建一个 kafka 的 consumer source     * 该构造方法允许传入 KafkaDeserializationSchema,该反序列化类支持访问 kafka 消费的额外信息     * 比如:key/value 对,offsets(偏移量),topic(主题名称)     * @param topic                消费的主题名称     * @param deserializer         反序列化类型,用于将 kafka 的字节消息转换为 Flink 的对象     * @param props                用户传入的 kafka 参数     */    public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema deserializer, Properties props) {        this(Collections.singletonList(topic), deserializer, props);    }

上面两种构造方法只支持单个 topic,区别在于反序列化的方式不一样。第一种使用的是 DeserializationSchema,第二种使用的是 KafkaDeserializationSchema,其中使用带有 KafkaDeserializationSchema 参数的构造方法可以获取更多的附属信息,比如在某些场景下需要获取 key/value 对,offsets(偏移量),topic(主题名称)等信息,可以选择使用此方式的构造方法。以上两种方法都调用了私有的构造方法,私有构造方法的分析见下面。

多 topic

/**     * 创建一个 kafka 的 consumer source     * 该构造方法允许传入多个 topic(主题),支持消费多个主题     * @param topics          消费的主题名称,多个主题为 List 集合     * @param deserializer    反序列化类型,用于将 kafka 的字节消息转换为 Flink 的对象     * @param props           用户传入的 kafka 参数     */    public FlinkKafkaConsumer(List topics, DeserializationSchema deserializer, Properties props) {        this(topics, new KafkaDeserializationSchemaWrapper(deserializer), props);    }    /**     * 创建一个 kafka 的 consumer source     * 该构造方法允许传入多个 topic(主题),支持消费多个主题,     * @param topics         消费的主题名称,多个主题为 List 集合     * @param deserializer   反序列化类型,用于将 kafka 的字节消息转换为 Flink 的对象,支持获取额外信息     * @param props          用户传入的 kafka 参数     */    public FlinkKafkaConsumer(List topics, KafkaDeserializationSchema deserializer, Properties props) {        this(topics, null, deserializer, props);    }

上面的两种多 topic 的构造方法,可以使用一个 list 集合接收多个 topic 进行消费,区别在于反序列化的方式不一样。第一种使用的是 DeserializationSchema,第二种使用的是 KafkaDeserializationSchema,其中使用带有 KafkaDeserializationSchema 参数的构造方法可以获取更多的附属信息,比如在某些场景下需要获取 key/value 对,offsets(偏移量),topic(主题名称)等信息,可以选择使用此方式的构造方法。以上两种方法都调用了私有的构造方法,私有构造方法的分析见下面。

正则匹配 topic

/**     * 基于正则表达式订阅多个 topic     * 如果开启了分区发现,即 FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 值为非负数     * 只要是能够正则匹配上,主题一旦被创建就会立即被订阅     * @param subscriptionPattern   主题的正则表达式     * @param valueDeserializer   反序列化类型,用于将 kafka 的字节消息转换为 Flink 的对象,支持获取额外信息     * @param props               用户传入的 kafka 参数     */    public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema valueDeserializer, Properties props) {        this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper(valueDeserializer), props);    }    /**     * 基于正则表达式订阅多个 topic     * 如果开启了分区发现,即 FlinkKafkaConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 值为非负数     * 只要是能够正则匹配上,主题一旦被创建就会立即被订阅     * @param subscriptionPattern   主题的正则表达式     * @param deserializer          该反序列化类支持访问 kafka 消费的额外信息,比如:key/value 对,offsets(偏移量),topic(主题名称)     * @param props                 用户传入的 kafka 参数     */    public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema deserializer, Properties props) {        this(null, subscriptionPattern, deserializer, props);    }

实际的生产环境中可能有这样一些需求,比如有一个 flink 作业需要将多种不同的数据聚合到一起,而这些数据对应着不同的 kafka topic,随着业务增长,新增一类数据,同时新增了一个 kafka topic,如何在不重启作业的情况下作业自动感知新的 topic。首先需要在构建 FlinkKafkaConsumer 时的 properties 中设置 flink.partition-discovery.interval-millis 参数为非负值,表示开启动态发现的开关,以及设置的时间间隔。此时 FLinkKafkaConsumer 内部会启动一个单独的线程定期去 kafka 获取最新的 meta 信息。具体的调用执行信息,参见下面的私有构造方法

私有构造方法

    private FlinkKafkaConsumer(        List topics,        Pattern subscriptionPattern,        KafkaDeserializationSchema deserializer,        Properties props) {        // 调用父类(FlinkKafkaConsumerBase)构造方法,PropertiesUtil.getLong 方法第一个参数为 Properties,第二个参数为 key,第三个参数为 value 默认值。KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS 值是开启分区发现的配置参数,在 properties 里面配置 flink.partition-discovery.interval-millis=5000(大于 0 的数),如果没有配置则使用 PARTITION_DISCOVERY_DISABLED=Long.MIN_VALUE(表示禁用分区发现)        super(            topics,            subscriptionPattern,            deserializer,            getLong(                checkNotNull(props, "props"),                KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),            !getBoolean(props, KEY_DISABLE_METRICS, false));        this.properties = props;        setDeserializer(this.properties);        // 配置轮询超时时间,如果在 properties 中配置了 KEY_POLL_TIMEOUT 参数,则返回具体的配置值,否则返回默认值 DEFAULT_POLL_TIMEOUT        try {            if (properties.containsKey(KEY_POLL_TIMEOUT)) {                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));            } else {                this.pollTimeout = DEFAULT_POLL_TIMEOUT;            }        }        catch (Exception e) {            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);        }    }
  • 其他方法分析

KafkaFetcher 对象创建

   // 父类(FlinkKafkaConsumerBase)方法重写,该方法的作用是返回一个 fetcher 实例,    // fetcher 的作用是连接 kafka 的 broker,拉去数据并进行反序列化,然后将数据输出为数据流(data stream)    @Override    protected AbstractFetcher createFetcher(        SourceContext sourceContext,        Map assignedPartitionsWithInitialOffsets,        SerializedValue watermarksPeriodic,        SerializedValue watermarksPunctuated,        StreamingRuntimeContext runtimeContext,        OffsetCommitMode offsetCommitMode,        MetricGroup consumerMetricGroup,        boolean useMetrics) throws Exception {        // 确保当偏移量的提交模式为 ON_CHECKPOINTS(条件 1:开启 checkpoint,条件 2:consumer.setCommitOffsetsOnCheckpoints(true))时,禁用自动提交        // 该方法为父类(FlinkKafkaConsumerBase)的静态方法        // 这将覆盖用户在 properties 中配置的任何设置        // 当 offset 的模式为 ON_CHECKPOINTS,或者为 DISABLED 时,会将用户配置的 properties 属性进行覆盖        // 具体是将 ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置为"false        // 可以理解为:如果开启了 checkpoint,并且设置了 consumer.setCommitOffsetsOnCheckpoints(true),默认为 true,        // 就会将 kafka properties 的 enable.auto.commit 强制置为 false        adjustAutoCommitConfig(properties, offsetCommitMode);        return new KafkaFetcher(            sourceContext,            assignedPartitionsWithInitialOffsets,            watermarksPeriodic,            watermarksPunctuated,            runtimeContext.getProcessingTimeService(),            runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),            runtimeContext.getUserCodeClassLoader(),            runtimeContext.getTaskNameWithSubtasks(),            deserializer,            properties,            pollTimeout,            runtimeContext.getMetricGroup(),            consumerMetricGroup,            useMetrics);    }

该方法的作用是返回一个 fetcher 实例,fetcher 的作用是连接 kafka 的 broker,拉去数据并进行反序列化,然后将数据输出为数据流(data stream),在这里对自动偏移量提交模式进行了强制调整,即确保当偏移量的提交模式为 ON_CHECKPOINTS(条件 1:开启 checkpoint,条件 2:consumer.setCommitOffsetsOnCheckpoints(true))时,禁用自动提交。这将覆盖用户在 properties 中配置的任何设置,简单可以理解为:如果开启了 checkpoint,并且设置了 consumer.setCommitOffsetsOnCheckpoints(true),默认为 true,就会将 kafka properties 的 enable.auto.commit 强制置为 false。关于 offset 的提交模式,见下文的偏移量提交模式分析。

判断是否设置了自动提交

   @Override    protected boolean getIsAutoCommitEnabled() {        //        return getBoolean(properties, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true) &&            PropertiesUtil.getLong(properties, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000) > 0;    }

判断是否在 kafka 的参数开启了自动提交,即 enable.auto.commit=true,并且 auto.commit.interval.ms>0, 注意:如果没有没有设置 enable.auto.commit 的参数,则默认为 true, 如果没有设置 auto.commit.interval.ms 的参数,则默认为 5000 毫秒。该方法会在 FlinkKafkaConsumerBase 的 open 方法进行初始化的时候调用。

反序列化

private static void setDeserializer(Properties props) {         // 默认的反序列化方式         final String deSerName = ByteArrayDeserializer.class.getName();         //获取用户配置的 properties 关于 key 与 value 的反序列化模式        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);         // 如果配置了,则使用用户配置的值        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {            LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);        }        if (valDeSer != null && !valDeSer.equals(deSerName)) {            LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);        }        // 没有配置,则使用 ByteArrayDeserializer 进行反序列化        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);    }

确保配置了 kafka 消息的 key 与 value 的反序列化方式,如果没有配置,则使用 ByteArrayDeserializer 序列化器,ByteArrayDeserializer 类的 deserialize 方法是直接将数据进行 return,未做任何处理。

FlinkKafkaConsumerBase 源码
@Internalpublic abstract class FlinkKafkaConsumerBase extends RichParallelSourceFunction implements        CheckpointListener,        ResultTypeQueryable,        CheckpointedFunction {    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;    public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";    public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";    private boolean enableCommitOnCheckpoints = true;    /**     * 偏移量的提交模式,仅能通过在 FlinkKafkaConsumerBase#open(Configuration)进行配置     * 该值取决于用户是否开启了 checkpoint     */    private OffsetCommitMode offsetCommitMode;    /**     * 配置从哪个位置开始消费 kafka 的消息,     * 默认为 StartupMode#GROUP_OFFSETS,即从当前提交的偏移量开始消费     */    private StartupMode startupMode = StartupMode.GROUP_OFFSETS;    private Map specificStartupOffsets;    private Long startupOffsetsTimestamp;    /**     * 确保当偏移量的提交模式为 ON_CHECKPOINTS 时,禁用自动提交,     * 这将覆盖用户在 properties 中配置的任何设置。     * 当 offset 的模式为 ON_CHECKPOINTS,或者为 DISABLED 时,会将用户配置的 properties 属性进行覆盖     * 具体是将 ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit"的值重置为"false,即禁用自动提交     * @param properties       kafka 配置的 properties,会通过该方法进行覆盖     * @param offsetCommitMode    offset 提交模式     */    static void adjustAutoCommitConfig(Properties properties, OffsetCommitMode offsetCommitMode) {        if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS || offsetCommitMode == OffsetCommitMode.DISABLED) {            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");        }    }    /**     * 决定是否在开启 checkpoint 时,在 checkpoin 之后提交偏移量,     * 只有用户配置了启用 checkpoint,该参数才会其作用     * 如果没有开启 checkpoint,则使用 kafka 的配置参数:enable.auto.commit     * @param commitOnCheckpoints     * @return     */    public FlinkKafkaConsumerBase setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {        this.enableCommitOnCheckpoints = commitOnCheckpoints;        return this;    }    /**     * 从最早的偏移量开始消费,     *该模式下,Kafka 中的已经提交的偏移量将被忽略,不会用作起始位置。     *可以通过 consumer1.setStartFromEarliest()进行设置     */    public FlinkKafkaConsumerBase setStartFromEarliest() {        this.startupMode = StartupMode.EARLIEST;        this.startupOffsetsTimestamp = null;        this.specificStartupOffsets = null;        return this;    }    /**     * 从最新的数据开始消费,     *  该模式下,Kafka 中的 已提交的偏移量将被忽略,不会用作起始位置。     *     */    public FlinkKafkaConsumerBase setStartFromLatest() {        this.startupMode = StartupMode.LATEST;        this.startupOffsetsTimestamp = null;        this.specificStartupOffsets = null;        return this;    }    /**     *指定具体的偏移量时间戳,毫秒     *对于每个分区,其时间戳大于或等于指定时间戳的记录将用作起始位置。     * 如果一个分区的最新记录早于指定的时间戳,则只从最新记录读取该分区数据。     * 在这种模式下,Kafka 中的已提交 offset 将被忽略,不会用作起始位置。     */    protected FlinkKafkaConsumerBase setStartFromTimestamp(long startupOffsetsTimestamp) {        checkArgument(startupOffsetsTimestamp >= 0, "The provided value for the startup offsets timestamp is invalid.");        long currentTimestamp = System.currentTimeMillis();        checkArgument(startupOffsetsTimestamp             
关注
打赏
1560489824
查看更多评论
0.0588s