- 01 引言
- 02 Connector概述
- 2.1 Metadata - 元数据模块
- 2.2 Planning - 规划模块
- 2.3 Runtime - 运行时模块
- 03 Connector相关API
- 3.1 Dynamic Table Factories - 动态表工厂
- 3.2 Dynamic Table Source - 动态表源
- 3.2.1 Scan Table Source - 扫描表源
- 3.2.2 Lookup Table Source - 查找表源
- 3.3 Dynamic Table Sink - 动态表接收器
- 3.4 Encoding / Decoding Formats - 编码/解码格式
- 04 案例演示
- 4.1 工厂实现
- 4.1.1 SocketDynamicTableFactory
- 4.1.2 ChangelogCsvFormatFactory
- 4.2 表源和解码格式
- 4.2.1 SocketDynamicTableSource
- 4.2.2 ChangelogCsvFormat
- 4.3 Runtime运行时
- 4.3.1 ChangelogCsvDeserializer
- 4.3.2 SocketSourceFunction
因为最近需要使用自定义Connectors
,所以参照了Flink
官网的教程,整理了在Flink
上如何实现自定义Source
和Sink
。
官网地址:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/sourcessinks/
首先我们需要知道动态表(dynamic tables
)只是一个逻辑概念,Flink
本身并不拥有数据。相反,动态表的内容存储在外部系统(如数据库、键值存储、消息队列)或文件中。
动态源(sources
)和动态接收器(sink
)可用于从外部系统读取数据和向外部系统写入数据,一般源(sources
)和接收器(sink
)通常用术语连接器(connector
)下进行总结。
在许多情况下,开发人员不需要从头开始创建新的连接器(connector
),只要稍微修改现有的连接器或钩子,就可以加载连接器到现有的堆栈。
Flink
实现Connector
的流程如下:
从上图可以看到,
Flink
实现Connector
连接器主要分为Metadata
(元数据)、Planning
(规划),Runtime
(运行时提供者)三个部分的内容。
Table API 和 SQL 都是声明式 API,注意是表的声明。因此,如上图所示,在执行CREATE TABLE
语句会导致目标目录Catalog
中的元数据更新。
对于大多数目标目录Catalog
实现,外部系统中的物理数据不会针对此类操作进行修改,因此Connector
连接器的依赖项不必存在于类路径中,SQL
语句中声明的选项WITH
既不被验证也不被解释。
动态表的元数据(即通过 DDL
创建或由目录提供)表示为CatalogTable
, 必要时,将在CatalogTable
内部解析表名。
如上图所示,目标目录CatalogTable
下一步在对表程序进行规划和优化时,CatalogTable
需要将a
解析为两种动态表类型,分别是:
- DynamicTableSource:用于在查询中读取
SELECT
- DynamicTableSink:用于在语句中写入
INSERT INTO
DynamicTableSourceFactory
和DynamicTableSinkFactory
提供特定于连接器的逻辑,用于将元数据转换为DynamicTableSource
和 DynamicTableSink
实例。在大多数情况下,工厂的目的是验证选项(例如:DynamicTableSourceDynamicTableSink'port' = '5022'
),配置编码/解码格式(如果需要),并创建表连接器的参数化实例。
默认情况下,使用 Java
的服务提供者接口 (SPI
)实现DynamicTableSourceFactory
和DynamicTableSinkFactory
。连接器选项(例如 'connector' = 'custom'
)必须对应于有效的工厂标识符。
虽然它在类命名中可能并不明显,但DynamicTableSource
和DynamicTableSink
可以看作是有状态的工厂,最终产生具体的运行时实现来读取/写入实际数据。
规划器planner
使用源source
和接收器sink
实例来执行特定于连接器connector
的双向通信,直到找到最佳逻辑规划。
根据可选声明的能力接口(例如 SupportsProjectionPushDown
或SupportsOverwrite
),规划器planner
可能会将更改应用于实例,从而改变生成的运行时实现。
一旦逻辑规划完成,规划器
planner
将从表连接器connector
获取运行时实现。运行时逻辑在 Flink
的核心连接器接口中实现,例如InputFormat
或SourceFunction
。
这些接口按另一个抽象级别分组为ScanRuntimeProvider
、 LookupRuntimeProvider
和的子类SinkRuntimeProvider
。
例如:
- OutputFormatProvider:providing org.apache.flink.api.common.io.OutputFormat
- SinkFunctionProvider:providing org.apache.flink.streaming.api.functions.sink.SinkFunction
都是SinkRuntimeProvider
计划者Planner
可以处理的具体实例。
动态表工厂用于根据目录和会话信息为外部存储系统配置动态表连接器。
- org.apache.flink.table.factories.DynamicTableSourceFactory :可以实现构造一个DynamicTableSource.
- org.apache.flink.table.factories.DynamicTableSinkFactory:可以实现构造一个DynamicTableSink.
默认情况下,使用connector
选项的值作为工厂标识符和Java
的服务提供者接口(SPI)来发现工厂。
在 JAR 文件中,可以将对新实现的引用添加到服务文件中:
- META-INF/services/org.apache.flink.table.factories.Factory
该框架由工厂标识符和请求的基类唯一标识的单个匹配工厂检查,例如:DynamicTableSourceFactory
。
如有必要,目录实现可以绕过工厂发现过程。为此,目录需要返回一个实例,该实例在org.apache.flink.table.catalog.Catalog#getFactory.
根据定义,动态表可以随时间变化。
在读取动态表时,内容可以被认为是:
- 一个更改日志(有限或无限),所有更改都会持续使用,直到更改日志用完,这由
ScanTableSource
接口表示。 - 一个不断变化的或非常大的外部表,其内容通常不会被完全读取,而是在必要时查询单个值 ,这由
LookupTableSource
接口表示。
一个类可以同时实现这两个接口,规划器根据指定的查询决定它们的使用。
3.2.1 Scan Table Source - 扫描表源ScanTableSource
(扫描表源)在运行时扫描来自外部存储系统的所有行。
扫描的行不一定只包含插入,还可以包含更新和删除。因此,表源可用于读取(有限或无限)变更日志。返回的更改日志模式指示计划程序在运行时可以预期的一组更改。
- 对于常规的批处理场景,源可以发出有限的仅插入行流。
- 对于常规流式处理方案,源可以发出无限制的仅插入行流。
- 对于变更数据捕获 (CDC) 方案,源可以发出带有插入、更新和删除行的有界或无界流。
Table Source可以实现进一步的能力接口,SupportsProjectionPushDown在规划期间可能会改变实例。所有能力都可以在org.apache.flink.table.connector.source.abilities
包中找到并列在源能力表中。
运行时实现的ScanTableSource必须产生内部数据结构,因此,记录必须以org.apache.flink.table.data.RowData
. 该框架提供了运行时转换器,因此源仍然可以处理常见的数据结构并在最后执行转换。
Lookup TableSource在运行时通过一个或多个键查找外部存储系统的行。
- 与ScanTableSource相比,源不必读取整个表,并且可以在必要时从(可能不断变化的)外部表中懒惰地获取单个值。
- 与ScanTableSource相比, LookupTableSource当前仅支持发出仅插入更改。
LookupTableSource 的运行时由 TableFunction或AsyncTableFunction实现,该函数将在运行时使用给定查找键的值调用。
接口描述SupportsFilterPushDown启用将过滤器下推到DynamicTableSource. 为了提高效率,源可以将过滤器进一步向下推,以接近实际数据生成。SupportsLimitPushDown允许将限制(预期的最大生成记录数)下推到DynamicTableSource.SupportsPartitionPushDown允许将可用分区传递给规划器并将分区下推到DynamicTableSource. 在运行时,源将只从传递的分区列表中读取数据以提高效率。SupportsProjectionPushDown能够将(可能是嵌套的)投影下推到DynamicTableSource. 为了提高效率,源可以将投影进一步向下推,以接近实际的数据生成。如果源也实现SupportsReadingMetadata了,源也将只读取所需的元数据。SupportsReadingMetadata允许从DynamicTableSource. 源负责在生成的行末尾添加所需的元数据。这包括可能从包含的格式转发元数据列。SupportsWatermarkPushDown能够将水印策略下推到DynamicTableSource. 水印策略是时间戳提取和水印生成的构建器/工厂。在运行时,水印生成器位于源内部,并且能够生成每个分区的水印。SupportsSourceWatermark可以完全依赖ScanTableSource 自身提供的水印策略。因此,CREATE TABLEDDL 能够使用SOURCE_WATERMARK()内置标记功能,计划器将检测到该功能,并在可用时将其转换为对该接口的调用。 3.3 Dynamic Table Sink - 动态表接收器根据定义,动态表可以随时间变化。在编写动态表时,可以始终将内容视为更改日志(有限或无限),其中所有更改都被连续写出,直到更改日志用完为止。返回的更改日志模式 指示接收器在运行时接受的更改集。
- 对于常规批处理场景,接收器可以仅接受仅插入行并写出有界流。
- 对于常规的流式处理方案,接收器只能接受仅插入行,并且可以写出无界流。
- 对于变更数据捕获 (CDC) 场景,接收器可以使用插入、更新和删除行写出有界或无界流。
表接收器可以实现进一步的能力接口,所有能力都可以在org.apache.flink.table.connector.sink.abilities
包装中找到,并列在水槽能力表中。
运行时实现DynamicTableSink
必须使用内部数据结构。因此,记录必须被接受为org.apache.flink.table.data.RowData
, 该框架提供了运行时转换器,因此接收器仍然可以在通用数据结构上工作并在开始时执行转换。
一些表连接器接受对键和/或值进行编码和解码的不同格式。
格式的工作方式类似于模式DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider
,工厂负责翻译选项,源负责创建运行时逻辑。
因为格式可能位于不同的模块中,所以使用类似于表工厂的 Java 服务提供者接口来发现它们。为了发现格式工厂,动态表工厂搜索与工厂标识符和特定于连接器的基类相对应的工厂。
例如,Kafka
表源需要一个DeserializationSchema
作为运行时接口的解码格式。因此,Kafka
表源工厂使用该value.format
选项的值来发现一个DeserializationFormatFactory
.
当前支持以下格式工厂:
org.apache.flink.table.factories.DeserializationFormatFactory
org.apache.flink.table.factories.SerializationFormatFactory
格式工厂将选项转换为EncodingFormat
或DecodingFormat
。这些接口是另一种为给定数据类型生成专用格式运行时逻辑的工厂。
例如:对于 Kafka
表源工厂,DeserializationFormatFactory
将返回一个EncodingFormat
可以传递给 Kafka
表源的值。
需求:表源使用简单的单线程SourceFunction
打开一个监听传入字节的socket
,原始字节通过可插入格式解码成行,该格式需要一个更改日志标志作为第一列。
下面讲解所有组件是如何协同工作的,比如:
- 创建解析和验证选项的工厂
- 实现表连接器
- 实现和发现自定义格式,
- 使用提供的工具类,例如:数据结构转换器和
FactoryUtil
.
我们将使用上面提到的大部分接口来启用以下 DDL
:
CREATE TABLE UserScores (name STRING, score INT)
WITH (
'connector' = 'socket',
'hostname' = 'localhost',
'port' = '9999',
'byte-delimiter' = '10',
'format' = 'changelog-csv',
'changelog-csv.column-delimiter' = '|'
);
由于该格式支持变更日志语义,我们能够在运行时摄取更新并创建一个可以持续评估变化数据的更新视图:
SELECT name, SUM(score) FROM UserScores GROUP BY name;
使用以下命令在终端中抓取数据:
> nc -lk 9999
INSERT|Alice|12
INSERT|Bob|5
DELETE|Alice|12
INSERT|Alice|18
4.1 工厂实现
这里说明如何将来自目录的元数据(Metadata
)转换为具体的连接器实例。
注意:两个工厂都已添加到META-INF/services目录中
4.1.1 SocketDynamicTableFactory这里将SocketDynamicTableFactory
目录表转换为表源(因为表源需要解码格式,所以FactoryUtil
为了方便起见,我们正在使用提供的格式来发现格式)。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
// define all options statically
public static final ConfigOption HOSTNAME = ConfigOptions.key("hostname")
.stringType()
.noDefaultValue();
public static final ConfigOption PORT = ConfigOptions.key("port")
.intType()
.noDefaultValue();
public static final ConfigOption BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
.intType()
.defaultValue(10); // corresponds to '\n'
@Override
public String factoryIdentifier() {
return "socket"; // used for matching to `connector = '...'`
}
@Override
public Set requiredOptions() {
final Set options = new HashSet();
options.add(HOSTNAME);
options.add(PORT);
options.add(FactoryUtil.FORMAT); // use pre-defined option for format
return options;
}
@Override
public Set optionalOptions() {
final Set options = new HashSet();
options.add(BYTE_DELIMITER);
return options;
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// discover a suitable decoding format
final DecodingFormat decodingFormat = helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
// validate all options
helper.validate();
// get the validated options
final ReadableConfig options = helper.getOptions();
final String hostname = options.get(HOSTNAME);
final int port = options.get(PORT);
final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
// derive the produced data type (excluding computed columns) from the catalog table
final DataType producedDataType =
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
// create and return dynamic table source
return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}
}
4.1.2 ChangelogCsvFormatFactory
ChangelogCsvFormatFactory
的功能主要是转换为特定的格式。
FactoryUtilinSocketDynamicTableFactory
负责相应地调整选项键(option keys
)并处理前缀,如changelog-csv.column-delimiter
.
因为这个工厂实现DeserializationFormatFactory
了 ,它也可以用于支持反序列化格式的其他连接器,例如 Kafka 连接器。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {
// define all options statically
public static final ConfigOption COLUMN_DELIMITER = ConfigOptions.key("column-delimiter")
.stringType()
.defaultValue("|");
@Override
public String factoryIdentifier() {
return "changelog-csv";
}
@Override
public Set requiredOptions() {
return Collections.emptySet();
}
@Override
public Set optionalOptions() {
final Set options = new HashSet();
options.add(COLUMN_DELIMITER);
return options;
}
@Override
public DecodingFormat createDecodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions) {
// either implement your custom validation logic here ...
// or use the provided helper method
FactoryUtil.validateFactoryOptions(this, formatOptions);
// get the validated options
final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);
// create and return the format
return new ChangelogCsvFormat(columnDelimiter);
}
}
4.2 表源和解码格式
这里讲解如何从计划层的实例转换为交付到集群的运行时实例。
4.2.1 SocketDynamicTableSourceSocketDynamicTableSource在计划期间使用。在我们的示例中,我们没有实现任何可用的能力接口。
因此,主要逻辑(getScanRuntimeProvider(...)
) 可以在我们实例化所需SourceFunction
及其DeserializationSchema
运行时的地方找到。两个实例都被参数化以返回内部数据结构(即RowData)。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
public class SocketDynamicTableSource implements ScanTableSource {
private final String hostname;
private final int port;
private final byte byteDelimiter;
private final DecodingFormat decodingFormat;
private final DataType producedDataType;
public SocketDynamicTableSource(
String hostname,
int port,
byte byteDelimiter,
DecodingFormat decodingFormat,
DataType producedDataType) {
this.hostname = hostname;
this.port = port;
this.byteDelimiter = byteDelimiter;
this.decodingFormat = decodingFormat;
this.producedDataType = producedDataType;
}
@Override
public ChangelogMode getChangelogMode() {
// in our example the format decides about the changelog mode
// but it could also be the source itself
return decodingFormat.getChangelogMode();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
// create runtime classes that are shipped to the cluster
final DeserializationSchema deserializer = decodingFormat.createRuntimeDecoder(
runtimeProviderContext,
producedDataType);
final SourceFunction sourceFunction = new SocketSourceFunction(
hostname,
port,
byteDelimiter,
deserializer);
return SourceFunctionProvider.of(sourceFunction, false);
}
@Override
public DynamicTableSource copy() {
return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}
@Override
public String asSummaryString() {
return "Socket Table Source";
}
}
4.2.2 ChangelogCsvFormat
ChangelogCsvFormat
是DeserializationSchema
在运行时使用的解码格式。它支持INSERT
和DELETE
更改。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
public class ChangelogCsvFormat implements DecodingFormat {
private final String columnDelimiter;
public ChangelogCsvFormat(String columnDelimiter) {
this.columnDelimiter = columnDelimiter;
}
@Override
@SuppressWarnings("unchecked")
public DeserializationSchema createRuntimeDecoder(
DynamicTableSource.Context context,
DataType producedDataType) {
// create type information for the DeserializationSchema
final TypeInformation producedTypeInfo = (TypeInformation) context.createTypeInformation(
producedDataType);
// most of the code in DeserializationSchema will not work on internal data structures
// create a converter for conversion at the end
final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);
// use logical types during runtime for parsing
final List parsingTypes = producedDataType.getLogicalType().getChildren();
// create runtime class
return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
}
@Override
public ChangelogMode getChangelogMode() {
// define that this format can produce INSERT and DELETE rows
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.build();
}
}
4.3 Runtime运行时
接下来将说明SourceFunction
和DeserializationSchema
的运行时逻辑。
ChangelogCsvDeserializer
包含一个简单的解析逻辑,用于将字节转换Integer
类型Row
行为String
,最后的转换步骤将它们转换为内部数据结构。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.RuntimeConverter.Context;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
public class ChangelogCsvDeserializer implements DeserializationSchema {
private final List parsingTypes;
private final DataStructureConverter converter;
private final TypeInformation producedTypeInfo;
private final String columnDelimiter;
public ChangelogCsvDeserializer(
List parsingTypes,
DataStructureConverter converter,
TypeInformation producedTypeInfo,
String columnDelimiter) {
this.parsingTypes = parsingTypes;
this.converter = converter;
this.producedTypeInfo = producedTypeInfo;
this.columnDelimiter = columnDelimiter;
}
@Override
public TypeInformation getProducedType() {
// return the type information required by Flink's core interfaces
return producedTypeInfo;
}
@Override
public void open(InitializationContext context) {
// converters must be open
converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
}
@Override
public RowData deserialize(byte[] message) {
// parse the columns including a changelog flag
final String[] columns = new String(message).split(Pattern.quote(columnDelimiter));
final RowKind kind = RowKind.valueOf(columns[0]);
final Row row = new Row(kind, parsingTypes.size());
for (int i = 0; i getRuntimeContext().getMetricGroup());
}
@Override
public void run(SourceContext ctx) throws Exception {
while (isRunning) {
// open and consume from socket
try (final Socket socket = new Socket()) {
currentSocket = socket;
socket.connect(new InetSocketAddress(hostname, port), 0);
try (InputStream stream = socket.getInputStream()) {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int b;
while ((b = stream.read()) >= 0) {
// buffer until delimiter
if (b != byteDelimiter) {
buffer.write(b);
}
// decode and emit record
else {
ctx.collect(deserializer.deserialize(buffer.toByteArray()));
buffer.reset();
}
}
}
} catch (Throwable t) {
t.printStackTrace(); // print and continue
}
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
try {
currentSocket.close();
} catch (Throwable t) {
// ignore
}
}
}