RDD:弹性式分布式数据集当做集合 例如列表 list
现在文档:http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html
中文变量:http://spark.apachecn.org/#/
Flink的文档:https://flink.apache.org/zh/
SparkCore的核心框架的来说 三大抽象的概念:
数据集:RDD
共享变量:Shared Variables
累加器 / 广播变量
1.1 RDD 定义 RDD (Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,partitioned collection of elements that can be operated on in parallel.可以认为RDD是分布式的列表List或数组Array;抽象的数据结构,RDD是一个抽象类AbstractClass和泛型Generic Type:
如何将数据封装到RDD集合中,主要有两种方式:并行化本地集合(Driver Program中)和引用加载外部存储系统(如HDFS、Hive、HBase、Kafka、Elasticsearch等)数据集。
方式一:并行化本地集合:将本地集合(Scala中集合对象)数据存储到RDD中。通常用于测试开发:由一个已经存在的cala 集合创建,集合并行化,集合必须时Seq本身或者子类对象。
方式二:加载外部存储系统数据:比如HDFS、LocalFS、HBase、Elasticsearch、Kafka等等。重点掌握:从HDFS加载文本文件数据
其中文件路径:最好是全路径,可以指定文件名称,可以指定文件目录,可以使用通配符指定。实际项目中如果从HDFS读取海量数据,应用运行在YARN上,默认情况下,RDD分区数目等于HDFS bolck的数目的
package com.shanghaiuniversity
import org.apache.spark.{SparkConf, SparkContext}
/**
* spark 采用并行化的方式构建scala集合seq中的数据的为RDD
*/
package object SparkParalizeTest {
def main(args: Array[String]): Unit = {
//构建一个spark Content 对象
val sc: SparkContext = {
// a 创建sparkConf对象
val Sparkconf=new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// b 传递sparkConf对象 创建实例
val context=SparkContext.getOrCreate(Sparkconf)
//c 返回实例对象
context
}
//关闭的spark
sc.stop()
}
}
常见的RDD的两种的方式代码
package com.shanghaiuniversity.spark
import org.apache.spark.{SparkConf, SparkContext}
object SparkParalizeTest {
def main(args: Array[String]): Unit = {
//构建一个spark Content 对象
val sc: SparkContext = {
// a 创建sparkConf对象
val Sparkconf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// b 传递sparkConf对象 创建实例
val context = SparkContext.getOrCreate(Sparkconf)
//c 返回实例对象
context
}
//TODO :创建一个本地的集合 创建RDD
val seq: Seq[Int] = Seq(1, 2, 3, 4, 5, 6, 7, 8)
/**
* def parallelize[T: ClassTag](
* seq: Seq[T],
* numslices: Int = defaultParallelism 表示的分区数
* ): RDD[T]
*/
val inputRDD = sc.parallelize(seq, numSlices = 2)
//TODO:读取外部的文件的数据的例如HDFS MOngoDB ……
/**
def textFile(
path: string,
minPartitions: Int =defaultMinPartitions
): RDD[String]
*/
val intputpath = ""
sc.textFile(intputpath, minPartitions = 2)
inputRDD.foreach(item => println(item))
//关闭的spark
sc.stop()
}
}
小文件的读取
package com.shanghaiuniversity.source
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 采用的是SparkContextWholeTextFiles()方法读取小文件
* 实际项目中,可以先使用wholeTextFiles方法读取数据T设置适当RDD分区,再将数据保存到文件系统,以便后续应用读取处理,大大提升性能。
*/
object SparkWholeTextFileTest {
def main(args: Array[String]): Unit = {
//构建一个spark Content 对象
val sc: SparkContext = {
// a 创建sparkConf对象
val Sparkconf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
// b 传递sparkConf对象 创建实例
val context = SparkContext.getOrCreate(Sparkconf)
//c 返回实例对象
context
}
/**
* def wholeTextFiles(
* path: String,
* minPartitions: Int = defaultMinPartitions
* ): RDD[(String,String)]
*/
val filepath = ""
//TODO :读取小文件的数据 wholetextFiles函数
val inputRDD: RDD[(String, String)] = sc.wholeTextFiles(filepath, minPartitions = 2)
println(s"RDD 分区数目="${
inputRDD.getNumPartitions
})
//打印样本数据
inputRDD.take(10).foreach(item => println(item))
//关闭spark
sc.stop()
}
}
RDD分区数目
在讲解RDD属性时,多次提到了分区(partition)的概念。分区是一个偏物理层的概念,也是RDD并行计算的核心。数据在RDD内部被切分为多个子集合,每个子集合可以被认为是一个分区,运算逻辑最小会被应用在每一个分区上,每个分区是由一个单独的任务(task)来运行的,所以分区数越多,整个应用的并行度也会越高。
获取RDD分区数目两种方式:
1/方式一:直接获取 rdd . getNumPartitions 1/方式二:通过RDD获取 rdd.partitions.length
RDD分区的数据取决于哪些因素
第一点:RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利用CPU的计算资源;
第二点:在实际中为了更加充分的压榨CPU的计算资源,会把并行度设置为cpu核数的2~3倍;
第三点:RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数有关系,具体如下说明:
1)、启动的时候指定的CPU核数确定了一个参数值:spark.default.parallelism=指定的CPU核数(集群模式最小2)
2)、对于Scala集合调用parallelize(集合,分区数)方法如果没有指定分区数,就使用spark.default.parallelism如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism)
3)、对于textFile(文件,分区数)defaultMinPartitions如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2)如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数rdd的分区数。
RDD的函数有一定开发经验的读者应该都使用过多线程,利用多核CPU的并行能力来加快运算速率。在开发并行程序时,可以利用类似Fork/Join的框架将一个大的任务切分成细小的任务,每个小任务模块之间是相互独立的,可以并行执行,然后将所有小任务的结果汇总起来,得到最终的结果。一个非常好的例子便是归并排序。对整个序列进行排序时,可以将序列切分成多个子序列进行排序,然后将排好序的子序列归并起来得到最终的结果。
RDD的基本函数RDD中map、filter、flatMap及foreach等函数为最基本函数,都是都RDD中每个元素进行操作,将元素传递到函数中进行转换。map函数:
map(f:T=>U):RDD[T]=>RDD[U],表示将RDD经由某一函数f后,转变为另一个RDD。flatMap函数:
flatMap(f:T=>Seq[u]): RDD[T]=>RDD[U]),表示将RDD经由某一函数f后,转变为一个新的 RDD,但是与map 不同,RDD中的每一个元素会被映射成新的О到多个元素(f 函数返回的是一个序列Seq)。filter函数:
filter(f:T=>Bool): RDD[T]=>RDD[T],表示将RDD经由某一函数f后,只保留f返回为true的数据,组成新的RDD。foreach函数:
foreach(func),将函数func应用在数据集的每一个元素上,通常用于更新一个累加器,或者和外部存储系统进行交互,例如Redis。关于 foreach,在后续章节中还会使用,到时会详细介绍它的使用方法及注意事项。
saveAsTextFile函数:
saveAsTextFile(path:String),数据集内部的元素会调用其 toString方法,转换为字符串形式,然后根据传入的路径保存成文本文件,既可以是本地文件系统,也可以是HDFS等。
package com.shanghaiuniversity.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用的Spark实现的词频的统计 使用的是的scala语言
*/
object SparkWordCount {
def main(args: Array[String]): Unit = {
//TODO 构建一个spark的对象
val sc: SparkContext = {
//创建一个Spark对象设置应用信息
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setAppName("loacal[2]")
//传递sparkConf对象 创建实例
SparkContext.getOrCreate(sparkConf)
}
//TODO 业务操作
val inputpath = ""
//1 读取数据
val inputRDD: RDD[String] = sc.textFile(inputpath)
//2处理分析数据的 调用的RDD中的transformation函数
val resultRDD: RDD[(String, Int)] = inputRDD
//过滤空数据
.filter(line => null != line && line.trim.length != 0)
//分割单词
.flatMap(line => line.trim.split("\\s+"))
//转为二元组 表示的是每一个单词的出现的次数
.map(word => word -> 1)
//分组聚合
.reduceByKey((tmp, item) => tmp + item)
//3结果数据的输出
resultRDD.foreach(tuple => println(tuple))
//TODO 关闭spark
sc.stop()
}
}
RDD的重要函数
在Spark中的官方给的代码是基本子啊实际中的是不能使用的。
分区操作函数:mapPartitions foreachPartition
map和foreach函数的申明
//map和forach的函数都是的针对的是RDD集合的每一个元素的操作的
每个RDD由多分区组成的,实际开发建议对每个分区数据的进行操作,map函数使用mapPartitions代替、 foreache函数使用foreachPartition代替。
重分区操作函数:repairtition coalesce PairRDDFunctions(调整RDD分区数目,要丕增加分区,要么减少分区,在实际开发中,非常重要。)
场景一:保存结果RDD时,数据量很少,可能分区很多,需要降低分区数目
场景二:从外部存储系统读取数据时,比如读取HBase表数据,默认情况下,一个Region对应一个分区Partition,加上Reion数据是为5GB时,需要增加分区数目
聚合函数 reduce fold函数 无论何种聚合使用同一聚合函数 (分区聚合 /分区间聚合)
在数据分析领域中,对数据聚合操作是最为关键的,在Spark框架中各个模块使用时,主要就是其中聚合函数的使用。
package com.shanghaiuniversity.rdd
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import scala.collection.mutable.ListBuffer
/**
* RDD的聚合函数 如何进行使用以及底层原理
* reduce /fold
* aggregate
* groupBykey / reduceBykey / foldByKey / aggregate /combinBykey
*
*/
object SparkAggTest {
def main(args: Array[String]): Unit = {
//构建Spark Application 应用的入口实例
val sc: SparkContext = {
val sparkConf: SparkConf = new SparkConf()
.setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.setMaster("local[2]")
SparkContext.getOrCreate(sparkConf)
}
//TODO RDD中的reduce 和fold函数
val dataRDD = sc.parallelize(seq = 1 to 10, numSlices = 2)
dataRDD.foreachPartition { iter =>
val paritionID: Int = TaskContext.getPartitionId()
iter.foreach(iter => println(s"$paritionID"))
}
println("=====================================================")
//使用的reduce函数
val resultRDD = dataRDD.reduce { (tmp, item) =>
val partiionID: Int = TaskContext.getPartitionId()
println(s"$partiionID:tmp =$tmp,item=$item,sum=${tmp + item}")
tmp + item
}
println(s"RDD reduce=$resultRDD")
println("=====================================================")
/**
* def aggregateru: classTag]
* // TOD:表示娶合函数中间临时变量初始值( zeroValue: U)
* (zerovalue:U)
* (
* 分区内数据娶合时使用娶合函数
* seq0p: (U,T) =>U,
* 分区间娶合数据娶合时使用娶合函数
* comb0p: (U,U)=>u
* )
* ):U
*/
/**
* 需求是获取两个分区的最大的数据
*
* 1、定义聚合中间临时变量个数,类型
* ListBuffer
* 2、初始化中间临时变量值
* 空集合
*/
val resultRDD1 = dataRDD.aggregate(new ListBuffer[Int]())(
(tmp: ListBuffer[Int], item: Int) => {
tmp += item
tmp.sorted.takeRight(2)
},
(tmp: ListBuffer[Int], item: ListBuffer[Int]) => {
tmp ++= item
tmp.sorted.takeRight(2)
}
)
println(s"top2:${resultRDD1.toList.mkString(",")}")
//关闭应用
sc.stop()
}
}
PairRDDFunctions 聚合函数
在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数据提供函数,方便数据分析处理。比如使用过的函数: reduceByKey、groupByKey等。*ByKey函数:将相同Key的Value进行聚合操作的,省去先分组再聚合。
关联函数 :对Key/Value类型RDD进行关联操作,按照key进行关联。
在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。
缓存函数:cache () persist()函数
将RDD数据缓存的时候,需要设置缓存级别StorageLevel,具体值如下:
释放缓存
当缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:
def unpersist(blocking: Boolean = true): this.type
此函数属于eager,立即执行。
何时缓存数据
第一点:当某个RDD被使用多次的时慎建议缓存此RDD数据。比如,从HDFS上读取网站行为日志数据,进行多维度的分析,最好缓存数据。
第二点:当某个RDD来之不易,并且使用不止一次,建议缓存此RDD数据。比如,从HBase表中读取历史订单数据,与从MySQL表中商品和用户维度信息数据,进行关联Join等聚合操作,获取RDD: etlRDD,后续的报表分析使用此RDD,此时建议缓存RDD数据 案例:etlRDD.persist(StoageLeval.MEMORY_AND_DISK_2)
复习总结RDD数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。
Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。
在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;
RDD的持久化的和缓存的区别: