Flink DataSet: using the aggregation, partitioning, sorting, and join transformation operators

Bulut0907, published 2021-10-23 23:18:10

Contents
  • 1. Aggregation transformation operators
    • 1.1 reduce
    • 1.2 aggregate
    • 1.3 distinct
  • 2. Partitioning transformation operators
    • 2.1 partitionByHash, partitionByRange, sortPartition
    • 2.2 mapPartition
  • 3. Sorting transformation operators
    • 3.1 MinBy / MaxBy, First-N
  • 4. Join transformation operators
    • 4.1 join, leftOuterJoin, rightOuterJoin, fullOuterJoin, cross, coGroup

1. Aggregation transformation operators

1.1 reduce
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object TranformationOperatorTest {
  
  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(1,2,3,4)
    val output = input.reduce((x1, x2) => x1+x2)
    output.print()

  }

}

Output:

10
1.2 aggregate
  • Can only be applied to Tuple data types
package devBase

import org.apache.flink.api.java.aggregation.Aggregations.{SUM,MIN,MAX}
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(
      ("Liming", 10, 100),
      ("Liming", 11, 110),
      ("Zhangsan", 50, 500),
      ("Zhangsan", 51, 510)
    )
    val output = input.groupBy(0)
      // Only one aggregation can be applied per field; if several are given, the last one wins (here MAX on field 2)
      .aggregate(SUM,1).and(MIN,2).and(MAX, 2)
    output.print()

  }

}

Output:

(Zhangsan,101,510)
(Liming,21,110)
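
Because only one aggregation per field survives, getting both the minimum and the maximum of the same field needs another route. The sketch below models one option on ordinary Scala collections, not on Flink DataSets: duplicate the field, then combine records pairwise. The names `Stat`, `widen`, `combine` and `aggregate` are hypothetical. In Flink, a function shaped like `combine` could presumably be passed to `groupBy(0).reduce(...)` after a `map` that widens the tuple; that wiring is an assumption and is not part of the original example.

```scala
object MinMaxSameFieldSketch {

  // (name, sum of field 1, min of field 2, max of field 2)
  type Stat = (String, Int, Int, Int)

  // Duplicate the third field so MIN and MAX each get their own slot
  def widen(r: (String, Int, Int)): Stat = (r._1, r._2, r._3, r._3)

  // Pairwise combine of two partial results for the same name
  def combine(a: Stat, b: Stat): Stat =
    (a._1, a._2 + b._2, math.min(a._3, b._3), math.max(a._4, b._4))

  // Plain-collections stand-in for "group by field 0, then reduce"
  def aggregate(input: Seq[(String, Int, Int)]): Map[String, Stat] =
    input.map(widen).groupBy(_._1).map { case (name, rows) =>
      (name, rows.reduce((a, b) => combine(a, b)))
    }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      ("Liming", 10, 100),
      ("Liming", 11, 110),
      ("Zhangsan", 50, 500),
      ("Zhangsan", 51, 510)
    )
    aggregate(input).values.foreach(println)
  }
}
```

For the sample data this gives (Liming,21,100,110) and (Zhangsan,101,500,510), so the minimum and the maximum of the third field are both kept.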
1.3 distinct
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(
      ("Liming", 10, 100),
      ("Liming", 10, 110),
      ("Liming", 10, 110),
      ("Zhangsan", 50, 510),
      ("Zhangsan", 50, 500),
      ("Zhangsan", 50, 500)
    )

    val output1 = input.distinct()
    output1.print()
    println("====================")
    // Deduplicate by key (fields 0 and 1); for records with the same key, the first one is kept
    val output2 = input.distinct(0,1)
    output2.print()



  }

}

Output:

(Zhangsan,50,500)
(Zhangsan,50,510)
(Liming,10,110)
(Liming,10,100)
====================
(Liming,10,100)
(Zhangsan,50,510)
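
To make the keyed variant concrete, here is a minimal sketch of "keep the first record per key" on an ordinary Scala `Seq`. It is a model of the behaviour seen above, not Flink code, and `distinctBy01` is a hypothetical helper name. The sketch defines "first" as input order, which a parallel Flink job need not preserve.

```scala
import scala.collection.mutable

object DistinctByKeySketch {

  type Rec = (String, Int, Int)

  // Keep the first record seen for each (field 0, field 1) key, in input order
  def distinctBy01(input: Seq[Rec]): Seq[Rec] = {
    val seen = mutable.LinkedHashMap.empty[(String, Int), Rec]
    input.foreach { r =>
      val key = (r._1, r._2)
      if (!seen.contains(key)) seen(key) = r
    }
    seen.values.toSeq
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      ("Liming", 10, 100),
      ("Liming", 10, 110),
      ("Liming", 10, 110),
      ("Zhangsan", 50, 510),
      ("Zhangsan", 50, 500),
      ("Zhangsan", 50, 500)
    )
    distinctBy01(input).foreach(println)
  }
}
```

On the sample data this keeps (Liming,10,100) and (Zhangsan,50,510), the same two records printed by `distinct(0,1)` above.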
2. Partitioning transformation operators

2.1 partitionByHash, partitionByRange, sortPartition
package devBase

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(
      ("Liming", 10, 100),
      ("Liming", 15, 150),
      ("Liming", 13, 130),
      ("Zhangsan", 50, 500),
      ("Zhangsan", 80, 800),
      ("Zhangsan", 60, 600)
    ).setParallelism(2)
    input.printOnTaskManager("input")

    // Hash-partition on the first field
    val output1=input.partitionByHash(0)
    output1.printOnTaskManager("output1")
    // Range-partition on the value of the third field
    val output2=input.partitionByRange("_3")
    output2.printOnTaskManager("output2")
    // sortPartition only sorts the data within each partition
    val output3 = input.sortPartition(1, Order.ASCENDING)
      .sortPartition(2,Order.DESCENDING)
    output3.printOnTaskManager("output3")

    env.execute()
  }

}

Output:

input:1> (Liming,10,100)
input:1> (Liming,13,130)
input:1> (Zhangsan,80,800)
input:2> (Liming,15,150)
input:2> (Zhangsan,50,500)
input:2> (Zhangsan,60,600)
output1:5> (Liming,10,100)
output1:5> (Liming,15,150)
output1:5> (Liming,13,130)
output1:2> (Zhangsan,50,500)
output1:2> (Zhangsan,80,800)
output1:2> (Zhangsan,60,600)
output3:1> (Liming,10,100)
output3:1> (Liming,13,130)
output3:1> (Zhangsan,80,800)
output3:2> (Liming,15,150)
output3:2> (Zhangsan,50,500)
output3:2> (Zhangsan,60,600)
output2:7> (Zhangsan,80,800)
output2:4> (Zhangsan,50,500)
output2:3> (Liming,15,150)
output2:1> (Liming,10,100)
output2:6> (Zhangsan,60,600)
output2:2> (Liming,13,130)
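  • This example was run in IDEA, so the printOnTaskManager output appears directly in the console; on a cluster it goes to the TaskManagers' standard output instead
  • The interleaved output (output3 is printed before output2) also shows that the sinks run concurrently rather than one after another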
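
The output3 lines are ordered within each partition but not across partitions. The sketch below reproduces that effect on plain Scala collections. It is a model, not Flink code: `sortWithinPartitions` is a hypothetical helper, and the two partitions are copied from the printed `input` lines. Chaining two `sortPartition` calls corresponds to a primary and a secondary sort key.

```scala
object SortPartitionSketch {

  type Rec = (String, Int, Int)

  // Sort every partition independently: field 1 ascending, then field 2 descending
  def sortWithinPartitions(partitions: Seq[Seq[Rec]]): Seq[Seq[Rec]] =
    partitions.map(_.sortBy(r => (r._2, -r._3)))

  // The two input partitions as they appear in the printed "input" lines
  val partitions: Seq[Seq[Rec]] = Seq(
    Seq(("Liming", 10, 100), ("Liming", 13, 130), ("Zhangsan", 80, 800)),
    Seq(("Liming", 15, 150), ("Zhangsan", 50, 500), ("Zhangsan", 60, 600))
  )

  def main(args: Array[String]): Unit = {
    val sorted = sortWithinPartitions(partitions)
    sorted.zipWithIndex.foreach { case (part, i) =>
      part.foreach(r => println(s"partition ${i + 1}> $r"))
    }
    // Reading the partitions one after another gives 10, 13, 80, 15, 50, 60
    println(sorted.flatten.map(_._2).mkString(", "))
  }
}
```

Read end to end, the second field comes out as 10, 13, 80, 15, 50, 60, which is not a global order. If a total order is needed in Flink, one common approach is to run the sort with parallelism 1, for example by calling `setParallelism(1)` on the result of `sortPartition`, at the cost of sorting on a single task.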
2.2 mapPartition
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(1, 2)
      .setParallelism(2)
    input.printOnTaskManager("input")

    val output = input.mapPartition((iterator:Iterator[Int], out:Collector[(Int, String)]) => {
      iterator.toArray.foreach(x => out.collect((x, "value"+x)))
      out.collect((3, "value3"))
    })

    output.printOnTaskManager("output")
    env.execute()

  }

}

Output:

input:2> 2
input:1> 1
output:2> (2,value2)
output:1> (1,value1)
output:2> (3,value3)
output:1> (3,value3)
3. Sorting transformation operators

3.1 MinBy / MaxBy, First-N
package devBase

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(
      ("Liming", 10, 99, 1100),
      ("Liming", 10, 99, 1000),
      ("Liming", 10, 100, 1000),
      ("Liming", 11, 110, 1100),
      ("Zhangsan", 50, 500, 5000),
      ("Zhangsan", 51, 510, 5100),
      ("Zhangsan", 51, 520, 5100),
      ("Zhangsan", 51, 520, 5000)
    )

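    // Group first, then take the one record with the minimum or maximum of the selected fields; if several records tie on those fields, the first one is taken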
    val output_min = input.groupBy(0).minBy(1, 2)
    val output_max = input.groupBy(0).maxBy(1, 2)

    output_min.print()
    println("=====================================")
    output_max.print()

    // Group, sort, and take the first N records
    val output_firstN=input.groupBy(0)
      .sortGroup(1, Order.DESCENDING)
      .first(2)
    println("=====================================")
    output_firstN.print()
  }

}

Output:

(Zhangsan,50,500,5000)
(Liming,10,99,1100)
=====================================
(Zhangsan,51,520,5100)
(Liming,11,110,1100)
=====================================
(Zhangsan,51,510,5100)
(Zhangsan,51,520,5100)
(Liming,11,110,1100)
(Liming,10,99,1100)
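
The tie-breaking in minBy and the group, sort, take-N pattern can be checked against a small model on plain Scala collections. This is a sketch, not Flink code: `minPerGroup` and `firstN` are hypothetical helpers, and it relies on Scala's `minBy` returning the first minimal element and on `sortBy` being stable.

```scala
object MinByFirstNSketch {

  type Rec = (String, Int, Int, Int)

  // Per group: the record with the smallest (field 1, field 2); ties keep the first one
  def minPerGroup(input: Seq[Rec]): Map[String, Rec] =
    input.groupBy(_._1).map { case (name, rows) =>
      (name, rows.minBy(r => (r._2, r._3)))
    }

  // Per group: sort by field 1 descending (stable), then take the first n records
  def firstN(input: Seq[Rec], n: Int): Map[String, Seq[Rec]] =
    input.groupBy(_._1).map { case (name, rows) =>
      (name, rows.sortBy(r => -r._2).take(n))
    }

  val input: Seq[Rec] = Seq(
    ("Liming", 10, 99, 1100),
    ("Liming", 10, 99, 1000),
    ("Liming", 10, 100, 1000),
    ("Liming", 11, 110, 1100),
    ("Zhangsan", 50, 500, 5000),
    ("Zhangsan", 51, 510, 5100),
    ("Zhangsan", 51, 520, 5100),
    ("Zhangsan", 51, 520, 5000)
  )

  def main(args: Array[String]): Unit = {
    minPerGroup(input).values.foreach(println)
    firstN(input, 2).values.flatten.foreach(println)
  }
}
```

On the sample data the model picks the same records as the Flink output above, for example (Liming,10,99,1100) as the minimum for Liming, although the order of the groups may differ.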
4. Join transformation operators

4.1 join, leftOuterJoin, rightOuterJoin, fullOuterJoin, cross, coGroup
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

import scala.collection.mutable

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val join1 = env.fromElements(
      (1, "orange"),(1, "apple"),(2, "fruit2")
    )
    val join2 = env.fromElements(
      (1, 10),(1, 20),(3, 30)
    )

    val result_join = join1.join(join2).where(0).equalTo(0)
    println("===============join====================")
    result_join.print()

    val result_left = join1.leftOuterJoin(join2).where(0).equalTo(0) {
      // The missing right-side value must be handled
      (left, right) => {
        // The field values of the filler must not be null
        val right_data = if(right == null) (0, 0) else right
        (left, right_data)
      }
    }
    println("===============leftOuterJoin====================")
    result_left.print()


    val result_right = join1.rightOuterJoin(join2).where(0).equalTo(0) {
      // The missing left-side value must be handled
      (left, right) => {
        // The field values of the filler must not be null
        val left_data = if(left == null) (0, "") else left
        (left_data, right)
      }
    }
    println("===============rightOuterJoin====================")
    result_right.print()

    val result_full = join1.fullOuterJoin(join2).where(0).equalTo(0) {
      // Missing values on both the left and the right side must be handled
      (left, right) => {
        // The field values of the filler must not be null
        val left_data = if(left == null) (0, "") else left
        val right_data = if(right == null) (0, 0) else right
        (left_data, right_data)
      }
    }
    println("===============fullOuterJoin====================")
    result_full.print()

    val result_cross = join1.cross(join2)
    println("===============cross====================")
    result_cross.print()

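    // Match on the key first, then put the left-side records into one iterator and the right-side records into another
    // A side with no record for the key gets an empty iterator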
    val result_coGroup = join1.coGroup(join2).where(0).equalTo(0)
      .apply((left_iterator:Iterator[(Int,String)], right_iterator:Iterator[(Int,Int)], out:Collector[(mutable.Buffer[(Int,String)],mutable.Buffer[(Int,Int)])]) => {
        val left_buffer = left_iterator.toBuffer
        val right_buffer = right_iterator.toBuffer
        out.collect((left_buffer, right_buffer))
      })
    println("===============coGroup====================")
    result_coGroup.print()

  }

}

Output:

===============join====================
((1,orange),(1,10))
((1,apple),(1,10))
((1,orange),(1,20))
((1,apple),(1,20))
===============leftOuterJoin====================
((1,orange),(1,10))
((1,orange),(1,20))
((1,apple),(1,10))
((1,apple),(1,20))
((2,fruit2),(0,0))
===============rightOuterJoin====================
((0,),(3,30))
((1,orange),(1,10))
((1,apple),(1,10))
((1,orange),(1,20))
((1,apple),(1,20))
===============fullOuterJoin====================
((0,),(3,30))
((1,orange),(1,10))
((1,orange),(1,20))
((1,apple),(1,10))
((1,apple),(1,20))
((2,fruit2),(0,0))
===============cross====================
((1,orange),(1,10))
((1,orange),(1,20))
((1,orange),(3,30))
((1,apple),(1,10))
((1,apple),(1,20))
((1,apple),(3,30))
((2,fruit2),(1,10))
((2,fruit2),(1,20))
((2,fruit2),(3,30))
===============coGroup====================
(ArrayBuffer(),ArrayBuffer((3,30)))
(ArrayBuffer((1,orange), (1,apple)),ArrayBuffer((1,10), (1,20)))
(ArrayBuffer((2,fruit2)),ArrayBuffer())
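
The coGroup result is the most general of these outputs: each key appears once, with all left-side and all right-side records for that key. A minimal model on plain Scala collections (not Flink code; `coGroup` and `innerJoin` here are hypothetical helpers) shows how the inner join can be read off from it.

```scala
object CoGroupSketch {

  type L = (Int, String)
  type R = (Int, Int)

  // For every key on either side: (all left records, all right records)
  def coGroup(left: Seq[L], right: Seq[R]): Map[Int, (Seq[L], Seq[R])] = {
    val l = left.groupBy(_._1)
    val r = right.groupBy(_._1)
    (l.keySet ++ r.keySet).map { k =>
      (k, (l.getOrElse(k, Seq.empty), r.getOrElse(k, Seq.empty)))
    }.toMap
  }

  // Inner join derived from the co-groups: pair every left with every right per key
  def innerJoin(left: Seq[L], right: Seq[R]): Seq[(L, R)] =
    coGroup(left, right).values.toSeq.flatMap { case (ls, rs) =>
      for (a <- ls; b <- rs) yield (a, b)
    }

  def main(args: Array[String]): Unit = {
    val join1 = Seq((1, "orange"), (1, "apple"), (2, "fruit2"))
    val join2 = Seq((1, 10), (1, 20), (3, 30))
    coGroup(join1, join2).values.foreach(println)
    innerJoin(join1, join2).foreach(println)
  }
}
```

Keys 2 and 3 each have one empty side, so they contribute nothing to the inner join. An outer join keeps them and has to supply a value for the missing side, which is why the Flink examples above substitute `(0, 0)` or `(0, "")`.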