您当前的位置: 首页 >  flink

cuiyaonan2000

暂无认证

  • 0浏览

    0关注

    248博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Flink:算子

cuiyaonan2000 发布时间:2021-10-25 18:11:52 ,浏览量:0

序言

整理一下flink的任务吧.

我们在搞定了source和sink后.那算子就很重要了.为啥子呢?因为算子就是我们处理source的过程,最后需要sink到指定的存储空间里.cuiyaonan200@163.com

本文基于官网v1.13.2的版本整理

官方网址:概览 | Apache Flink

算子

用户通过算子能将一个或多个 DataStream 转换成新的 DataStream,在应用程序中可以将多个数据转换算子合并成一个复杂的数据流拓扑。

这部分内容将描述 Flink DataStream API 中基本的数据转换API,数据转换后各种数据分区方式,以及算子的链接策略。

其实看了API感觉官网可以在进一步组合api,我觉得算子可以分为3大类(后面在慢慢整理吧cuiyaonan2000@163.com):

  1. 针对data的算子
  2. 针对window的算子
  3. 针对stream的算子

Map 

DataStream → DataStream #

Takes one element and produces one element. A map function that doubles the values of the input stream: 

遍历流中的对象并返回一个新对象

DataStream dataStream = //...
dataStream.map(new MapFunction() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
});

FlatMap 

DataStream → DataStream #

Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

根据用例可以看出flatmap就是将原始流中的对象进行切分或者平摊,返回多个对象cuiyaonan2000@163.com

dataStream.flatMap(new FlatMapFunction() {
    @Override
    public void flatMap(String value, Collector out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

Filter 

DataStream → DataStream #

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

dataStream.filter(new FilterFunction() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
});

KeyBy(很重要可以扩展算子的并行度cuiyaonan2000@163.com)

可以参考The Meaning Of Window And WaterMark On Flink_难得糊涂-CSDN博客  中的窗口创建

DataStream → KeyedStream (此转换返回KeyedStream,其中包括使用被Keys化状态所需的KeyedStream。)

逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()是使用散列分区实现的。

dataStream.keyBy(value -> value.getSomeKey());
dataStream.keyBy(value -> value.f0);

Reduce 

KeyedStream → DataStream 

接收根据key分组后的数据,然后进行累计计算并返回.比如value1 与value2 计算的结果value1+value2会与value3进行累计计算cuiyaonan2000@163.com

keyedStream.reduce(new ReduceFunction() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
});

Window 

KeyedStream → WindowedStream 

可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组计算。这里有个关键的点就是,首先根据keys进行了分流,然后在分流的基础上根据时间进行分组,用于计算cuiyaonan2000@163.com

dataStream
  .keyBy(value -> value.f0)
  .window(TumblingEventTimeWindows.of(Time.seconds(5))); 

WindowAll 

DataStream → AllWindowedStream 

Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数据)对所有流事件进行分组。

注意:在许多情况下,这是非并行转换。所有记录将收集在windowAll 算子的一个任务中--即算子的并行度为1

dataStream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
Window Apply 

WindowedStream → DataStream 

AllWindowedStream → DataStream 

Apply函数的作用是针对 一个窗口内的数据,进行计算,并返回一个结果.比如我们一个窗口是10s,那apply的重载函数就是处理该10s秒内的数据并返回一个结果值,或者返回多个结果值.cuiyaonan2000@163.com

下面是一个手动求和窗口数据元的函数


//注意apply 接收什么,返回什么都都是通过注解来声明.
//比如下面的Tuple2 指明了窗口期内的对象内容
//Collector指明了返回的类型
//cuiyaonan2000@163.com
windowedStream.apply(new WindowFunction() {
    public void apply (Tuple tuple,
            Window window,
            Iterable values,
            Collector out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply (new AllWindowFunction() {
    public void apply (Window window,
            Iterable values,
            Collector out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

WindowReduce 

WindowedStream → DataStream 

对窗口应用 reduce function 并返回 reduce 后的值。---即窗口内的值

windowedStream.reduce (new ReduceFunction() {
    public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
        return new Tuple2(value1.f0, value1.f1 + value2.f1);
    }
});

Union 

DataStream* → DataStream 

两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元

dataStream.union(otherStream1, otherStream2, ...);

Window Join 

DataStream,DataStream → DataStream 

根据指定的 key 和窗口 join 两个数据流。

dataStream.join(otherStream)
    .where().equalTo()
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});

Interval Join 

KeyedStream,KeyedStream → DataStream 

在给定的时间间隔内使用公共Keys关联两个被Key化的数据流的两个数据元e1和e2,以便e1.timestamp + lowerBound

关注
打赏
1638267374
查看更多评论
立即登录/注册

微信扫码登录

0.0369s