目录
1. Streaming模式
1.1 代码Demo
- 1. Streaming模式
- 1.1 代码Demo
- 1.2 flink run命令
- 2. Batch模式
- 2.1 代码Demo
- 2.2 flink run命令
package apiTest
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
object StreamWordCount {
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setRuntimeMode(RuntimeExecutionMode.STREAMING)
val ds: DataStream[(String, Int)] = senv.addSource(new WordSourceFunction())
.map(word => (word, 1))
.keyBy(_._1)
.sum(1)
ds.print("ds")
// 懒加载,调用execute执行
senv.execute("StreamWordCount")
}
}
执行结果
ds:7> (flink,1)
ds:3> (hello,1)
ds:1> (table,1)
ds:1> (table,2)
ds:7> (batch,1)
ds:7> (flink,2)
ds:3> (hello,2)
ds:3> (hello,3)
....省略部分......
1.2 flink run命令
bin/flink run -Dexecution.runtime-mode=STREAMING examples/streaming/WordCount.jar
2. Batch模式
2.1 代码Demo
package apiTest
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment,createTypeInformation}
object BatchWordCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
val text: DataStream[String] = env.fromElements("flink batch demo", "batch demo", "demo")
val ds: DataStream[(String, Int)] = text.flatMap(_.split(" "))
.map(word => (word, 1))
.keyBy(_._1)
.sum(1)
ds.print("ds")
env.execute("BatchWordCount")
}
}
执行结果:
ds:7> (batch,2)
ds:7> (flink,1)
ds:3> (demo,3)
2.2 flink run命令
bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar