以官网的例子为起点,选用Kafka为source和sink ,了解下批流统一的使用cuiyaonan2000@163.com
批流统一注册连接外部资源或者说是注册一个虚拟表或者实体表有2种方式,一种就是 如下例子用的使用SQL等方式,领完一种就是使用TableApi的方式,但是官网关于TableApi的方式的说明甚少cuiyaonan2000@163.com
如下所示是使用TableDescriptiors的方式来创建一个临时表或者实体表,但是官网还是主推的使用SQL的形式来创建表,数据来源使用关键字with来获取
另外真的想说还是直接用DataStreamApi吧.又发现个bug,红框的内容不生效,造成新建的topic没法使用group-offsets,除非使用kafka命令创建初始化的offsetcuiyaonan2000@163.com
参考资料:
- Kafka | Apache Flink ----表连接器
- JSON | Apache Flink ----表格式器
如下是官网创建Kafka的SQL,后面都是针对该SQL开始的.据说会了Kafka,我们就可以掌握Hive,Haddop所以一个药引子的作用非常大cuiyaonan2000@163.com
package cui.yao.nan.flink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.nio.file.FileSystem;
/**
* @Author: cuiyaonan2000@163.com
* @Description: todo
* @Date: Created at 2022-3-24 16:13
*/
public class Test2 {
public static void main(String[] args) throws Exception {
//创建流式环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//创建表环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
TableResult tableResult = tableEnv.executeSql("CREATE TABLE jjjk (" +
" myoffset BIGINT METADATA FROM 'offset' VIRTUAL," +
" mypartition BIGINT METADATA FROM 'partition' VIRTUAL," +
" id BIGINT," +
" name STRING," +
" age BIGINT " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'topic-name-cui'," +
" 'properties.bootstrap.servers' = '172.17.15.2:9092'," +
" 'properties.group.id' = 'testGroup'," +
" 'scan.startup.mode' = 'earliest-offset'," +
" 'format' = 'json'," +
" 'json.fail-on-missing-field' = 'false'," +
" 'json.ignore-parse-errors' = 'true'" +
")");
Table table = tableEnv.sqlQuery("select id,name,age,mypartition,myoffset From jjjk");
// 将该视图结果在转成一个流
DataStream resultStream = tableEnv.toDataStream(table);
// add a printing sink and execute in DataStream API
resultStream.print();
env.execute();
}
}
SQLDDL connect kafka 配置信息
基于最新版本flink1.17.1
获取kafka服务的相关信息以下的连接器元数据可以在表定义中通过元数据列的形式获取。
R/W
列定义了一个元数据是可读的(R
)还是可写的(W
)。 只读列必须声明为 VIRTUAL
以在 INSERT INTO
操作中排除它们。
topic
STRING NOT NULL
Kafka 记录的 Topic 名。R
partition
INT NOT NULL
Kafka 记录的 partition ID。R
headers
MAP NOT NULL
二进制 Map 类型的 Kafka 记录头(Header)。R/W
leader-epoch
INT NULL
Kafka 记录的 Leader epoch(如果可用)。R
offset
BIGINT NOT NULL
Kafka 记录在 partition 中的 offset。R
timestamp
TIMESTAMP_LTZ(3) NOT NULL
Kafka 记录的时间戳。R/W
timestamp-type
STRING NOT NULL
Kafka 记录的时间戳类型。可能的类型有 "NoTimestampType", "CreateTime"(会在写入元数据时设置),或 "LogAppendTime"。R
实例
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --kafka的信息
`partition` BIGINT METADATA VIRTUAL, --kafka的信息
`offset` BIGINT METADATA VIRTUAL, --kafka的信息
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
获取kafka消息体的信息
连接器可以读出消息格式的元数据。格式元数据的配置键以 'value.'
作为前缀。
以下示例展示了如何获取 Kafka 和 Debezium 的元数据字段:
区别与上面的写法, 这样子可以使用别名,而不用服务自己的名字cuiyaonan2000@163.com
CREATE TABLE KafkaTable (
`event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- from Debezium format
`origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
`partition_id` BIGINT METADATA FROM 'partition' VIRTUAL, -- from Kafka connector
`offset` BIGINT METADATA VIRTUAL, -- from Kafka connector
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
);
连接器参数 #
参数是否必选默认值数据类型描述 connector
必选(无)String指定使用的连接器,Kafka 连接器使用'kafka'
。 topic
required for sink(无)String当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如'topic-1;topic-2'
。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。 topic-pattern
可选(无)String匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。properties.bootstrap.servers
必选(无)String逗号分隔的 Kafka broker 列表。properties.group.id
对 source 可选,不适用于 sink(无)StringKafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 "KafkaSource-{tableIdentifier}" 作为消费组 ID。properties.*
可选(无)String可以设置和传递任意 Kafka 的配置项。后缀名必须匹配在 Kafka 配置文档 中定义的配置键。Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过'properties.allow.auto.create.topics' = 'false'
来禁用 topic 的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 'key.deserializer'
和 'value.deserializer'
。 format
必选(无)String用来序列化或反序列化 Kafka 消息的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和'value.format'
二者必需其一。 key.format
可选(无)String用来序列化和反序列化 Kafka 消息键(Key)的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项'key.fields'
也是必需的。 否则 Kafka 记录将使用空值作为键。 key.fields
可选[]List表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为'field1;field2'
。 key.fields-prefix
可选(无)String为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项'key.fields'
都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 'value.fields-include'
配置为 'EXCEPT_KEY'
。 value.format
必选(无)String序列化和反序列化 Kafka 消息体时使用的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和'format'
二者必需其一。 value.fields-include
可选ALL枚举类型
可选值:[ALL, EXCEPT_KEY]定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中'ALL'
即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。 scan.startup.mode
可选group-offsetsEnumKafka consumer 的启动模式。有效值为:'earliest-offset'
,'latest-offset'
,'group-offsets'
,'timestamp'
和 'specific-offsets'
。 请参阅下方 起始消费位点 以获取更多细节。 scan.startup.specific-offsets
可选(无)String在使用'specific-offsets'
启动模式时为每个 partition 指定 offset,例如 'partition:0,offset:42;partition:1,offset:300'
。 scan.startup.timestamp-millis
可选(无)Long在使用'timestamp'
启动模式时指定启动的时间戳(单位毫秒)。 scan.bounded.mode
optionalunboundedEnumBounded mode for Kafka consumer, valid values are'latest-offset'
, 'group-offsets'
, 'timestamp'
and 'specific-offsets'
. See the following Bounded Ending Position for more details. scan.bounded.specific-offsets
optionalyes(none)StringSpecify offsets for each partition in case of'specific-offsets'
bounded mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. If an offset for a partition is not provided it will not consume from that partition.
. scan.bounded.timestamp-millis
optionalyes(none)LongEnd at the specified epoch timestamp (milliseconds) used in case of'timestamp'
bounded mode. scan.topic-partition-discovery.interval
可选(无)DurationConsumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。sink.partitioner
可选'default'StringFlink partition 到 Kafka partition 的分区映射关系,可选值有:default
:使用 Kafka 默认的分区器对消息进行分区。fixed
:每个 Flink partition 最终对应最多一个 Kafka partition。round-robin
:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。- 自定义
FlinkKafkaPartitioner
的子类:例如'org.mycompany.MyPartitioner'
。
sink.semantic
可选at-least-onceString定义 Kafka sink 的语义。有效值为'at-least-once'
,'exactly-once'
和 'none'
。请参阅 一致性保证 以获取更多细节。 sink.parallelism
可选(无)Integer定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 JSON Format 参数 # 参数是否必须默认值类型描述format
必选(none)String声明使用的格式,这里应为'json'
。 json.fail-on-missing-field
可选falseBoolean当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。json.ignore-parse-errors
可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
。 json.timestamp-format.standard
可选'SQL'
String声明输入和输出的 TIMESTAMP
和 TIMESTAMP_LTZ
的格式。当前支持的格式为'SQL'
以及 'ISO-8601'
:
- 可选参数
'SQL'
将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析 TIMESTAMP, 例如 "2020-12-30 12:13:14.123", 以 "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30 12:13:14.123Z" 且会以相同的格式输出。 - 可选参数
'ISO-8601'
将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入 TIMESTAMP, 例如 "2020-12-30T12:13:14.123" , 以 "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30T12:13:14.123Z" 且会以相同的格式输出。
json.map-null-key.mode
选填'FAIL'
String指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL'
, 'DROP'
和 'LITERAL'
:
- Option
'FAIL'
将抛出异常,如果遇到 Map 中 key 值为空的数据。 - Option
'DROP'
将丢弃 Map 中 key 值为空的数据项。 - Option
'LITERAL'
将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由'json.map-null-key.literal'
定义。
json.map-null-key.literal
选填'null'String当'json.map-null-key.mode'
是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。 json.encode.decimal-as-plain-number
选填falseBoolean将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027
默认会表示为 2.7E-8
。当此选项设为 true 时,则会表示为 0.000000027
。