还有视频讲解在我的B站-宝哥chbxw, 希望大家可以支持一下,谢谢。
一、开发环境
1.1、编译环境
org.apache.maven.plugins
maven-compiler-plugin
3.5.1
1.8
1.8
UTF-8
compile
compile
net.alchim31.maven
scala-maven-plugin
3.2.1
scala-compile-first
process-resources
add-source
compile
org.apache.maven.plugins
maven-assembly-plugin
3.0.0
jar-with-dependencies
make-assembly
package
single
1.2、Fink依赖
开发 Flink 应用程序需要最低限度的 API 依赖。最低的依赖库包括:flink-scala 和flink-streaming-scala
。大多数应用需要依赖特定的连接器或其他类库,例如 Kafka 的连接器、TableAPI、CEP 库等。这些不是 Flink核心依赖的一部分,因此必须作为依赖项手动添加到应用程序中
org.apache.flink
flink-scala_2.11
1.10.1
org.apache.flink
flink-streaming-scala_2.11
1.10.1
二、Flink流处理
package com.chb.flink
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* 基于流计算的WordCount案例
* 案例需求:采用 Netcat 数据源发送数据,使用 Flink 统计每个单词的数量。
* 1、初始化Flink Streaming 上下文环境
2、导入隐式转换
3、读取数据
4、数据的转换与处理
5、结果的打印
6、 启动流式处理程序
*/
object StreamWordCount {
def main(args: Array[String]): Unit = {
//初始化Flink的Streaming(流计算)上下文执行环境
val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
//导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
import org.apache.flink.streaming.api.scala._
//读取数据
val stream: DataStream[String] = streamEnv.socketTextStream("10.0.0.201", 8888)
//转换计算
val result: DataStream[(String, Int)] = stream.flatMap(_.split(","))
.map((_, 1))
.keyBy(0)
.sum(1)
//打印结果到控制台
result.print()
//启动流式处理,如果没有该行代码上面的程序不会运行
streamEnv.execute("wordcount")
}
}
三、Flink批处理
在批处理中,Flink有Spark Core相比没有什么优势,所以在接下来的课程中,相对弱化Flink批处理的功能
package com.chb.flink
import java.net.URL
import org.apache.flink.api.scala.ExecutionEnvironment
/**
* 需求:读取本地数据文件,统计文件中每个单词出现的次数。
* 根据需求,很明显是有界流(批计算),所以采用另外一个上下文环境:ExecutionEnvironment
*/
object BatchWordCount {
def main(args: Array[String]): Unit = {
//初始化flink的环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题
import org.apache.flink.api.scala._ //读取数据
val dataURL = getClass.getResource("/wc.txt") //wc.txt文件在main目录下的resources中
val data: DataSet[String] = env.readTextFile(dataURL.getPath)
//计算
val result: AggregateDataSet[(String, Int)] = data.flatMap(_.split(" "))
.map((_, 1))
.groupBy(0) //其中0代表元组中的下标,“0”下标代表:单词
.sum(1) //其中1代表元组中的下标,“1”下标代表:单词出现的次数
//打印结果
result.print()
}
}