您当前的位置: 首页 >  ar

宝哥大数据

暂无认证

  • 0浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Spark之缓存策略

宝哥大数据 发布时间:2017-04-29 09:37:14 ,浏览量:0

一、案例:使用读取文件来测试缓存策略的效率
package com.chb.java;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.repl.SparkCommandLine;

/**
 * 缓存策略
 * @author 12285
 *
 */
public class Test {
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("hc").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		//使用缓存策略与否, 测试效率
		JavaRDD lines = sc.textFile("IDCLog0012201704042133_45376.AVL");

		//第一次Action
		long beginTime = System.currentTimeMillis();
		long count = lines.count();
		System.out.println(count);
		long endTime = System.currentTimeMillis();
		System.out.println("cost " + (endTime - beginTime) + " milliseconds");
		//第二次Action
		beginTime = System.currentTimeMillis();
		count = lines.count();
		System.out.println(count);
		endTime = System.currentTimeMillis();
		System.out.println("cost " + (endTime - beginTime) + " milliseconds");

		sc.close();

	}

}

1.1、不使用缓存策略

JavaRDD lines = sc.textFile("IDCLog0012201704042133_45376.AVL");

###测试结果

#第一次Action用时
142530
cost 383 milliseconds

#第二次Action用时
142530
cost 97 milliseconds
1.2、使用缓存策略.cache()
JavaRDD lines = sc.textFile("IDCLog0012201704042133_45376.AVL").cache();

###测试结果

#第一次Action用时
142530
cost 499 milliseconds
#第二次Action用时
142530
cost 27 milliseconds

两次对比,使用缓存策略的第二次用时27ms , 提高了499/27=18,使用的测试文件是26M, 效率大大提高了。

persist 缓存策略的类StorageLevel

三、RDD 容错 3.1、Lineage(血统)

RDD之间具有依赖关系, 像人类的血缘继承关系

这里写图片描述

  利用内存加快数据加载,在众多的其它的 In-Memory 类数据库或 Cache 类系统中也有实现,Spark的主要区别在于它处理分布式运算环境下的数据容错性(节点实效/数据丢失)问题时采用的方案。为了保证RDD 中数据的鲁棒性,RDD数据集通过所谓的血统关系(Lineage)记住了它是如何从其它RDD中演变过来的。 相比其它系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据转换(Transformation)操作(filter, map, join etc.)行为。当这个RDD的部分分区数据丢失时 ,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制 了Spark的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。

3.2、缓存(cache) 与 检查点 (checkpoint)

RDD 的转换操作都是在内存中进行,如果在某个环节失败, 导致某个RDD丢失, 我们根据血统, 向上回溯, 找到所依赖的RDD, 但是内存中的转化是稍纵即逝, 是无法找到的。

解决方法:

  • 1、使用cache
    • 如果我们将她的依赖RDD 缓存到内存中, 出现故障时, 可以根据血统依赖, 找到可能的最近的依赖RDD, 如:wordcount案例中, 我们将linesRDD进行了Cache, 失败的时候,不用再读取文件, 直接再cache中读取linesRDD, 这将省略了读取文件。节约了时间。
    • 新的问题:如果进行缓存的RDD, 丢失了,怎么办? 由于缓存是在内存中进行的, 出现宕机,会导致缓存的RDD丢失
  • 2、Lineage过长 ,对RDD进行doCheckPoint()
    • 由于缓存也可能发生RDD丢失, 所以,将Lineage的某个RDD,将其存储到磁盘中,使用SparkContext.setCheckPointDir() 设置存储数据的路径。
3.2.1、 checkpoint 检查点机制?

应用场景:当spark应用程序特别复杂,从初始的RDD开始到最后整个应用程序完成有很多的步骤,而且整个应用运行时间特别长,这种情况下就比较适合使用checkpoint功能。

原因:对于特别复杂的Spark应用,会出现某个反复使用的RDD,即使之前持久化过但由于节点的故障导致数据丢失了,没有容错机制,所以需要重新计算一次数据。

Checkpoint首先会调用SparkContext.setCheckPointDIR()方法,设置一个容错的文件系统的目录,比如说HDFS;然后对RDD调用checkpoint()方法。之后在RDD所处的job运行结束之后,会启动一个单独的job,来将checkpoint过的RDD数据写入之前设置的文件系统,进行高可用、容错的类持久化操作。

检查点机制是我们在spark streaming中用来保障容错性的主要机制,它可以使spark streaming阶段性的把应用数据存储到诸如HDFS等可靠存储系统中,以供恢复时使用。具体来说基于以下两个目的服务:

  • 控制发生失败时需要重算的状态数。Spark streaming可以通过转化图的谱系图来重算状态,检查点机制则可以控制需要在转化图中回溯多远。

  • 提供驱动器程序容错。如果流计算应用中的驱动器程序崩溃了,你可以重启驱动器程序并让驱动器程序从检查点恢复,这样spark streaming就可以读取之前运行的程序处理数据的进度,并从那里继续。

3.2.2、checkpoint和持久化机制的区别?

最主要的区别在于持久化只是将数据保存在BlockManager中,但是RDD的lineage(血缘关系,依赖关系)是不变的。但是checkpoint执行完之后,rdd已经没有之前所谓的依赖rdd了,而只有一个强行为其设置的checkpointRDD,checkpoint之后rdd的 lineage 就改变了。

持久化的数据丢失的可能性更大,因为节点的故障会导致磁盘、内存的数据丢失。但是checkpoint的数据通常是保存在高可用的文件系统中,比如HDFS中,所以数据丢失可能性比较低

3.2.3、checkpoint 两种方式

在RDD计算,通过checkpoint进行容错,做checkpoint有两种方式

  • checkpoint data
  • logging the updates

用户可以控制采用哪种方式来实现容错,默认是 logging the updates 方式,通过记录跟踪所有生成RDD的转换(transformations)也就是记录每个 RDD lineage(血统)来重新计算 生成丢失的分区数据

3.2.4、checkpoint 用法

// 设置ck目录
sc.setCheckpointDir("hdfs:///tmp/ch")



val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'), (3, 'c'), 
    (4, 'd'), (5, 'e'), (3, 'f'), (2, 'g'), (1, 'h'))
val pairs1 = sc.parallelize(data1, 3)
 
val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'), (3, 'C'), (4, 'D'))
val pairs2 = sc.parallelize(data2, 2)


val result = pairs1.join(pairs2)
result.cache() // 官方建议 checkpoint 之前进行 cache 
result.checkpoint

// 执行checkpoint 发现hdfs中还是没有数据,通过collect然后hdfs就有数据了,说明checkpoint也是个transformation的算子
// 通过action 触发 checkpoint
result.count


从 stage 中发现 两个job

在这里插入图片描述 但是执行的时候相当于走了两次流程,count 的时候前面计算了一遍,然后checkpoint又会计算一次,所以一般我们先进行cache然后做checkpoint就会只走一次流程,checkpoint的时候就会从刚cache到内存中取数据写入hdfs中

3.4、窄依赖和宽依赖
  • Narrow Dependencies:是指父RDD的每一个分区最多被一个子RDD的分区所用(父分区和子分区一对一),表现为一个父 RDD的分区对应于一个子RDD的分区或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父 RDD的一个分区不可能对应一个子RDD的多个分区。

  • Wide Dependencies: 是指子RDD的分区依赖于父 RDD的多个分区或所有分区,也就是说存在一个父RDD的一个分区对应一个子RDD的多个分区。

    • 对与 Wide Dependencies,这种计算的输入和输出在不同的节点上,lineage方法对与输入节点完好,而输出 节点宕机时,通过重新计算,这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上 其祖先追溯看是否可以重试(这就是lineage,血统的意思),Narrow Dependencies对于数据的重算开 销要远小于Wide Dependencies的数据重算开销。

这里写图片描述

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0458s