您当前的位置: 首页 >  flink

cuiyaonan2000

暂无认证

  • 2浏览

    0关注

    248博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flink : Custom Source

cuiyaonan2000 发布时间:2022-03-10 17:01:06 ,浏览量:2

序言

官网提供了几种连接器,帮助我们直接配置Kafka,ElasticSearch数据源.

但是在V1.14.3.之前使用的Kafka连接器已经被弃用了~~~短短几个版本的事情,变化可以真不小.cuiyaonan2000@163.com

基于V1.14.3 我们梳理下自定义数据源,同时可以了解Flink是如何读取数据的,如何拆分数据任务并行处理,以提高处理的效率

参考文档:

  1. 数据源 | Apache Flink

Source 核心

一个数据 source 包括三个核心组件:分片(Splits)、分片枚举器(SplitEnumerator) 以及 源阅读器(SourceReader)。

  • 分片(Split) 是对一部分 source 数据的包装,如一个文件或者日志分区。分片是 source 进行任务分配和数据并行读取的基本粒度。-----source被拆分的最小单位,即程序读取的最小数据单元

  • 源阅读器(SourceReader) 会请求分片并进行处理,例如读取分片所表示的文件或日志分区。SourceReader 在 TaskManagers 上的 SourceOperators 并行运行,并产生并行的事件流/记录流。-------------TaskManagers上读取分片,并处理,即读取数据并处理

  • 分片枚举器(SplitEnumerator) 会生成分片并将它们分配给 SourceReader。该组件在 JobManager 上以单并行度运行(这里说明source的分片拆分是单线程处理的cuiyaonan2000@163.com),负责对未分配的分片进行维护,并以均衡的方式将其分配给 reader。----------------用于给sourcereader 分任

Source 类作为API入口,将上述三个组件结合在了一起。

流模式/批模式下Source的差异

事实上,这两种情况之间的区别是非常小的:

  • 有界/批处理模式:   枚举器生成固定数量的分片,而且每个分片都必须是有限的(每个分片大小都是固定的)。
  • 流模式:   但在无界流的情况下,则无需遵从限制,也就是分片大小可以不是有限的(分片大小不固定),或者枚举器将不断生成新的分片(分片数量不固定)。

有界 File Source 

Source 将包含待读取目录的 URI/路径(Path),以及一个定义了如何对文件进行解析的 格式(Format)。在该情况下:

  • 分片是一个文件,或者是文件的一个区域(如果该文件格式支持对文件进行拆分)。(这里有个非常重要的关注点,如果我们的source是mysql或者mongodb 而且数据量特别多,Flink会有什么策略来解决大批量的数据运算cuiyaonan2000@163.com)
  • SplitEnumerator 将会列举给定目录路径下的所有文件,并在收到来自 reader 的请求时对分片进行分配。一旦所有的分片都被分配完毕,则会使用 NoMoreSplits 来响应请求。
  • SourceReader 则会请求分片,读取所分配的分片(文件或者文件区域),并使用给定的格式进行解析。如果当前请求没有获得下一个分片,而是 NoMoreSplits,则会终止任务。

无界 Streaming File Source

这个 source 的工作方式与上面描述的基本相同,除了 SplitEnumerator 从不会使用 NoMoreSplits 来响应 SourceReader 的请求,并且还会定期列出给定 URI/路径下的文件来检查是否有新文件。一旦发现新文件,则生成对应的新分片,并将它们分配给空闲的 SourceReader。

无界 Kafka Source

Source 将具有 Kafka Topic(亦或者一系列 Topics 或者通过正则表达式匹配的 Topic)以及一个 解析器(Deserializer) 来解析记录(record)。

  • 分片是一个 Kafka Topic Partition。
  • SplitEnumerator 会连接到 broker 从而列举出已订阅的 Topics 中的所有 Topic Partitions。枚举器可以重复此操作以检查是否有新的 Topics/Partitions。
  • SourceReader 使用 KafkaConsumer 读取所分配的分片(Topic Partition),并使用提供的 解析器 反序列化记录。由于流处理中分片(Topic Partition)大小是无限的,因此 reader 永远无法读取到数据的尾部。

有界 Kafka Source

这种情况下,除了每个分片(Topic Partition)都会有一个预定义的结束偏移量,其他与上述相同。一旦 SourceReader 读取到分片的结束偏移量,整个分片的读取就会结束。而一旦所有所分配的分片读取结束,SourceReader 也就终止任务了。

Source Function

算子区分RichFunction和Function,Source不但区分了RicchFunction和Function,还有多线程的Function,以及异步的Function.

既然SourceFunction可以重写,那邪恶想法油然而生,流式计算果然是大数据的大一统的标准cuiyaonan2000@163.com

  1. RichSourceFunction/SourceFunction : 并行度只能为1,读取数据源
  2. RichParallelSourceFunction/ParallelSourceFunction : 多并行度读取数据源
Source 不同来源的容错保证

Source

语义保证

kafka

exactly once

Collections

exactly once

Files

exactly once

Socktes

at most once

并行度用例
package cui.yao.nan.flink.source.test;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/**
 * @Author: cuiyaonan2000@163.com
 * @Description: todo
 * @Date: Created at 2022-3-11  14:16
 */
public class TestSourceAction {

    public static void main(String[] args) throws InterruptedException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


/*
        通过Java的collection集合创建一个数据流,集合中的所有元素必须是相同类型的
        DataStreamSource nums = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9));

        //这一这的source的并行度只能为1
        nums.setParallelism(1).map(a->{
            System.out.println(Thread.currentThread().getName());
            return a+1;
        }).setParallelism(2).print().setParallelism(2);

*/


/*
        单并行度的数据源
        DataStreamSource streamSource = env.addSource(new TestSourceAction()
                .new MyRichSourceFunction());

        streamSource.map(a->{
            System.out.println(Thread.currentThread().getName());
            return a;
        }).returns(Types.TUPLE(Types.STRING)).setParallelism(2).print();
*/


        MyRichParallelSourceFunction mrpf = new TestSourceAction().new MyRichParallelSourceFunction();

        DataStreamSource streamSource = env.addSource(mrpf).setParallelism(3);

        streamSource.map(a->{
            System.out.println("map处理的线程"+Thread.currentThread().getName());
            return a;
        }).returns(Types.TUPLE(Types.STRING)).setParallelism(2).print();


        try {
            env.execute("start kafkaconsume");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }


        Thread.sleep(5000);

    }

    public class MyRichSourceFunction extends RichSourceFunction{

        Boolean closeSource = false;

        @Override
        public void run(SourceContext sourceContext) throws Exception {

            int i = 0 ;
            while(!closeSource){
                Thread.sleep(1000);
                sourceContext.collect(Tuple1.of(i++ + ""));
            }
        }

        @Override
        public void cancel() {
            closeSource = true;
        }
    }

    public class MyRichParallelSourceFunction extends RichParallelSourceFunction{

        Boolean closeSource = false;

        @Override
        public void run(SourceContext sourceContext) throws Exception {
            int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
            int i = 0 ;
            while(!closeSource){
                Thread.sleep(1000);
                System.out.println("多并行度数据源的线程名:"+Thread.currentThread().getName()
                +"    subtaskIndex的值为:" +subtaskIndex);
                sourceContext.collect(Tuple1.of(i++ + ""));
            }
        }

        @Override
        public void cancel() {
            closeSource = true;
        }
    }


}

关注
打赏
1638267374
查看更多评论
立即登录/注册

微信扫码登录

0.0381s