您当前的位置: 首页 >  flink

cuiyaonan2000

暂无认证

  • 3浏览

    0关注

    248博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flink的批流统一:Ⅴ

cuiyaonan2000 发布时间:2022-03-28 16:38:20 ,浏览量:3

序言

以官网的例子为起点,选用Kafka为source和sink ,了解下批流统一的使用cuiyaonan2000@163.com

批流统一注册连接外部资源或者说是注册一个虚拟表或者实体表有2种方式,一种就是 如下例子用的使用SQL等方式,领完一种就是使用TableApi的方式,但是官网关于TableApi的方式的说明甚少cuiyaonan2000@163.com

如下所示是使用TableDescriptiors的方式来创建一个临时表或者实体表,但是官网还是主推的使用SQL的形式来创建表,数据来源使用关键字with来获取

另外真的想说还是直接用DataStreamApi吧.又发现个bug,红框的内容不生效,造成新建的topic没法使用group-offsets,除非使用kafka命令创建初始化的offsetcuiyaonan2000@163.com

 

参考资料:

  1. Kafka | Apache Flink       ----表连接器
  2. JSON | Apache Flink       ----表格式器

DEMO

如下是官网创建Kafka的SQL,后面都是针对该SQL开始的.据说会了Kafka,我们就可以掌握Hive,Haddop所以一个药引子的作用非常大cuiyaonan2000@163.com

package cui.yao.nan.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.nio.file.FileSystem;

/**
 * @Author: cuiyaonan2000@163.com
 * @Description: todo
 * @Date: Created at 2022-3-24  16:13
 */
public class Test2 {

    public static void main(String[] args) throws Exception {


        //创建流式环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        //创建表环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        TableResult tableResult = tableEnv.executeSql("CREATE TABLE jjjk (" +
                "  myoffset  BIGINT METADATA FROM 'offset' VIRTUAL," +
                "  mypartition  BIGINT METADATA FROM 'partition' VIRTUAL," +
                "  id BIGINT," +
                "  name STRING," +
                "  age BIGINT " +
                ") WITH (" +
                " 'connector' = 'kafka'," +
                " 'topic' = 'topic-name-cui'," +
                " 'properties.bootstrap.servers' = '172.17.15.2:9092'," +
                " 'properties.group.id' = 'testGroup'," +
                " 'scan.startup.mode' = 'earliest-offset'," +
                " 'format' = 'json'," +
                " 'json.fail-on-missing-field' = 'false'," +
                " 'json.ignore-parse-errors' = 'true'" +
                ")");

        Table table = tableEnv.sqlQuery("select id,name,age,mypartition,myoffset From jjjk");

        // 将该视图结果在转成一个流
        DataStream resultStream = tableEnv.toDataStream(table);

        // add a printing sink and execute in DataStream API
        resultStream.print();

        env.execute();
    }
}

SQLDDL connect kafka 配置信息

基于最新版本flink1.17.1

获取kafka服务的相关信息

以下的连接器元数据可以在表定义中通过元数据列的形式获取。

R/W 列定义了一个元数据是可读的(R)还是可写的(W)。 只读列必须声明为 VIRTUAL 以在 INSERT INTO 操作中排除它们。

键数据类型描述R/WtopicSTRING NOT NULLKafka 记录的 Topic 名。RpartitionINT NOT NULLKafka 记录的 partition ID。RheadersMAP NOT NULL二进制 Map 类型的 Kafka 记录头(Header)。R/Wleader-epochINT NULLKafka 记录的 Leader epoch(如果可用)。RoffsetBIGINT NOT NULLKafka 记录在 partition 中的 offset。RtimestampTIMESTAMP_LTZ(3) NOT NULLKafka 记录的时间戳。R/Wtimestamp-typeSTRING NOT NULLKafka 记录的时间戳类型。可能的类型有 "NoTimestampType", "CreateTime"(会在写入元数据时设置),或 "LogAppendTime"。R

实例

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',   --kafka的信息
  `partition` BIGINT METADATA VIRTUAL,        --kafka的信息
  `offset` BIGINT METADATA VIRTUAL,        --kafka的信息
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);
获取kafka消息体的信息

连接器可以读出消息格式的元数据。格式元数据的配置键以 'value.' 作为前缀。

以下示例展示了如何获取 Kafka 和 Debezium 的元数据字段:

区别与上面的写法, 这样子可以使用别名,而不用服务自己的名字cuiyaonan2000@163.com

CREATE TABLE KafkaTable (
  `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,  -- from Debezium format
  `origin_table` STRING METADATA FROM 'value.source.table' VIRTUAL, -- from Debezium format
  `partition_id` BIGINT METADATA FROM 'partition' VIRTUAL,  -- from Kafka connector
  `offset` BIGINT METADATA VIRTUAL,  -- from Kafka connector
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);

连接器参数 #

参数是否必选默认值数据类型描述

connector

必选(无)String指定使用的连接器,Kafka 连接器使用 'kafka'

topic

required for sink(无)String当表用作 source 时读取数据的 topic 名。亦支持用分号间隔的 topic 列表,如 'topic-1;topic-2'。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。当表被用作 sink 时,该配置表示写入的 topic 名。注意 sink 表不支持 topic 列表。

topic-pattern

可选(无)String匹配读取 topic 名称的正则表达式。在作业开始运行时,所有匹配该正则表达式的 topic 都将被 Kafka consumer 订阅。注意,对 source 表而言,'topic' 和 'topic-pattern' 两个选项只能使用其中一个。

properties.bootstrap.servers

必选(无)String逗号分隔的 Kafka broker 列表。

properties.group.id

对 source 可选,不适用于 sink(无)StringKafka source 的消费组 id。如果未指定消费组 ID,则会使用自动生成的 "KafkaSource-{tableIdentifier}" 作为消费组 ID。

properties.*

可选(无)String可以设置和传递任意 Kafka 的配置项。后缀名必须匹配在 Kafka 配置文档 中定义的配置键。Flink 将移除 "properties." 配置键前缀并将变换后的配置键和值传入底层的 Kafka 客户端。例如,你可以通过 'properties.allow.auto.create.topics' = 'false' 来禁用 topic 的自动创建。但是某些配置项不支持进行配置,因为 Flink 会覆盖这些配置,例如 'key.deserializer' 和 'value.deserializer'

format

必选(无)String用来序列化或反序列化 Kafka 消息的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 'value.format' 二者必需其一。

key.format

可选(无)String用来序列化和反序列化 Kafka 消息键(Key)的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:如果定义了键格式,则配置项 'key.fields' 也是必需的。 否则 Kafka 记录将使用空值作为键。

key.fields

可选[]List表结构中用来配置消息键(Key)格式数据类型的字段列表。默认情况下该列表为空,因此消息键没有定义。 列表格式为 'field1;field2'

key.fields-prefix

可选(无)String为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 'key.fields' 都需要使用带前缀的名称。 当构建消息键格式字段时,前缀会被移除,消息键格式将会使用无前缀的名称。 请注意该配置项要求必须将 'value.fields-include' 配置为 'EXCEPT_KEY'

value.format

必选(无)String序列化和反序列化 Kafka 消息体时使用的格式。 请参阅 格式 页面以获取更多关于格式的细节和相关配置项。 注意:该配置项和 'format' 二者必需其一。

value.fields-include

可选ALL

枚举类型

可选值:[ALL, EXCEPT_KEY]定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 'ALL' 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。

scan.startup.mode

可选group-offsetsEnumKafka consumer 的启动模式。有效值为:'earliest-offset''latest-offset''group-offsets''timestamp' 和 'specific-offsets'。 请参阅下方 起始消费位点 以获取更多细节。

scan.startup.specific-offsets

可选(无)String在使用 'specific-offsets' 启动模式时为每个 partition 指定 offset,例如 'partition:0,offset:42;partition:1,offset:300'

scan.startup.timestamp-millis

可选(无)Long在使用 'timestamp' 启动模式时指定启动的时间戳(单位毫秒)。

scan.bounded.mode

optionalunboundedEnumBounded mode for Kafka consumer, valid values are 'latest-offset''group-offsets''timestamp' and 'specific-offsets'. See the following Bounded Ending Position for more details.

scan.bounded.specific-offsets

optionalyes(none)StringSpecify offsets for each partition in case of 'specific-offsets' bounded mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. If an offset for a partition is not provided it will not consume from that partition..

scan.bounded.timestamp-millis

optionalyes(none)LongEnd at the specified epoch timestamp (milliseconds) used in case of 'timestamp' bounded mode.

scan.topic-partition-discovery.interval

可选(无)DurationConsumer 定期探测动态创建的 Kafka topic 和 partition 的时间间隔。

sink.partitioner

可选'default'StringFlink partition 到 Kafka partition 的分区映射关系,可选值有:
  • default:使用 Kafka 默认的分区器对消息进行分区。
  • fixed:每个 Flink partition 最终对应最多一个 Kafka partition。
  • round-robin:Flink partition 按轮循(round-robin)的模式对应到 Kafka partition。只有当未指定消息的消息键时生效。
  • 自定义 FlinkKafkaPartitioner 的子类:例如 'org.mycompany.MyPartitioner'
请参阅下方 Sink 分区 以获取更多细节。

sink.semantic

可选at-least-onceString定义 Kafka sink 的语义。有效值为 'at-least-once''exactly-once' 和 'none'。请参阅 一致性保证 以获取更多细节。

sink.parallelism

可选(无)Integer定义 Kafka sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。

JSON Format 参数 # 参数是否必须默认值类型描述

format

必选(none)String声明使用的格式,这里应为'json'

json.fail-on-missing-field

可选falseBoolean当解析字段缺失时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。

json.ignore-parse-errors

可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null

json.timestamp-format.standard

可选'SQL'String声明输入和输出的 TIMESTAMP 和 TIMESTAMP_LTZ 的格式。当前支持的格式为'SQL' 以及 'ISO-8601'
  • 可选参数 'SQL' 将会以 "yyyy-MM-dd HH:mm:ss.s{precision}" 的格式解析 TIMESTAMP, 例如 "2020-12-30 12:13:14.123", 以 "yyyy-MM-dd HH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30 12:13:14.123Z" 且会以相同的格式输出。
  • 可选参数 'ISO-8601' 将会以 "yyyy-MM-ddTHH:mm:ss.s{precision}" 的格式解析输入 TIMESTAMP, 例如 "2020-12-30T12:13:14.123" , 以 "yyyy-MM-ddTHH:mm:ss.s{precision}'Z'" 的格式解析 TIMESTAMP_LTZ, 例如 "2020-12-30T12:13:14.123Z" 且会以相同的格式输出。

json.map-null-key.mode

选填'FAIL'String指定处理 Map 中 key 值为空的方法. 当前支持的值有 'FAIL''DROP' 和 'LITERAL':
  • Option 'FAIL' 将抛出异常,如果遇到 Map 中 key 值为空的数据。
  • Option 'DROP' 将丢弃 Map 中 key 值为空的数据项。
  • Option 'LITERAL' 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 'json.map-null-key.literal' 定义。

json.map-null-key.literal

选填'null'String当 'json.map-null-key.mode' 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。

json.encode.decimal-as-plain-number

选填falseBoolean将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027

关注
打赏
1638267374
查看更多评论
立即登录/注册

微信扫码登录

0.0403s