Table of Contents
- 1. Aggregation transformation operators
- 1.1 reduce
- 1.2 aggregate
- 1.3 distinct
- 2. Partition transformation operators
- 2.1 partitionByHash, partitionByRange, sortPartition
- 2.2 mapPartition
- 3. Sorting transformation operators
- 3.1 MinBy / MaxBy, First-N
- 4. Join transformation operators
- 4.1 join, leftOuterJoin, rightOuterJoin, fullOuterJoin, cross, coGroup
1. Aggregation transformation operators
1.1 reduce
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val input = env.fromElements(1, 2, 3, 4)
    val output = input.reduce((x1, x2) => x1 + x2)
    output.print()
  }
}
Output:
10
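reduce can also be applied per key after groupBy, folding each group into a single record. A minimal sketch (the data here is made up, not part of the original example):

// Sketch: per-group reduce that sums the second tuple field for each key
val keyed = env.fromElements(("a", 1), ("a", 2), ("b", 3))
val summedPerKey = keyed
  .groupBy(0)                                   // group by the first tuple field
  .reduce((t1, t2) => (t1._1, t1._2 + t2._2))   // fold each group into one tuple
summedPerKey.print()                            // (a,3) and (b,3); order may vary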
1.2 aggregate
- aggregate can only be applied to Tuple data types
package devBase

import org.apache.flink.api.java.aggregation.Aggregations.{SUM, MIN, MAX}
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val input = env.fromElements(
      ("Liming", 10, 100),
      ("Liming", 11, 110),
      ("Zhangsan", 50, 500),
      ("Zhangsan", 51, 510)
    )

    val output = input.groupBy(0)
      // Only one aggregation can be applied to a given field; if several are
      // chained, the last one wins (here MAX on field 2 overrides MIN)
      .aggregate(SUM, 1).and(MIN, 2).and(MAX, 2)

    output.print()
  }
}
Output:
(Zhangsan,101,510)
(Liming,21,110)
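The Scala API also offers shorthand aggregation methods. A minimal sketch that should be equivalent to the chain above (the overridden MIN on field 2 is simply omitted); treating sum/andMax as a drop-in replacement here is my assumption:

// Sketch: sum(...) / andMax(...) as shorthands for aggregate(SUM, ...).and(MAX, ...)
val shorthand = input.groupBy(0)
  .sum(1)      // SUM on field 1
  .andMax(2)   // MAX on field 2
shorthand.print()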
1.3 distinct
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val input = env.fromElements(
      ("Liming", 10, 100),
      ("Liming", 10, 110),
      ("Liming", 10, 110),
      ("Zhangsan", 50, 510),
      ("Zhangsan", 50, 500),
      ("Zhangsan", 50, 500)
    )

    // Deduplicate on all fields
    val output1 = input.distinct()
    output1.print()

    println("====================")

    // Deduplicate on the given key fields; for equal keys the first record is kept
    val output2 = input.distinct(0, 1)
    output2.print()
  }
}
Output:
(Zhangsan,50,500)
(Zhangsan,50,510)
(Liming,10,110)
(Liming,10,100)
====================
(Liming,10,100)
(Zhangsan,50,510)
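distinct also accepts a key-selector function instead of field positions. A minimal sketch reusing the input above (the selector is my example, not from the original):

// Sketch: deduplicate by a computed key, here the first tuple field (the name)
val distinctByName = input.distinct(_._1)
distinctByName.print()  // keeps one record per name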
2. Partition transformation operators
2.1 partitionByHash, partitionByRange, sortPartition
package devBase

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val input = env.fromElements(
      ("Liming", 10, 100),
      ("Liming", 15, 150),
      ("Liming", 13, 130),
      ("Zhangsan", 50, 500),
      ("Zhangsan", 80, 800),
      ("Zhangsan", 60, 600)
    ).setParallelism(2)
    input.printOnTaskManager("input")

    // Hash-partition on the first field
    val output1 = input.partitionByHash(0)
    output1.printOnTaskManager("output1")

    // Range-partition on the value of the third field
    val output2 = input.partitionByRange("_3")
    output2.printOnTaskManager("output2")

    // sortPartition only sorts the data within each partition
    val output3 = input.sortPartition(1, Order.ASCENDING)
      .sortPartition(2, Order.DESCENDING)
    output3.printOnTaskManager("output3")

    env.execute()
  }
}
Output:
input:1> (Liming,10,100)
input:1> (Liming,13,130)
input:1> (Zhangsan,80,800)
input:2> (Liming,15,150)
input:2> (Zhangsan,50,500)
input:2> (Zhangsan,60,600)
output1:5> (Liming,10,100)
output1:5> (Liming,15,150)
output1:5> (Liming,13,130)
output1:2> (Zhangsan,50,500)
output1:2> (Zhangsan,80,800)
output1:2> (Zhangsan,60,600)
output3:1> (Liming,10,100)
output3:1> (Liming,13,130)
output3:1> (Zhangsan,80,800)
output3:2> (Liming,15,150)
output3:2> (Zhangsan,50,500)
output3:2> (Zhangsan,60,600)
output2:7> (Zhangsan,80,800)
output2:4> (Zhangsan,50,500)
output2:3> (Liming,15,150)
output2:1> (Liming,10,100)
output2:6> (Zhangsan,60,600)
output2:2> (Liming,13,130)
- This example was run inside IDEA, so the results are printed directly to the console
- The interleaved prefixes also show that the different data flows within the job execute asynchronously with respect to each other
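A common combination (a sketch, not part of the original example) is partitionByRange followed by sortPartition on the same field: the ranges are ordered across partitions and each partition is sorted internally, giving a total order overall:

// Sketch: range-partition on the third field, then sort each partition on it
val totallyOrdered = input
  .partitionByRange(2)
  .sortPartition(2, Order.ASCENDING)
totallyOrdered.printOnTaskManager("ordered")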
2.2 mapPartition
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val input = env.fromElements(1, 2)
      .setParallelism(2)
    input.printOnTaskManager("input")

    // The function is invoked once per partition: it receives all records of the
    // partition as an iterator and may emit any number of records via the collector
    val output = input.mapPartition((iterator: Iterator[Int], out: Collector[(Int, String)]) => {
      iterator.foreach(x => out.collect((x, "value" + x)))
      out.collect((3, "value3"))
    })
    output.printOnTaskManager("output")

    env.execute()
  }
}
Output:
input:2> 2
input:1> 1
output:2> (2,value2)
output:1> (1,value1)
output:2> (3,value3)
output:1> (3,value3)
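Note that (3,value3) appears twice: the function runs once per parallel partition, and each of the two subtasks emits it once. The same property makes mapPartition handy for per-partition bookkeeping; a minimal sketch reusing the input above (my variation, not from the original):

// Sketch: emit one record per partition holding the number of elements in it
val countsPerPartition = input.mapPartition((it: Iterator[Int], out: Collector[Int]) => {
  out.collect(it.size)  // size consumes the iterator; one count per subtask
})
countsPerPartition.printOnTaskManager("count")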
3. Sorting transformation operators
3.1 MinBy / MaxBy, First-N
package devBase

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val input = env.fromElements(
      ("Liming", 10, 99, 1100),
      ("Liming", 10, 99, 1000),
      ("Liming", 10, 100, 1000),
      ("Liming", 11, 110, 1100),
      ("Zhangsan", 50, 500, 5000),
      ("Zhangsan", 51, 510, 5100),
      ("Zhangsan", 51, 520, 5100),
      ("Zhangsan", 51, 520, 5000)
    )

    // Group first, then take the record with the smallest or largest values of the
    // selected fields; if several records tie on those fields, the first one is taken
    val output_min = input.groupBy(0).minBy(1, 2)
    val output_max = input.groupBy(0).maxBy(1, 2)
    output_min.print()
    println("=====================================")
    output_max.print()

    // Group, sort within each group, then take the first N records
    val output_firstN = input.groupBy(0)
      .sortGroup(1, Order.DESCENDING)
      .first(2)
    println("=====================================")
    output_firstN.print()
  }
}
Output:
(Zhangsan,50,500,5000)
(Liming,10,99,1100)
=====================================
(Zhangsan,51,520,5100)
(Liming,11,110,1100)
=====================================
(Zhangsan,51,510,5100)
(Zhangsan,51,520,5100)
(Liming,11,110,1100)
(Liming,10,99,1100)
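minBy / maxBy and first also work on an ungrouped DataSet, operating over all records. A minimal sketch reusing the input above (my variation, not from the original):

// Sketch: the single global minimum by fields 1 then 2, and two records of the DataSet
val globalMin = input.minBy(1, 2)
val firstTwo = input.first(2)
globalMin.print()
firstTwo.print()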
4. Join transformation operators
4.1 join, leftOuterJoin, rightOuterJoin, fullOuterJoin, cross, coGroup
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

import scala.collection.mutable

object TranformationOperatorTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val join1 = env.fromElements(
      (1, "orange"), (1, "apple"), (2, "fruit2")
    )
    val join2 = env.fromElements(
      (1, 10), (1, 20), (3, 30)
    )

    val result_join = join1.join(join2).where(0).equalTo(0)
    println("===============join====================")
    result_join.print()

    val result_left = join1.leftOuterJoin(join2).where(0).equalTo(0) {
      // The missing values of the right side must be handled
      (left, right) => {
        // The fill-in field values must not be null
        val right_data = if (right == null) (0, 0) else right
        (left, right_data)
      }
    }
    println("===============leftOuterJoin====================")
    result_left.print()

    val result_right = join1.rightOuterJoin(join2).where(0).equalTo(0) {
      // The missing values of the left side must be handled
      (left, right) => {
        // The fill-in field values must not be null
        val left_data = if (left == null) (0, "") else left
        (left_data, right)
      }
    }
    println("===============rightOuterJoin====================")
    result_right.print()

    val result_full = join1.fullOuterJoin(join2).where(0).equalTo(0) {
      // Missing values on either side must be handled
      (left, right) => {
        // The fill-in field values must not be null
        val left_data = if (left == null) (0, "") else left
        val right_data = if (right == null) (0, 0) else right
        (left_data, right_data)
      }
    }
    println("===============fullOuterJoin====================")
    result_full.print()

    val result_cross = join1.cross(join2)
    println("===============cross====================")
    result_cross.print()

    // Match by key first, then put the left records into one iterator and the right
    // records into another; a side with no match for the key gets an empty iterator
    val result_coGroup = join1.coGroup(join2).where(0).equalTo(0)
      .apply((left_iterator: Iterator[(Int, String)], right_iterator: Iterator[(Int, Int)], out: Collector[(mutable.Buffer[(Int, String)], mutable.Buffer[(Int, Int)])]) => {
        val left_buffer = left_iterator.toBuffer
        val right_buffer = right_iterator.toBuffer
        out.collect((left_buffer, right_buffer))
      })
    println("===============coGroup====================")
    result_coGroup.print()
  }
}
Output:
===============join====================
((1,orange),(1,10))
((1,apple),(1,10))
((1,orange),(1,20))
((1,apple),(1,20))
===============leftOuterJoin====================
((1,orange),(1,10))
((1,orange),(1,20))
((1,apple),(1,10))
((1,apple),(1,20))
((2,fruit2),(0,0))
===============rightOuterJoin====================
((0,),(3,30))
((1,orange),(1,10))
((1,apple),(1,10))
((1,orange),(1,20))
((1,apple),(1,20))
===============fullOuterJoin====================
((0,),(3,30))
((1,orange),(1,10))
((1,orange),(1,20))
((1,apple),(1,10))
((1,apple),(1,20))
((2,fruit2),(0,0))
===============cross====================
((1,orange),(1,10))
((1,orange),(1,20))
((1,orange),(3,30))
((1,apple),(1,10))
((1,apple),(1,20))
((1,apple),(3,30))
((2,fruit2),(1,10))
((2,fruit2),(1,20))
((2,fruit2),(3,30))
===============coGroup====================
(ArrayBuffer(),ArrayBuffer((3,30)))
(ArrayBuffer((1,orange), (1,apple)),ArrayBuffer((1,10), (1,20)))
(ArrayBuffer((2,fruit2)),ArrayBuffer())
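Because coGroup hands you both sides of each key at once, it can express the outer joins above by hand. A minimal sketch (my variation, not from the original) of leftOuterJoin rebuilt on coGroup, padding unmatched rows with (0, 0) as the example does:

// Sketch: left outer join via coGroup
val leftViaCoGroup = join1.coGroup(join2).where(0).equalTo(0)
  .apply((lefts: Iterator[(Int, String)], rights: Iterator[(Int, Int)],
          out: Collector[((Int, String), (Int, Int))]) => {
    val rightList = rights.toList                        // materialize the right side for reuse
    lefts.foreach { l =>
      if (rightList.isEmpty) out.collect((l, (0, 0)))    // no match on the right: pad
      else rightList.foreach(r => out.collect((l, r)))   // emit every matching pair
    }
  })
leftViaCoGroup.print()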