package com.chb.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.repl.SparkCommandLine;
/**
* 缓存策略
* @author 12285
*
*/
public class Test {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("hc").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//使用缓存策略与否, 测试效率
JavaRDD lines = sc.textFile("IDCLog0012201704042133_45376.AVL");
//第一次Action
long beginTime = System.currentTimeMillis();
long count = lines.count();
System.out.println(count);
long endTime = System.currentTimeMillis();
System.out.println("cost " + (endTime - beginTime) + " milliseconds");
//第二次Action
beginTime = System.currentTimeMillis();
count = lines.count();
System.out.println(count);
endTime = System.currentTimeMillis();
System.out.println("cost " + (endTime - beginTime) + " milliseconds");
sc.close();
}
}
1.1、不使用缓存策略
JavaRDD lines = sc.textFile("IDCLog0012201704042133_45376.AVL");
###测试结果
#第一次Action用时
142530
cost 383 milliseconds
#第二次Action用时
142530
cost 97 milliseconds
1.2、使用缓存策略.cache()
JavaRDD lines = sc.textFile("IDCLog0012201704042133_45376.AVL").cache();
###测试结果
#第一次Action用时
142530
cost 499 milliseconds
#第二次Action用时
142530
cost 27 milliseconds
两次对比,使用缓存策略的第二次用时27ms , 提高了499/27=18,使用的测试文件是26M, 效率大大提高了。
persist 缓存策略的类StorageLevel
三、RDD 容错 3.1、Lineage(血统)RDD之间具有依赖关系, 像人类的血缘继承关系
利用内存加快数据加载,在众多的其它的 In-Memory 类数据库或 Cache 类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD 中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。 相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时 ,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制 了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。
3.2、缓存(cache) 与 检查点 (checkpoint)RDD 的转换操作都是在内存中进行,如果在某个环节失败, 导致某个RDD丢失, 我们根据血统, 向上回溯, 找到所依赖的RDD, 但是内存中的转化是稍纵即逝, 是无法找到的。
解决方法:
- 1、使用cache
- 如果我们将她的依赖RDD 缓存到内存中, 出现故障时, 可以根据血统依赖, 找到可能的最近的依赖RDD, 如:wordcount案例中, 我们将linesRDD进行了Cache, 失败的时候,不用再读取文件, 直接再cache中读取linesRDD, 这将省略了读取文件。节约了时间。
- 新的问题:如果进行缓存的RDD, 丢失了,怎么办? 由于缓存是在内存中进行的, 出现宕机,会导致缓存的RDD丢失
- 2、Lineage过长 ,对RDD进行doCheckPoint()
- 由于缓存也可能发生RDD丢失, 所以,将Lineage的某个RDD,将其存储到磁盘中,使用
SparkContext.setCheckPointDir()
设置存储数据的路径。
- 由于缓存也可能发生RDD丢失, 所以,将Lineage的某个RDD,将其存储到磁盘中,使用
应用场景:当spark应用程序特别复杂,从初始的RDD开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用checkpoint功能。
原因:对于特别复杂的Spark应用,会出现某个反复使用的RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。
Checkpoint首先会调用SparkContext.setCheckPointDIR()
方法,设置一个容错的文件系统的目录,比如说HDFS;然后对RDD调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。
检查点机制是我们在spark streaming中用来保障容错性的主要机制,它可以使spark streaming阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:
-
控制发生失败时需要重算的状态数。Spark streaming可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。
-
提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样spark streaming就可以读取之前运行的程序处理数据的进度,并从那里继续。
最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的 lineage 就改变了。
持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低
3.2.3、checkpoint 两种方式在RDD计算,通过checkpoint进行容错,做checkpoint有两种方式
checkpoint data
logging the updates
用户可以控制采用哪种方式来实现容错,默认是 logging the updates
方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个 RDD lineage(血统)来重新计算 生成丢失的分区数据
// 设置ck目录
sc.setCheckpointDir("hdfs:///tmp/ch")
val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'),
(4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)
val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)
val result = pairs1.join(pairs2)
result.cache() // 官方建议 checkpoint 之前进行 cache
result.checkpoint
// 执行checkpoint 发现hdfs中还是没有数据,通过collect然后hdfs就有数据了,说明checkpoint也是个transformation的算子
// 通过action 触发 checkpoint
result.count
从 stage 中发现 两个job
但是执行的时候相当于走了两次流程,count 的时候前面计算了一遍,然后checkpoint又会计算一次,所以一般我们先进行cache然后做checkpoint就会只走一次流程,checkpoint的时候就会从刚cache到内存中取数据写入hdfs中
-
Narrow Dependencies:是指父RDD的每一个分区最多被一个子RDD的分区所用(父分区和子分区一对一),表现为一个父 RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父 RDD的一个分区不可能对应一个子RDD的多个分区。
-
Wide Dependencies: 是指子RDD的分区依赖于父 RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。
- 对与 Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出 节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上 其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开 销要远小于Wide Dependencies的数据重算开销。