Flink Kafka Connector source code walkthrough (in depth)

杨林伟 (YangLinWei) · Published 2022-04-12 23:56:32

Contents
  • 01 Introduction
  • 02 Kafka Connector Source Code Analysis
    • 2.1 Project structure
    • 2.2 Factories (the entry point into the source)
      • 2.2.1 KafkaDynamicTableFactory
      • 2.2.2 UpsertKafkaDynamicTableFactory
      • 2.2.3 KafkaTableSourceSinkFactory
    • 2.3 Serialization and deserialization factories
      • 2.3.1 DeserializationFormatFactory
        • 2.3.1.1 JsonRowDataDeserializationSchema
      • 2.3.2 SerializationFormatFactory
        • 2.3.2.1 JsonRowDataSerializationSchema
    • 2.4 DynamicTable (dynamic tables)
      • 2.4.1 KafkaDynamicSource
      • 2.4.2 KafkaDynamicSink
  • 03 Summary

01 Introduction

In the previous post, Flink Custom Connectors, we covered the basic workflow for building a custom Flink connector, summarized in the flowchart below. [figure: custom connector workflow] At the code level, a developer implements a custom connector as follows:

  1. Define the Flink SQL DDL;
  2. Create the factories that parse and validate the options (DynamicTableSourceFactory, DeserializationFormatFactory), and make sure both are registered under META-INF/services;
  3. Implement ScanTableSource;
  4. Put the concrete business logic in getScanRuntimeProvider.

To really master custom connectors, this article goes straight to the source code and studies how the Flink Kafka connector is implemented.
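To make step 1 concrete, here is a minimal sketch of registering a Kafka-backed table through the Table API. The table name, topic, and broker address are placeholders; the WITH options are exactly the ConfigOptions we will see validated in KafkaDynamicTableFactory below.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaDdlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // 'connector' = 'kafka' is the factory identifier returned by
        // KafkaDynamicTableFactory#factoryIdentifier(); the remaining keys map to the
        // ConfigOptions checked in requiredOptions()/optionalOptions().
        tEnv.executeSql(
                "CREATE TABLE user_events (\n"
                        + "  user_id BIGINT,\n"
                        + "  event_type STRING,\n"
                        + "  ts TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'user_events',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'demo',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json'\n"
                        + ")");
    }
}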

Appendix: Flink source code download link

02 Kafka Connector Source Code Analysis

2.1 Project structure

After importing the Flink source code, the Kafka connector module is laid out as follows. [figure: source layout of flink-connector-kafka] Let's start with its pom file; you can skip most of the details and focus on the dependencies. Below is a tidied excerpt with short descriptions added:




<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

	<modelVersion>4.0.0</modelVersion>

	<parent>
		<artifactId>flink-connectors</artifactId>
		<groupId>org.apache.flink</groupId>
		<version>1.13.6</version>
		<relativePath>..</relativePath>
	</parent>

	<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
	<name>Flink : Connectors : Kafka</name>

	<packaging>jar</packaging>

	<properties>
		<!-- version of the Kafka client bundled with this connector -->
		<kafka.version>2.4.1</kafka.version>
	</properties>

	<dependencies>

		<!-- core streaming dependency; provided by the Flink runtime at execution time -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
			<version>${project.version}</version>
			<scope>provided</scope>
		</dependency>

		<!-- further table/test dependencies omitted here; see the full pom in the Flink repo -->

		<!-- the vanilla Kafka client used by the consumer/producer -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>${kafka.version}</version>
		</dependency>
	</dependencies>

	<!-- build/test sections omitted -->
</project>



2.2 Factories (the entry point into the source)

Under src/main/resources/META-INF/services there are two files. [figure: the two service registration files] Recalling the flowchart from the introduction, once the DDL is defined, the factories are the entry point of the whole flow, so let's look at what the two files contain.

org.apache.flink.table.factories.Factory (Table SQL mode):

org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
org.apache.flink.streaming.connectors.kafka.table.UpsertKafkaDynamicTableFactory

org.apache.flink.table.factories.TableFactory (Table API mode):

org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
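These SPI files are what Flink's FactoryUtil scans at runtime, matching the 'connector' option against each factory's factoryIdentifier(). As a rough illustration (assuming flink-connector-kafka is on the classpath), the lookup can be reproduced by hand like this:

import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;

public class FactoryDiscoveryDemo {
    public static void main(String[] args) {
        // Scans the META-INF/services/org.apache.flink.table.factories.Factory entries
        // and returns the factory whose factoryIdentifier() equals "kafka".
        DynamicTableSourceFactory factory =
                FactoryUtil.discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        DynamicTableSourceFactory.class,
                        "kafka");
        System.out.println(factory.getClass().getName());
        // Expected (if the Kafka connector is on the classpath):
        // org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
    }
}

If this lookup fails, it is almost always because the services file is missing from the jar, which is why step 2 of the workflow stresses the META-INF/services registration.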

These registration files tell us which factory classes to study next:

  • KafkaDynamicTableFactory
  • UpsertKafkaDynamicTableFactory
  • KafkaTableSourceSinkFactory
2.2.1 KafkaDynamicTableFactory

The full code of KafkaDynamicTableFactory follows; the comments added inline should make it easier to follow:

/**
 * Factory for KafkaDynamicTable.
 *
 * Purpose: configures instances of the KafkaDynamicSource and KafkaDynamicSink classes.
 *
 * @author : YangLinWei
 * @createTime: 2022/4/12 11:06 AM
 * @version: 1.0.0
 */
@Internal
public class KafkaDynamicTableFactory
        implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    public static final String IDENTIFIER = "kafka";

    /** Returns the unique identifier (what `connector = '...'` refers to in Flink SQL). */
    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /** The set of options this factory requires (mandatory). */
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PROPS_BOOTSTRAP_SERVERS);
        return options;
    }

    /** The set of options this factory may use (optional). */
    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FactoryUtil.FORMAT);
        options.add(KEY_FORMAT);
        options.add(KEY_FIELDS);
        options.add(KEY_FIELDS_PREFIX);
        options.add(VALUE_FORMAT);
        options.add(VALUE_FIELDS_INCLUDE);
        options.add(TOPIC);
        options.add(TOPIC_PATTERN);
        options.add(PROPS_GROUP_ID);
        options.add(SCAN_STARTUP_MODE);
        options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
        options.add(SINK_PARTITIONER);
        options.add(SINK_SEMANTIC);
        options.add(SINK_PARALLELISM);
        return options;
    }

    /**
     * Creates the DynamicTableSource.
     *
     * Validation and discovery of the nested format factories happen in this method.
     */
    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // 1. Run the custom validation logic via FactoryUtil...
        final TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        // 2. Collect all options
        final ReadableConfig tableOptions = helper.getOptions();
        // 3. Discover a suitable key decoding format
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                getKeyDecodingFormat(helper);
        // 4. Discover a suitable value decoding format
        final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
                getValueDecodingFormat(helper);
        // 5. Validate the options (format, topic, startup mode, primary key, ...); throws on failure
        helper.validateExcept(PROPERTIES_PREFIX);
        validateTableSourceOptions(tableOptions);
        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
        // 6. Read the options and copy them into Properties
        final StartupOptions startupOptions = getStartupOptions(tableOptions);
        final Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
        properties.setProperty(
                FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS,
                String.valueOf(
                        tableOptions
                                .getOptional(SCAN_TOPIC_PARTITION_DISCOVERY)
                                .map(Duration::toMillis)
                                .orElse(FlinkKafkaConsumerBase.PARTITION_DISCOVERY_DISABLED)));
        final DataType physicalDataType =
                context.getCatalogTable().getSchema().toPhysicalRowDataType();
        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
        // 7. Finally create the DynamicTableSource instance and inject the parameters
        return createKafkaTableSource(
                physicalDataType,
                keyDecodingFormat.orElse(null),
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                KafkaOptions.getSourceTopics(tableOptions),
                KafkaOptions.getSourceTopicPattern(tableOptions),
                properties,
                startupOptions.startupMode,
                startupOptions.specificOffsets,
                startupOptions.startupTimestampMillis);
    }

    /**
     * Creates the DynamicTableSink (almost identical in structure to createDynamicTableSource).
     *
     * Validation and discovery of the nested format factories happen in this method.
     */
    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // 1. Run the custom validation logic via FactoryUtil...
        final TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(
                        this, autoCompleteSchemaRegistrySubject(context));
        // 2. Collect all options
        final ReadableConfig tableOptions = helper.getOptions();
        // 3. Discover a suitable key encoding format
        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                getKeyEncodingFormat(helper);
        // 4. Discover a suitable value encoding format
        final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
                getValueEncodingFormat(helper);
        // 5. Validate the options (format, topic, startup mode, primary key, ...); throws on failure
        helper.validateExcept(PROPERTIES_PREFIX);
        validateTableSinkOptions(tableOptions);
        validatePKConstraints(
                context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat);
        // 6. Read the options
        final DataType physicalDataType =
                context.getCatalogTable().getSchema().toPhysicalRowDataType();
        final int[] keyProjection = createKeyFormatProjection(tableOptions, physicalDataType);
        final int[] valueProjection = createValueFormatProjection(tableOptions, physicalDataType);
        final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
        final Integer parallelism = tableOptions.getOptional(SINK_PARALLELISM).orElse(null);
        // 7. Finally create the DynamicTableSink instance and inject the parameters
        return createKafkaTableSink(
                physicalDataType,
                keyEncodingFormat.orElse(null),
                valueEncodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                tableOptions.get(TOPIC).get(0),
                getKafkaProperties(context.getCatalogTable().getOptions()),
                getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
                getSinkSemantic(tableOptions),
                parallelism);
    }

    // --- The methods below are the validation and instantiation details referenced above;
    // --- a quick skim is enough. ----------------------------------------------------------------

    private static Optional<DecodingFormat<DeserializationSchema<RowData>>> getKeyDecodingFormat(
            TableFactoryHelper helper) {
        final Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
                helper.discoverOptionalDecodingFormat(DeserializationFormatFactory.class, KEY_FORMAT);
        keyDecodingFormat.ifPresent(
                format -> {
                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                        throw new ValidationException(
                                String.format(
                                        "A key format should only deal with INSERT-only records. "
                                                + "But %s has a changelog mode of %s.",
                                        helper.getOptions().get(KEY_FORMAT),
                                        format.getChangelogMode()));
                    }
                });
        return keyDecodingFormat;
    }

    private static Optional<EncodingFormat<SerializationSchema<RowData>>> getKeyEncodingFormat(
            TableFactoryHelper helper) {
        final Optional<EncodingFormat<SerializationSchema<RowData>>> keyEncodingFormat =
                helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, KEY_FORMAT);
        keyEncodingFormat.ifPresent(
                format -> {
                    if (!format.getChangelogMode().containsOnly(RowKind.INSERT)) {
                        throw new ValidationException(
                                String.format(
                                        "A key format should only deal with INSERT-only records. "
                                                + "But %s has a changelog mode of %s.",
                                        helper.getOptions().get(KEY_FORMAT),
                                        format.getChangelogMode()));
                    }
                });
        return keyEncodingFormat;
    }

    private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat(
            TableFactoryHelper helper) {
        return helper.discoverOptionalDecodingFormat(
                        DeserializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(
                        () ->
                                helper.discoverDecodingFormat(
                                        DeserializationFormatFactory.class, VALUE_FORMAT));
    }

    private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat(
            TableFactoryHelper helper) {
        return helper.discoverOptionalEncodingFormat(
                        SerializationFormatFactory.class, FactoryUtil.FORMAT)
                .orElseGet(
                        () ->
                                helper.discoverEncodingFormat(
                                        SerializationFormatFactory.class, VALUE_FORMAT));
    }

    private static void validatePKConstraints(
            ObjectIdentifier tableName, CatalogTable catalogTable, Format format) {
        if (catalogTable.getSchema().getPrimaryKey().isPresent()
                && format.getChangelogMode().containsOnly(RowKind.INSERT)) {
            Configuration options = Configuration.fromMap(catalogTable.getOptions());
            String formatName =
                    options.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
            throw new ValidationException(
                    String.format(
                            "The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
                                    + " on the table, because it can't guarantee the semantic of primary key.",
                            tableName.asSummaryString(), formatName));
        }
    }

    // --------------------------------------------------------------------------------------------

    protected KafkaDynamicSource createKafkaTableSource(
            DataType physicalDataType,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,
            @Nullable Pattern topicPattern,
            Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis) {
        return new KafkaDynamicSource(
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                false);
    }

    protected KafkaDynamicSink createKafkaTableSink(
            DataType physicalDataType,
            @Nullable EncodingFormat<SerializationSchema<RowData>> keyEncodingFormat,
            EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            String topic,
            Properties properties,
            FlinkKafkaPartitioner<RowData> partitioner,
            KafkaSinkSemantic semantic,
            Integer parallelism) {
        return new KafkaDynamicSink(
                physicalDataType,
                physicalDataType,
                keyEncodingFormat,
                valueEncodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topic,
                properties,
                partitioner,
                semantic,
                false,
                SinkBufferFlushMode.DISABLED,
                parallelism);
    }
}

From the source we can see that KafkaDynamicTableFactory is mainly responsible for:

  • Discovering the key/value serialization and deserialization format factories
  • Validating the connector options
  • Injecting the options into the DynamicTableSource / DynamicTableSink instances

The next sections look at how SerializationFormatFactory and DeserializationFormatFactory are implemented, and then at the DynamicTableSource and DynamicTableSink implementations.

2.2.2 UpsertKafkaDynamicTableFactory

UpsertKafkaDynamicTableFactory is not walked through line by line here; its flow is essentially the same as KafkaDynamicTableFactory, the main difference being that it builds the source and sink in upsert mode.
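For orientation only, a typical upsert-kafka DDL might look like the sketch below; the table and field names are made up, and unlike the plain 'kafka' connector a PRIMARY KEY plus separate key.format/value.format are expected.

// Continuing with the TableEnvironment `tEnv` from the earlier DDL sketch.
tEnv.executeSql(
        "CREATE TABLE user_latest_event (\n"
                + "  user_id BIGINT,\n"
                + "  event_type STRING,\n"
                + "  PRIMARY KEY (user_id) NOT ENFORCED\n"
                + ") WITH (\n"
                + "  'connector' = 'upsert-kafka',\n"
                + "  'topic' = 'user_latest_event',\n"
                + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                + "  'key.format' = 'json',\n"
                + "  'value.format' = 'json'\n"
                + ")");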

2.2.3 KafkaTableSourceSinkFactory

The Table API mode and the Table SQL mode (declarative) work on almost the same principles: both serve as the input and output of queries and differ mainly in how they are written. It is therefore enough to study the Table SQL factory code; KafkaTableSourceSinkFactory is not covered here, and skipping it does not affect the overall understanding.

2.3 Serialization and deserialization factories

2.3.1 DeserializationFormatFactory

From section 2.2.1 (KafkaDynamicTableFactory) we know that the factory discovers SerializationFormatFactory and DeserializationFormatFactory and builds the DynamicTableSource and DynamicTableSink instances. Let's go through them in that order, starting with DeserializationFormatFactory.

Pressing Ctrl+T in the IDE lists the implementations of DeserializationFormatFactory. [figure: implementations of DeserializationFormatFactory] The typical JsonFormatFactory serves as the example here; the complete code follows, with comments added to aid understanding:

/**
 * JsonFormatFactory converts JSON strings to and from RowData rows.
 *
 * @author : YangLinWei
 * @createTime: 2022/4/12 11:57 AM
 * @version: 1.0.0
 */
public class JsonFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {

    /**
     * Unique identifier (matches 'format' = 'json' in Flink SQL).
     */
    public static final String IDENTIFIER = "json";

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /**
     * Creates the decoding format, whose job is to build the runtime decoder, i.e. the
     * DeserializationSchema.
     */
    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateDecodingFormatOptions(formatOptions);

        final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
        final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
        TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);

        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context context, DataType producedDataType) {
                final RowType rowType = (RowType) producedDataType.getLogicalType();
                final TypeInformation<RowData> rowDataTypeInfo =
                        context.createTypeInformation(producedDataType);
                return new JsonRowDataDeserializationSchema(
                        rowType,
                        rowDataTypeInfo,
                        failOnMissingField,
                        ignoreParseErrors,
                        timestampOption);
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    /**
     * Creates the encoding format, whose job is to build the runtime encoder, i.e. the
     * SerializationSchema.
     */
    @Override
    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        FactoryUtil.validateFactoryOptions(this, formatOptions);
        validateEncodingFormatOptions(formatOptions);

        TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
        JsonOptions.MapNullKeyMode mapNullKeyMode = JsonOptions.getMapNullKeyMode(formatOptions);
        String mapNullKeyLiteral = formatOptions.get(MAP_NULL_KEY_LITERAL);

        final boolean encodeDecimalAsPlainNumber =
                formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);

        return new EncodingFormat<SerializationSchema<RowData>>() {
            @Override
            public SerializationSchema<RowData> createRuntimeEncoder(
                    DynamicTableSink.Context context, DataType consumedDataType) {
                final RowType rowType = (RowType) consumedDataType.getLogicalType();
                return new JsonRowDataSerializationSchema(
                        rowType,
                        timestampOption,
                        mapNullKeyMode,
                        mapNullKeyLiteral,
                        encodeDecimalAsPlainNumber);
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(FAIL_ON_MISSING_FIELD);
        options.add(IGNORE_PARSE_ERRORS);
        options.add(TIMESTAMP_FORMAT);
        options.add(MAP_NULL_KEY_MODE);
        options.add(MAP_NULL_KEY_LITERAL);
        options.add(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
        return options;
    }
}

As noted earlier, a factory's job is to define the rules and inject them into the instance it creates and hands back to its caller.

From the comments above, JsonFormatFactory implements both DeserializationFormatFactory and SerializationFormatFactory, providing createDecodingFormat and createEncodingFormat respectively; the core purpose of these two methods is to produce a DeserializationSchema and a SerializationSchema.

Put simply, the concrete serialization and deserialization logic of JsonFormatFactory lives in the implementations of those two interfaces, namely JsonRowDataDeserializationSchema and JsonRowDataSerializationSchema.

Let's look at JsonRowDataDeserializationSchema first.

2.3.1.1 JsonRowDataDeserializationSchema

The main job of JsonRowDataDeserializationSchema is to deserialize JSON into Flink Table/SQL's internal data structures; its code and comments are as follows:

/**
 * Deserialization schema from JSON to Flink Table/SQL internal data structures.
 *
 * <p>Deserializes a byte[] message as a JSON object and reads the specified fields.
 */
@Internal
public class JsonRowDataDeserializationSchema implements DeserializationSchema<RowData> {
    private static final long serialVersionUID = 1L;

    /** Flag indicating whether to fail if a field is missing. */
    private final boolean failOnMissingField;

    /** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
    private final boolean ignoreParseErrors;

    /** TypeInformation of the produced type. */
    private final TypeInformation<RowData> resultTypeInfo;

    /** Runtime converter that turns a JsonNode into an object of Flink SQL internal data structures. */
    private final JsonToRowDataConverters.JsonToRowDataConverter runtimeConverter;

    /** Object mapper used to parse the JSON. */
    private final ObjectMapper objectMapper = new ObjectMapper();

    /** Timestamp format specification, used when parsing timestamps. */
    private final TimestampFormat timestampFormat;

    /** Constructor. */
    public JsonRowDataDeserializationSchema(
            RowType rowType,
            TypeInformation<RowData> resultTypeInfo,
            boolean failOnMissingField,
            boolean ignoreParseErrors,
            TimestampFormat timestampFormat) {
        if (ignoreParseErrors && failOnMissingField) {
            throw new IllegalArgumentException(
                    "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
        }
        this.resultTypeInfo = checkNotNull(resultTypeInfo);
        this.failOnMissingField = failOnMissingField;
        this.ignoreParseErrors = ignoreParseErrors;
        this.runtimeConverter =
                new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
                        .createConverter(checkNotNull(rowType));
        this.timestampFormat = timestampFormat;

        boolean hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
        if (hasDecimalType) {
            objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
        }
        objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
    }

    /** The actual deserialization. */
    @Override
    public RowData deserialize(@Nullable byte[] message) throws IOException {
        if (message == null) {
            return null;
        }
        try {
            return convertToRowData(deserializeToJsonNode(message));
        } catch (Throwable t) {
            if (ignoreParseErrors) {
                return null;
            }
            throw new IOException(
                    format("Failed to deserialize JSON '%s'.", new String(message)), t);
        }
    }

    /** Deserializes the message into a JsonNode. */
    public JsonNode deserializeToJsonNode(byte[] message) throws IOException {
        return objectMapper.readTree(message);
    }

    /** Converts a JsonNode into Flink Table/SQL internal data structures. */
    public RowData convertToRowData(JsonNode message) {
        return (RowData) runtimeConverter.convert(message);
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return resultTypeInfo;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        JsonRowDataDeserializationSchema that = (JsonRowDataDeserializationSchema) o;
        return failOnMissingField == that.failOnMissingField
                && ignoreParseErrors == that.ignoreParseErrors
                && resultTypeInfo.equals(that.resultTypeInfo)
                && timestampFormat.equals(that.timestampFormat);
    }

    @Override
    public int hashCode() {
        return Objects.hash(failOnMissingField, ignoreParseErrors, resultTypeInfo, timestampFormat);
    }
}

Internally it relies on the helper class JsonToRowDataConverters, whose job is to convert a JsonNode into Flink Table/SQL's internal data structures. For reasons of space its code is not reproduced in full here. [figure: excerpt of JsonToRowDataConverters]
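To make the converter's job concrete, here is a tiny hand-rolled sketch of the same idea for a fixed two-field schema. It is only an illustration; the real JsonToRowDataConverters builds a converter tree recursively from the LogicalType.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class JsonToRowDataSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Converts {"user_id": 1, "event_type": "click"} into a 2-field RowData. */
    public static RowData convert(byte[] message) throws Exception {
        JsonNode root = MAPPER.readTree(message);
        GenericRowData row = new GenericRowData(2);
        // Internal structures use StringData/TimestampData etc., not plain java.lang.String.
        row.setField(0, root.get("user_id").asLong());
        row.setField(1, StringData.fromString(root.get("event_type").asText()));
        return row;
    }

    public static void main(String[] args) throws Exception {
        RowData row = convert("{\"user_id\":1,\"event_type\":\"click\"}".getBytes());
        System.out.println(row); // +I(1,click)
    }
}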

2.3.2 SerializationFormatFactory

The principle is much the same as DeserializationFormatFactory, so it is not repeated here.

2.3.2.1 JsonRowDataSerializationSchema

JsonRowDataSerializationSchema is the mirror image of JsonRowDataDeserializationSchema: its main job is to serialize Flink Table/SQL's internal data structures into JSON. It is not walked through in detail here.
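Mirroring the sketch above, the serialization side conceptually walks a RowData and emits JSON. Again, this is only a hand-rolled illustration for a fixed two-field schema, not the real JsonRowDataSerializationSchema, which derives the field mapping from the RowType.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

public class RowDataToJsonSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Serializes a 2-field row (user_id BIGINT, event_type STRING) to JSON bytes. */
    public static byte[] serialize(RowData row) throws Exception {
        ObjectNode node = MAPPER.createObjectNode();
        node.put("user_id", row.getLong(0));
        node.put("event_type", row.getString(1).toString());
        return MAPPER.writeValueAsBytes(node);
    }

    public static void main(String[] args) throws Exception {
        RowData row = GenericRowData.of(1L, StringData.fromString("click"));
        System.out.println(new String(serialize(row))); // {"user_id":1,"event_type":"click"}
    }
}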

2.4 DynamicTable (dynamic tables)

From section 2.2.1 (KafkaDynamicTableFactory) we know that, besides discovering SerializationFormatFactory and DeserializationFormatFactory, the factory also builds the DynamicTableSource and DynamicTableSink instances.

Let's start with DynamicTableSource, whose implementation class is KafkaDynamicSource.

2.4.1 KafkaDynamicSource

KafkaDynamicSource covers the source side of the connector; most importantly, it creates the Kafka consumer. The code and comments are as follows:

/** KafkaDynamicSource. */
@Internal
public class KafkaDynamicSource
        implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {

    // --------------------------------------------------------------------------------------------
    // Mutable attributes
    // --------------------------------------------------------------------------------------------

    /** Data type that describes the final output of the source. */
    protected DataType producedDataType;

    /** Metadata that is appended at the end of a physical source row. */
    protected List<String> metadataKeys;

    /** Watermark strategy that is used to generate per-partition watermarks. */
    protected @Nullable WatermarkStrategy<RowData> watermarkStrategy;

    // --------------------------------------------------------------------------------------------
    // Format attributes
    // --------------------------------------------------------------------------------------------

    private static final String VALUE_METADATA_PREFIX = "value.";

    /** Data type used to configure the formats. */
    protected final DataType physicalDataType;

    /** Optional format for decoding keys from Kafka. */
    protected final @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat;

    /** Format for decoding values from Kafka. */
    protected final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat;

    /** Indices that determine the key fields and their target positions in the produced row. */
    protected final int[] keyProjection;

    /** Indices that determine the value fields and their target positions in the produced row. */
    protected final int[] valueProjection;

    /** Prefix that needs to be removed from fields when constructing the physical data type. */
    protected final @Nullable String keyPrefix;

    // --------------------------------------------------------------------------------------------
    // Kafka-specific attributes
    // --------------------------------------------------------------------------------------------

    /** The Kafka topics to consume. */
    protected final List<String> topics;

    /** The Kafka topic pattern to consume. */
    protected final Pattern topicPattern;

    /** Properties for the Kafka consumer. */
    protected final Properties properties;

    /** Startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
    protected final StartupMode startupMode;

    /**
     * Specific startup offsets; only relevant when the startup mode is {@link
     * StartupMode#SPECIFIC_OFFSETS}.
     */
    protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;

    /**
     * Start timestamp used to locate partition offsets; only relevant when the startup mode is
     * {@link StartupMode#TIMESTAMP}.
     */
    protected final long startupTimestampMillis;

    /** Flag to determine the source mode. In upsert mode, tombstone messages are retained. */
    protected final boolean upsertMode;

    /** Constructor; initializes the fields above. */
    public KafkaDynamicSource(
            DataType physicalDataType,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
            int[] keyProjection,
            int[] valueProjection,
            @Nullable String keyPrefix,
            @Nullable List<String> topics,
            @Nullable Pattern topicPattern,
            Properties properties,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets,
            long startupTimestampMillis,
            boolean upsertMode) {
        // Format attributes
        this.physicalDataType =
                Preconditions.checkNotNull(
                        physicalDataType, "Physical data type must not be null.");
        this.keyDecodingFormat = keyDecodingFormat;
        this.valueDecodingFormat =
                Preconditions.checkNotNull(
                        valueDecodingFormat, "Value decoding format must not be null.");
        this.keyProjection =
                Preconditions.checkNotNull(keyProjection, "Key projection must not be null.");
        this.valueProjection =
                Preconditions.checkNotNull(valueProjection, "Value projection must not be null.");
        this.keyPrefix = keyPrefix;
        // Mutable attributes
        this.producedDataType = physicalDataType;
        this.metadataKeys = Collections.emptyList();
        this.watermarkStrategy = null;
        // Kafka-specific attributes
        Preconditions.checkArgument(
                (topics != null && topicPattern == null)
                        || (topics == null && topicPattern != null),
                "Either Topic or Topic Pattern must be set for source.");
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
        this.startupMode =
                Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
        this.specificStartupOffsets =
                Preconditions.checkNotNull(
                        specificStartupOffsets, "Specific offsets must not be null.");
        this.startupTimestampMillis = startupTimestampMillis;
        this.upsertMode = upsertMode;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return valueDecodingFormat.getChangelogMode();
    }

    /**
     * Returns the provider of the runtime implementation that reads the data; this is where the
     * main business logic lives.
     */
    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        final DeserializationSchema<RowData> keyDeserialization =
                createDeserialization(context, keyDecodingFormat, keyProjection, keyPrefix);

        final DeserializationSchema<RowData> valueDeserialization =
                createDeserialization(context, valueDecodingFormat, valueProjection, null);

        final TypeInformation<RowData> producedTypeInfo =
                context.createTypeInformation(producedDataType);

        final FlinkKafkaConsumer<RowData> kafkaConsumer =
                createKafkaConsumer(keyDeserialization, valueDeserialization, producedTypeInfo);

        return SourceFunctionProvider.of(kafkaConsumer, false);
    }

    /**
     * Returns the map of metadata keys and the corresponding data types that this source can
     * produce.
     */
    @Override
    public Map<String, DataType> listReadableMetadata() {
        final Map<String, DataType> metadataMap = new LinkedHashMap<>();

        // according to convention, the order of the final row must be
        // PHYSICAL + FORMAT METADATA + CONNECTOR METADATA
        // where the format metadata has highest precedence

        // add value format metadata with prefix
        valueDecodingFormat
                .listReadableMetadata()
                .forEach((key, value) -> metadataMap.put(VALUE_METADATA_PREFIX + key, value));

        // add connector metadata
        Stream.of(ReadableMetadata.values())
                .forEachOrdered(m -> metadataMap.putIfAbsent(m.key, m.dataType));

        return metadataMap;
    }

    @Override
    public void applyReadableMetadata(List metadataKeys, DataType producedDataType) {
        // separate connector and format metadata
        final List formatMetadataKeys =
                metadataKeys.stream()
                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
                        .collect(Collectors.toList());
        final List connectorMetadataKeys = new ArrayList(metadataKeys);
        connectorMetadataKeys.removeAll(formatMetadataKeys);

        // push down format metadata
        final Map formatMetadata = valueDecodingFormat.listReadableMetadata();
        if (formatMetadata.size() > 0) {
            final List requestedFormatMetadataKeys =
                    formatMetadataKeys.stream()
                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
                            .collect(Collectors.toList());
            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
        }

        this.metadataKeys = connectorMetadataKeys;
        this.producedDataType = producedDataType;
    }

    @Override
    public void applyWatermark(WatermarkStrategy watermarkStrategy) {
        this.watermarkStrategy = watermarkStrategy;
    }

    @Override
    public DynamicTableSource copy() {
        final KafkaDynamicSource copy =
                new KafkaDynamicSource(
                        physicalDataType,
                        keyDecodingFormat,
                        valueDecodingFormat,
                        keyProjection,
                        valueProjection,
                        keyPrefix,
                        topics,
                        topicPattern,
                        properties,
                        startupMode,
                        specificStartupOffsets,
                        startupTimestampMillis,
                        upsertMode);
        copy.producedDataType = producedDataType;
        copy.metadataKeys = metadataKeys;
        copy.watermarkStrategy = watermarkStrategy;
        return copy;
    }

    @Override
    public String asSummaryString() {
        return "Kafka table source";
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final KafkaDynamicSource that = (KafkaDynamicSource) o;
        return Objects.equals(producedDataType, that.producedDataType)
                && Objects.equals(metadataKeys, that.metadataKeys)
                && Objects.equals(physicalDataType, that.physicalDataType)
                && Objects.equals(keyDecodingFormat, that.keyDecodingFormat)
                && Objects.equals(valueDecodingFormat, that.valueDecodingFormat)
                && Arrays.equals(keyProjection, that.keyProjection)
                && Arrays.equals(valueProjection, that.valueProjection)
                && Objects.equals(keyPrefix, that.keyPrefix)
                && Objects.equals(topics, that.topics)
                && Objects.equals(String.valueOf(topicPattern), String.valueOf(that.topicPattern))
                && Objects.equals(properties, that.properties)
                && startupMode == that.startupMode
                && Objects.equals(specificStartupOffsets, that.specificStartupOffsets)
                && startupTimestampMillis == that.startupTimestampMillis
                && Objects.equals(upsertMode, that.upsertMode)
                && Objects.equals(watermarkStrategy, that.watermarkStrategy);
    }

    @Override
    public int hashCode() {
        return Objects.hash(
                producedDataType,
                metadataKeys,
                physicalDataType,
                keyDecodingFormat,
                valueDecodingFormat,
                keyProjection,
                valueProjection,
                keyPrefix,
                topics,
                topicPattern,
                properties,
                startupMode,
                specificStartupOffsets,
                startupTimestampMillis,
                upsertMode,
                watermarkStrategy);
    }

    // --------- Concrete business logic ----------------------------------------------------------

    /** Creates the Kafka consumer. */
    protected FlinkKafkaConsumer<RowData> createKafkaConsumer(
            DeserializationSchema<RowData> keyDeserialization,
            DeserializationSchema<RowData> valueDeserialization,
            TypeInformation<RowData> producedTypeInfo) {

        final MetadataConverter[] metadataConverters =
                metadataKeys.stream()
                        .map(
                                k ->
                                        Stream.of(ReadableMetadata.values())
                                                .filter(rm -> rm.key.equals(k))
                                                .findFirst()
                                                .orElseThrow(IllegalStateException::new))
                        .map(m -> m.converter)
                        .toArray(MetadataConverter[]::new);

        // check if connector metadata is used at all
        final boolean hasMetadata = metadataKeys.size() > 0;

        // adjust physical arity with value format's metadata
        final int adjustedPhysicalArity =
                producedDataType.getChildren().size() - metadataKeys.size();

        // adjust value format projection to include value format's metadata columns at the end
        final int[] adjustedValueProjection =
                IntStream.concat(
                        IntStream.of(valueProjection),
                        IntStream.range(
                                keyProjection.length + valueProjection.length,
                                adjustedPhysicalArity))
                        .toArray();

        final KafkaDeserializationSchema kafkaDeserializer =
                new DynamicKafkaDeserializationSchema(
                        adjustedPhysicalArity,
                        keyDeserialization,
                        keyProjection,
                        valueDeserialization,
                        adjustedValueProjection,
                        hasMetadata,
                        metadataConverters,
                        producedTypeInfo,
                        upsertMode);

        final FlinkKafkaConsumer kafkaConsumer;
        if (topics != null) {
            kafkaConsumer = new FlinkKafkaConsumer(topics, kafkaDeserializer, properties);
        } else {
            kafkaConsumer = new FlinkKafkaConsumer(topicPattern, kafkaDeserializer, properties);
        }

        switch (startupMode) {
            case EARLIEST:
                kafkaConsumer.setStartFromEarliest();
                break;
            case LATEST:
                kafkaConsumer.setStartFromLatest();
                break;
            case GROUP_OFFSETS:
                kafkaConsumer.setStartFromGroupOffsets();
                break;
            case SPECIFIC_OFFSETS:
                kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
                break;
            case TIMESTAMP:
                kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
                break;
        }

        kafkaConsumer.setCommitOffsetsOnCheckpoints(properties.getProperty("group.id") != null);

        if (watermarkStrategy != null) {
            kafkaConsumer.assignTimestampsAndWatermarks(watermarkStrategy);
        }
        return kafkaConsumer;
    }

    /** Creates a deserialization schema for either the key or the value. */
    private @Nullable DeserializationSchema<RowData> createDeserialization(
            DynamicTableSource.Context context,
            @Nullable DecodingFormat<DeserializationSchema<RowData>> format,
            int[] projection,
            @Nullable String prefix) {
        if (format == null) {
            return null;
        }
        DataType physicalFormatDataType =
                DataTypeUtils.projectRow(this.physicalDataType, projection);
        if (prefix != null) {
            physicalFormatDataType = DataTypeUtils.stripRowPrefix(physicalFormatDataType, prefix);
        }
        return format.createRuntimeDecoder(context, physicalFormatDataType);
    }

    // --------------------------------------------------------------------------------------------
    // Metadata handling
    // --------------------------------------------------------------------------------------------

    enum ReadableMetadata {
        TOPIC(
                "topic",
                DataTypes.STRING().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord record) {
                        return StringData.fromString(record.topic());
                    }
                }),

        PARTITION(
                "partition",
                DataTypes.INT().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord record) {
                        return record.partition();
                    }
                }),

        HEADERS(
                "headers",
                // key and value of the map are nullable to make handling easier in queries
                DataTypes.MAP(DataTypes.STRING().nullable(), DataTypes.BYTES().nullable())
                        .notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord record) {
                        final Map map = new HashMap();
                        for (Header header : record.headers()) {
                            map.put(StringData.fromString(header.key()), header.value());
                        }
                        return new GenericMapData(map);
                    }
                }),

        LEADER_EPOCH(
                "leader-epoch",
                DataTypes.INT().nullable(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord record) {
                        return record.leaderEpoch().orElse(null);
                    }
                }),

        OFFSET(
                "offset",
                DataTypes.BIGINT().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord record) {
                        return record.offset();
                    }
                }),

        TIMESTAMP(
                "timestamp",
                DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord record) {
                        return TimestampData.fromEpochMillis(record.timestamp());
                    }
                }),

        TIMESTAMP_TYPE(
                "timestamp-type",
                DataTypes.STRING().notNull(),
                new MetadataConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object read(ConsumerRecord record) {
                        return StringData.fromString(record.timestampType().toString());
                    }
                });

        final String key;

        final DataType dataType;

        final MetadataConverter converter;

        ReadableMetadata(String key, DataType dataType, MetadataConverter converter) {
            this.key = key;
            this.dataType = dataType;
            this.converter = converter;
        }
    }
}

Of course, the class doing the real work underneath is FlinkKafkaConsumer; its internals are not reproduced here. [figures: annotated walkthrough of FlinkKafkaConsumer] The underlying RichParallelSourceFunction is worth reading on your own; it almost never needs to be changed.
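As a reminder of what this wraps, FlinkKafkaConsumer is the same source function you would use directly in the DataStream API. A minimal usage sketch (topic, group id, and broker address are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkKafkaConsumerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "demo");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("user_events", new SimpleStringSchema(), props);
        consumer.setStartFromEarliest(); // the same switch KafkaDynamicSource makes on startupMode

        env.addSource(consumer).print();
        env.execute("kafka-consumer-demo");
    }
}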

2.4.2 KafkaDynamicSink

The principle is much the same as KafkaDynamicSource, so it is not described in detail here.
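For symmetry, KafkaDynamicSink ultimately wraps FlinkKafkaProducer, which can likewise be used directly from the DataStream API. A minimal sketch with placeholder topic and broker values:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class FlinkKafkaProducerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");

        // Simple String-based producer; the table sink instead plugs in a
        // KafkaSerializationSchema built from the key/value encoding formats.
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>("user_events_out", new SimpleStringSchema(), props);

        env.fromElements("a", "b", "c").addSink(producer);
        env.execute("kafka-producer-demo");
    }
}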

03 Summary

This article contains a lot of code and takes some patience to work through. To help tie everything together, here is a diagram of how the pieces fit: [figure: overall structure of the Flink Kafka connector]

That wraps up this walkthrough of the Flink Kafka Connector source code. I hope it helps, and thanks for reading!
