Table of Contents
- 1. Multi-stream transformation operators
- 1.1 union
- 1.2 connect
- 2. Keyed stream transformation operators
- 2.1 keyBy with min, max, sum, minBy, maxBy
- 2.2 keyBy with reduce
- 3. Window transformation operators
- 3.1 WindowedStream.apply
- 3.2 AllWindowedStream.apply
- 3.3 DataStream.coGroup(DataStream)
- 4. Join transformation operators
- 4.1 Window join
- 4.2 Interval join
- 5. Physical partitioning operators
1. Multi-stream transformation operators
1.1 union
package devBase
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
object TranformationOperatorTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val input1 = senv.fromElements(("Liming",16), ("Zhangsan", 30))
val input2 = senv.fromElements(("Zhaosi",40), ("Wangwu", 58))
val output = input1.union(input2)
output.print()
senv.execute()
}
}
Execution results:
3> (Wangwu,58)
4> (Zhangsan,30)
2> (Zhaosi,40)
3> (Liming,16)
- union also accepts multiple DataStreams at once; its signature is:
def union(dataStreams: DataStream[T]*): DataStream[T] = ......
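For example, a minimal sketch of unioning three streams at once (input1 and input2 are the streams from the example above; input3 is a hypothetical extra source):

val input3 = senv.fromElements(("Lisi", 22), ("Laoliu", 35))
// union takes a varargs list of streams with the same element type
val unioned = input1.union(input2, input3)
unioned.print()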
1.2 connect
package devBase
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
object TranformationOperatorTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val input1 = senv.fromElements(("Liming",16), ("Zhangsan", 30))
val input2 = senv.fromElements(("Zhaosi","Beijing"), ("Wangwu", "Shanghai"))
// The two source streams are now connected, but each side still keeps its own element type
val connectedStream = input1.connect(input2)
// Map both sides to the same element type
val output = connectedStream.map(
// Applied to elements coming from input1
input1_stream => (input1_stream._1, input1_stream._2.toString),
// Applied to elements coming from input2
input2_stream => (input2_stream._1, input2_stream._2)
)
output.print()
senv.execute()
}
}
Execution results:
2> (Liming,16)
1> (Wangwu,Shanghai)
8> (Zhaosi,Beijing)
3> (Zhangsan,30)
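The two lambdas can equivalently be bundled into a CoMapFunction; a minimal sketch, reusing connectedStream from the example above:

import org.apache.flink.streaming.api.functions.co.CoMapFunction

// map1 handles elements from input1, map2 handles elements from input2
val output2 = connectedStream.map(new CoMapFunction[(String, Int), (String, String), (String, String)] {
  override def map1(value: (String, Int)): (String, String) = (value._1, value._2.toString)
  override def map2(value: (String, String)): (String, String) = value
})
output2.print()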
2. Keyed stream transformation operators
2.1 keyBy with min, max, sum, minBy, maxBy
package devBase
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
object TranformationOperatorTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val input = senv.fromElements(
("Liming",20, 21),
("Liming",10, 11),
("Liming",30, 31),
("Zhangsan", 200, 210),
("Zhangsan", 100, 110),
("Zhangsan", 300, 310)
)
val output_min = input.keyBy(tuple => tuple._1).min(1)
output_min.print("output_min")
/* Execution results:
output_min:8> (Zhangsan,200,210)
output_min:8> (Zhangsan,100,210)
output_min:2> (Liming,20,21)
output_min:8> (Zhangsan,100,210)
output_min:2> (Liming,10,21)
output_min:2> (Liming,10,21)
*/
val output_max = input.keyBy(tuple => tuple._1).max(1)
output_max.print("output_max")
/* Execution results:
output_max:8> (Zhangsan,200,210)
output_max:2> (Liming,20,21)
output_max:8> (Zhangsan,200,210)
output_max:2> (Liming,20,21)
output_max:8> (Zhangsan,300,210)
output_max:2> (Liming,30,21)
*/
val output_sum = input.keyBy(tuple => tuple._1).sum(1)
output_sum.print("output_sum")
/* Execution results:
output_sum:2> (Liming,20,21)
output_sum:8> (Zhangsan,200,210)
output_sum:2> (Liming,30,21)
output_sum:2> (Liming,60,21)
output_sum:8> (Zhangsan,300,210)
output_sum:8> (Zhangsan,600,210)
*/
val output_minBy = input.keyBy(tuple => tuple._1).minBy(1)
output_minBy.print("output_minBy")
/* Execution results:
output_minBy:8> (Zhangsan,200,210)
output_minBy:2> (Liming,20,21)
output_minBy:8> (Zhangsan,100,110)
output_minBy:2> (Liming,10,11)
output_minBy:2> (Liming,10,11)
output_minBy:8> (Zhangsan,100,110)
*/
val output_maxBy = input.keyBy(tuple => tuple._1).maxBy(1)
output_maxBy.print("output_maxBy")
/* Execution results:
output_maxBy:8> (Zhangsan,200,210)
output_maxBy:2> (Liming,20,21)
output_maxBy:8> (Zhangsan,200,210)
output_maxBy:2> (Liming,20,21)
output_maxBy:8> (Zhangsan,300,310)
output_maxBy:2> (Liming,30,31)
*/
senv.execute()
}
}
- min / max / sum: iterate on the selected field to compute the running min / max / sum; every other field keeps the value from the first record of that key
- minBy / maxBy:
- iterate on the selected field to compute the running min / max
- every other field takes its value from the record that currently holds that min / max (see the sketch below)
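To make the difference concrete, here is a rough sketch of the two behaviours expressed as keyed reduce functions (an illustration of the semantics only, not Flink's actual implementation):

// min(1)-like: track the smallest _2, but every other field stays as in the first record of the key
val minLike = input.keyBy(_._1).reduce { (acc, cur) =>
  (acc._1, math.min(acc._2, cur._2), acc._3)
}

// minBy(1)-like: keep the whole record that currently holds the smallest _2
val minByLike = input.keyBy(_._1).reduce { (acc, cur) =>
  if (cur._2 < acc._2) cur else acc
}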
2.2 keyBy and reduce
package devBase
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
object TranformationOperatorTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val input = senv.fromElements(
("Liming",20, 21),
("Liming",10, 11),
("Liming",30, 31),
("Zhangsan", 200, 210),
("Zhangsan", 100, 110),
("Zhangsan", 300, 310)
)
val output_reduce = input.keyBy(tuple => tuple._1).reduce(
(v1, v2) => (v1._1, v1._2 + v2._2, v1._3 + v2._3)
)
output_reduce.print()
senv.execute()
}
}
Execution results:
8> (Zhangsan,200,210)
2> (Liming,20,21)
8> (Zhangsan,300,320)
2> (Liming,30,32)
2> (Liming,60,63)
8> (Zhangsan,600,630)
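The same reduce can also be written against the explicit ReduceFunction interface instead of a Scala lambda (an equivalent sketch):

import org.apache.flink.api.common.functions.ReduceFunction

val output2 = input.keyBy(tuple => tuple._1).reduce(new ReduceFunction[(String, Int, Int)] {
  // v1 is the accumulated value for the key, v2 is the newly arrived record
  override def reduce(v1: (String, Int, Int), v2: (String, Int, Int)): (String, Int, Int) =
    (v1._1, v1._2 + v2._2, v1._3 + v2._3)
})
output2.print()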
3. Window transformation operators
3.1 WindowedStream.apply
package devBase
import apiTest.WordSourceFunction
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object TranformationOperatorTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val input = senv.addSource(new WordSourceFunction())
val output = input.map(word => (word, 1))
.keyBy(tuple => tuple._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10L)))
.apply(new WindowFunction[(String, Int), (String, Int), String, TimeWindow] {
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
var count = 0
// sum the counts of every record that falls into this window for this key
for (tuple <- input) {
count += tuple._2
}
out.collect((key, count))
}
})
output.print()
senv.execute()
}
}
Execution results:
(table,1)
8> (stream,1)
7> (flink,1)
7> (batch,2)
4> (sql,2)
5> (world,1)
1> (table,1)
3> (hello,2)
8> (stream,2)
5> (world,3)
7> (batch,1)
4> (sql,3)
8> (stream,1)
3> (hello,2)
4> (sql,1)
5> (world,2)
3> (hello,1)
8> (stream,3)
7> (batch,1)
7> (flink,2)
...... remaining output omitted ......
- In Flink 1.13 there is no need to call senv.setStreamTimeCharacteristic, because event time is already the default, and processing time and ingestion time still work under this default
3.2 AllWindowedStream.apply
package devBase
import apiTest.WordSourceFunction
import org.apache.flink.streaming.api.scala.function.{AllWindowFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object TranformationOperatorTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val input = senv.addSource(new WordSourceFunction())
val output = input.map(word => (word, 1))
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10L)))
.apply(new AllWindowFunction[(String, Int), (String,Int),TimeWindow] {
override def apply(window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
var window_word = ""
var count = 0
// remember the last word seen in the window and sum the counts of all records in it
for (tuple <- input) {
window_word = tuple._1
count += tuple._2
}
out.collect((window_word, count))
}
})
output.print()
senv.execute()
}
}
Execution results:
(stream,9)
3> (world,10)
4> (batch,10)
5> (stream,10)
6> (hello,10)
7> (flink,10)
...... remaining output omitted ......
3.3 DataStream.coGroup(DataStream)
package devBase
import apiTest.WordSourceFunction
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.lang
object TranformationOperatorTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val input = senv.addSource(new WordSourceFunction())
val output = input.map(word => (word, 1))
.coGroup(
input.map(word => (word, word + "666"))
).where(tuple => tuple._1).equalTo(tuple => tuple._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10L)))
.apply(new CoGroupFunction[(String, Int),(String, String), String] {
override def coGroup(first: lang.Iterable[(String, Int)], second: lang.Iterable[(String, String)], out: Collector[String]): Unit = {
val first_iter = first.iterator()
val second_iter = second.iterator()
while(first_iter.hasNext) {out.collect(first_iter.next()._1)}
while(second_iter.hasNext) {out.collect(second_iter.next()._2)}
}
})
output.print()
senv.execute()
}
}
Execution results:
7> batch
7> batch
7> batch666
7> batch666
3> hello
3> hello666
8> stream
...... remaining output omitted ......
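Unlike join, coGroup also invokes the function for keys that appear in only one of the two streams within a window, so it can express outer-join-like logic. A minimal sketch, reusing the same word stream and filtering the word "hello" out of the right side so that some keys go unmatched (the filtered word is just an illustration):

// left-outer-join style: emit every left record, marking whether the right side
// contained the same key in this window
val outerLike = input.map(word => (word, 1))
  .coGroup(input.filter(word => word != "hello").map(word => (word, word + "666")))
  .where(tuple => tuple._1).equalTo(tuple => tuple._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(10L)))
  .apply(new CoGroupFunction[(String, Int), (String, String), String] {
    override def coGroup(first: lang.Iterable[(String, Int)],
                         second: lang.Iterable[(String, String)],
                         out: Collector[String]): Unit = {
      val firstIter = first.iterator()
      while (firstIter.hasNext) {
        val left = firstIter.next()
        if (second.iterator().hasNext) out.collect(left._1 + ": matched")
        else out.collect(left._1 + ": no match in this window")
      }
    }
  })
outerLike.print()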
4. Join transformation operators
4.1 Window join
package devBase
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime._
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
class RecordTimestampAssigner extends TimestampAssigner[(String, Int, String)] {
val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = {
fdf.parse(element._3).getTime
}
}
class PeriodWatermarkGenerator extends WatermarkGenerator[(String, Int, String)] {
val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
var maxTimestamp: Long = _
val maxOutofOrderness = 0
override def onEvent(event: (String, Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = {
maxTimestamp = math.max(fdf.parse(event._3).getTime, maxTimestamp)
}
override def onPeriodicEmit(output: WatermarkOutput): Unit = {
output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
}
}
class MyWatermarkStrategy extends WatermarkStrategy[(String, Int, String)] {
override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(String, Int, String)] = {
new RecordTimestampAssigner()
}
override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Int, String)] = {
new PeriodWatermarkGenerator()
}
}
object TranformationOperatorTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val input1 = senv.fromElements(
("A", 110, "2021-09-01 08:00:11"),
("A", 120, "2021-09-01 08:00:12"),
("B", 130, "2021-09-01 08:00:13"),
("B", 140, "2021-09-01 08:00:14"),
("C", 210, "2021-09-01 08:00:21"),
("C", 220, "2021-09-01 08:00:22"),
("D", 310, "2021-09-01 08:00:31"),
("D", 320, "2021-09-01 08:00:32")
).assignTimestampsAndWatermarks(new MyWatermarkStrategy())
val input2 = senv.fromElements(
("A", 110, "2021-09-01 08:00:11"),
("A", 120, "2021-09-01 08:00:12"),
("B", 130, "2021-09-01 08:00:13"),
("B", 140, "2021-09-01 08:00:14"),
("C", 210, "2021-09-01 08:00:21")
).assignTimestampsAndWatermarks(new MyWatermarkStrategy())
val output = input1.join(input2)
.where(event => event._1).equalTo(event => event._1)
.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
.apply((left: (String, Int, String), right: (String, Int, String), collector: Collector[(String, Int, String)]) => {
val maxDatetime = if (left._3 > right._3) left._3 else right._3
collector.collect((left._1, left._2 + right._2, maxDatetime))
})
output.print()
senv.execute()
}
}
Execution results:
7> (A,220,2021-09-01 08:00:11)
7> (A,230,2021-09-01 08:00:12)
7> (A,230,2021-09-01 08:00:12)
7> (A,240,2021-09-01 08:00:12)
2> (B,260,2021-09-01 08:00:13)
2> (B,270,2021-09-01 08:00:14)
2> (B,270,2021-09-01 08:00:14)
2> (B,280,2021-09-01 08:00:14)
2> (C,420,2021-09-01 08:00:21)
2> (C,430,2021-09-01 08:00:22)
- For key B, in the window [08:00:10, 08:00:20), every element emitted by the apply function carries the largest timestamp in that window, 08:00:14
- Sliding window (see the sketch after this list):
SlidingEventTimeWindows.of(Time.seconds(10L), Time.seconds(1L))
- Session window:
EventTimeSessionWindows.withGap(Time.seconds(10L))
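A minimal sketch of the same join with these assigners swapped in (reusing input1 and input2 from the example above):

import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, SlidingEventTimeWindows}

// 10-second windows that slide every 1 second: a matching pair of records
// can be joined in several of the overlapping windows
val slidingJoin = input1.join(input2)
  .where(event => event._1).equalTo(event => event._1)
  .window(SlidingEventTimeWindows.of(Time.seconds(10L), Time.seconds(1L)))
  .apply((left: (String, Int, String), right: (String, Int, String)) => (left._1, left._2 + right._2))

// session windows: a window closes after a 10-second gap with no records for the key
val sessionJoin = input1.join(input2)
  .where(event => event._1).equalTo(event => event._1)
  .window(EventTimeSessionWindows.withGap(Time.seconds(10L)))
  .apply((left: (String, Int, String), right: (String, Int, String)) => (left._1, left._2 + right._2))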
4.2 Interval join
package devBase
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime._
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
class RecordTimestampAssigner extends TimestampAssigner[(String, Int, String)] {
val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long = {
fdf.parse(element._3).getTime
}
}
class PeriodWatermarkGenerator extends WatermarkGenerator[(String, Int, String)] {
val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
var maxTimestamp: Long = _
val maxOutofOrderness = 0
override def onEvent(event: (String, Int, String), eventTimestamp: Long, output: WatermarkOutput): Unit = {
maxTimestamp = math.max(fdf.parse(event._3).getTime, maxTimestamp)
}
override def onPeriodicEmit(output: WatermarkOutput): Unit = {
output.emitWatermark(new Watermark(maxTimestamp - maxOutofOrderness - 1))
}
}
class MyWatermarkStrategy extends WatermarkStrategy[(String, Int, String)] {
override def createTimestampAssigner(context: TimestampAssignerSupplier.Context): TimestampAssigner[(String, Int, String)] = {
new RecordTimestampAssigner()
}
override def createWatermarkGenerator(context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[(String, Int, String)] = {
new PeriodWatermarkGenerator()
}
}
object TranformationOperatorTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
val input1 = senv.fromElements(
("A", 110, "2021-09-01 08:00:11"),
("A", 120, "2021-09-01 08:00:12"),
("B", 130, "2021-09-01 08:00:13"),
("B", 140, "2021-09-01 08:00:14"),
("C", 210, "2021-09-01 08:00:21"),
("C", 220, "2021-09-01 08:00:22"),
("D", 310, "2021-09-01 08:00:31"),
("D", 320, "2021-09-01 08:00:32")
).assignTimestampsAndWatermarks(new MyWatermarkStrategy())
val input2 = senv.fromElements(
("A", 110, "2021-09-01 08:00:11"),
("A", 120, "2021-09-01 08:00:12"),
("B", 130, "2021-09-01 08:00:13"),
("B", 140, "2021-09-01 08:00:14"),
("C", 210, "2021-09-01 08:00:21")
).assignTimestampsAndWatermarks(new MyWatermarkStrategy())
val output = input1.keyBy(_._1)
.intervalJoin(input2.keyBy(_._1))
.between(Time.seconds(-2L), Time.seconds(2L))
.lowerBoundExclusive().upperBoundExclusive()
.process(new ProcessJoinFunction[(String, Int, String), (String, Int, String), (String, Int, String)] {
override def processElement(in1: (String, Int, String), in2: (String, Int, String), context: ProcessJoinFunction[(String, Int, String), (String, Int, String), (String, Int, String)]#Context, collector: Collector[(String, Int, String)]): Unit = {
val maxDatetime = if (in1._3 > in2._3) in1._3 else in2._3
collector.collect((in1._1, in1._2 + in2._2, maxDatetime))
}
})
output.print()
senv.execute()
}
}
Execution results:
2> (B,260,2021-09-01 08:00:13)
2> (B,270,2021-09-01 08:00:14)
2> (B,270,2021-09-01 08:00:14)
2> (B,280,2021-09-01 08:00:14)
7> (A,220,2021-09-01 08:00:11)
2> (C,420,2021-09-01 08:00:21)
7> (A,230,2021-09-01 08:00:12)
7> (A,230,2021-09-01 08:00:12)
7> (A,240,2021-09-01 08:00:12)
2> (C,430,2021-09-01 08:00:22)
- The join condition is: left_key == right_key && left_timestamp - 2 < right_timestamp < left_timestamp + 2; the comparison is strict here because both bounds are declared exclusive, whereas by default both bounds are inclusive
- Only event time is supported
- The timestamp of each element emitted by processElement is the larger of in1's and in2's timestamps, i.e. the value returned by ProcessJoinFunction.Context.getTimestamp
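The hand-written RecordTimestampAssigner / PeriodWatermarkGenerator / MyWatermarkStrategy classes can usually be replaced by Flink's built-in strategy helpers; a minimal sketch, assuming the same (String, Int, String) records with the datetime string in the third field:

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// forMonotonousTimestamps emits watermark = (max timestamp seen so far) - 1 ms,
// which matches the custom generator above with maxOutofOrderness = 0
val strategy = WatermarkStrategy
  .forMonotonousTimestamps[(String, Int, String)]()
  .withTimestampAssigner(new SerializableTimestampAssigner[(String, Int, String)] {
    private val fdf = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
    override def extractTimestamp(element: (String, Int, String), recordTimestamp: Long): Long =
      fdf.parse(element._3).getTime
  })

// usage: input1.assignTimestampsAndWatermarks(strategy)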
5. Physical partitioning operators
package devBase
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
class StringHashPartitioner extends Partitioner[String] {
override def partition(key: String, numPartitions: Int): Int = {
// hashCode can be negative, so normalise the remainder into [0, numPartitions)
((key.hashCode % numPartitions) + numPartitions) % numPartitions
}
}
object TranformationOperatorTest {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
println("parallelism: " + senv.getParallelism) // parallelism: 8
val input = senv.fromElements(("key1","value1"),("key2","value2"),("key3","value3"))
input.print("input")
/*
input:5> (key1,value1)
input:7> (key3,value3)
input:6> (key2,value2)
*/
val partitionCustom_output = input.partitionCustom(new StringHashPartitioner(),tuple => tuple._1)
partitionCustom_output.print("partitionCustom_output")
/*
partitionCustom_output:5> (key3,value3)
partitionCustom_output:3> (key1,value1)
partitionCustom_output:4> (key2,value2)
*/
val shuffle_output = input.shuffle
shuffle_output.print("shuffle_output")
/*
shuffle_output:8> (key2,value2)
shuffle_output:7> (key3,value3)
shuffle_output:3> (key1,value1)
*/
val rebalance_output = input.rebalance
rebalance_output.print("rebalance_output")
/*
rebalance_output:1> (key2,value2)
rebalance_output:8> (key1,value1)
rebalance_output:2> (key3,value3)
*/
val rescale_output = input.rescale
rescale_output.print("rescale_output")
/*
rescale_output:1> (key1,value1)
rescale_output:2> (key2,value2)
rescale_output:3> (key3,value3)
*/
val broadcast_output = input.broadcast
broadcast_output.print("broadcast_output")
/*
broadcast_output:1> (key1,value1)
broadcast_output:1> (key2,value2)
broadcast_output:1> (key3,value3)
broadcast_output:8> (key1,value1)
broadcast_output:8> (key2,value2)
broadcast_output:8> (key3,value3)
...... 16 more output lines omitted ......
*/
senv.execute()
}
}
- shuffle assigns elements to partitions at random, so some partitions may end up with more data than others
- rebalance distributes elements round-robin; since it does not have to generate random numbers, it performs better
- rescale does round-robin only among downstream subtasks on the same machine, so it avoids network transfer
- broadcast sends every element to every partition: for example, if dataStreamA has 3 records and the environment has 8 partitions, there are 24 records after broadcasting