RDD(Resilient Distributed Datasets),弹性分布式数据集,是分布式内存的一个抽象概念。RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。
对开发者而言,RDD可以看作是Spark的一个对象,它本身运行于内存中,如读文件是一个RDD,对文件计算是一个RDD,结果集也是一个RDD ,不同的分片、 数据之间的依赖 、key-value类型的map数据都可以看做RDD。
RDD具备像MapReduce等数据流模型的容错特性,并且允许开发人员在大型集群上执行基于内存的计算。
现有的数据流系统对两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域很常见;二是交互式数据挖掘工具。这两种情况下,将数据保存在内存中能够极大地提高性能。
为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RDD上的批量操作来创建。尽管如此,RDD仍然足以表示很多类型的计算,包括MapReduce和专用的迭代编程模型(如Pregel)等。
RDD是只读的、分区记录的集合。RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。这些确定性操作称之为转换,如map、filter、groupBy、join。
RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage),据此可以从物理存储的数据计算出相应的RDD分区。
RDD作为数据结构,本质上是一个只读的分区记录集合。一个RDD可以包含多个分区,每个分区就是一个dataset片段。
RDD是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据和分区。逻辑上可以认为RDD是一个分布式的集合
Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如MappedRDD、MapPartitionsRDD、ShuffledRDD、ReliableCheckpointRDD等子类。Spark将常用的大数据操作都转化成为RDD的子类。
RDD的几种创建方式 从稳定的文件存储系统(比如HDFS、Hive、HBase)中创建RDD,如下://从hdfs文件中创建
JavaRDD textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt");
从本地文件系统的文件中,注意file:后面肯定是至少三个///,四个也行,不能是两个。如果指定第二个参数的话,表示创建的RDD的最小的分区数,如果文件分块的数量大于指定的分区数的话则以文件的分块数量为准 JavaRDD textFileRDD = sc.textFile(“hdfs://master:9999/users/hadoop-twq/word.txt” 2 );
从父RDD转换得到新的RDD。可以经过transformation api从一个已经存在的RDD上创建一个新的RDD,以下是map这个转换apiJavaRDD mapRDD = textFileRDD.map(new Function() {
@Override
public String call(String s) throws Exception {
return s + "test";
}
});
System.out.println("mapRDD = " + mapRDD.collect());
调用SparkContext的parallelize方法将Driver上的数据集并行化,形成分布式的RDD。
从内存中的列表数据创建一个RDD,可以指定RDD的分区数,如果不指定的话,则取所有Executor的所有cores数量
//创建一个单类型的JavaRDD
JavaRDD integerJavaRDD = sc.parallelize(Arrays.asList(1, 2, 3, 3, 4), 2);
System.out.println("integerJavaRDD = " + integerJavaRDD.glom().collect());
//创建一个单类型且类型为Double的JavaRDD
JavaDoubleRDD doubleJavaDoubleRDD = sc.parallelizeDoubles(Arrays.asList(2.0, 3.3, 5.6));
System.out.println("doubleJavaDoubleRDD = " + doubleJavaDoubleRDD.collect());
//创建一个key-value类型的RDD
import scala.Tuple2;
JavaPairRDD javaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("test", 3), new Tuple2("kkk", 3)));
System.out.println("javaPairRDD = " + javaPairRDD.collect());
注:对于第三种情况,scala中还提供了makeRDD api,这个api可以指定创建RDD每一个分区所在的机器。 基于DB(MySQL)、NoSQL(HBase)、S3、数据流创建。
RDD的两种操作算子对于RDD可以有两种计算操作算子:Transformation(变换)与Action(行动)。只有行动(Action)算子才会触发作业(Job)提交。
RDD嵌套是不被支持的,也即不能在一个RDD操作的内部再使用RDD。 在spark中trasaction是不能嵌套的,这是因为并行式函数将以闭包的形式发送至各个worker。若并行式函数使用了rdd的引用,spark将会把当前rdd对象闭包给worker.然而,对rdd对象的执行只能由driver进行,worker并不能执行,所以会导致错误
RDD的五大属性通过 RDD 的内部属性,可以获取相应的元数据信息。通过这些信息可以支持更复杂的算法或优化。
- partitions(分区) 每个RDD都有多个分区,这既是RDD的数据单位, 也是计算粒度。每个分区是由一个Task线程处理。在RDD创建的时候可以指定分区的个数, 如果没有指定, 那么默认分区个数由参数spark.default.parallelism指定(如果未设置这个参数 ,则在yarn或者standalone模式下有如下推导:spark.default.parallelism = max(所有executor使用的core总数,2))。 spark任务计算是以分区为单位,一个分区就对应上一个task线程。 通过val rdd1=sc.textFile(文件) ,如果文件大小的block个数小于等于2,它产生的RDD的分区数就是2;如果文件大小的block个数大于2,它产生的RDD的分区数跟文件的block相同。 通过分区列表可以找到RDD中包含的所有分区及其所在地址。
- partitioner(分区方法) RDD的分区方式,这个属性指的是RDD的partitioner函数(分片函数),分区函数就是将数据分配到指定的分区。这个目前实现了HashPartitioner和RangePartitioner, 只有key-value的RDD才会有分区函数, 否则为none。分区函数不仅决定了当前分区的个数, 同时决定parent shuffle RDD的输出的分区个数. (可选项) 对于kv类型的RDD才会有分区函数(必须要产生shuffle),对于不是kv类型的RDD分区函数是None。 分区函数决定了原始RDD的数据会流入到下面RDD的哪些分区中。 spark的分区函数有2种:第一种hashPartitioner(默认值), 通过 key.hashcode % 分区数=分区号;第二种RangePartitioner,是基于一定的范围进行分区。 由一个函数计算每一个分片 比如: rdd2=rdd1.map(x=>(x,1)) ,这里指的就是每个单词计为1的函数 通过函数可以对每个数据块进行RDD,需要进行的用户自定义函数运算。
- dependencies(依赖关系) Spark的运行过程就是RDD之间的转换过程。 因此, 必须记录RDD之间的生成关系(新RDD是由哪个或哪几个父RDD生成),这就是所谓的依赖关系。这样既有助于阶段和任务的划分,也有助于在某个分区出错的时候,只需要重新计算与当前出错的分区有关的分区,而不需要计算所有的分区。 对父 RDD 的依赖列表。为了能够回溯到父 RDD,为容错等提供支持。 一个RDD会依赖于其他多个RDD,这里就涉及到RDD与RDD之间的依赖关系,后期spark任务的容错机制就是根据这个特性而来。 比如: rdd2=rdd1.map(x=>(x,1)), rdd2的结果是通过rdd1调用了map方法生成,那么rdd2就依赖于rdd1的结果。依赖还具体分为宽依赖和窄依赖,但并不是所有的RDD都有依赖。
- compute(获取分区迭代列表) 计算属性: 当调用 RDD#iterator 方法无法从缓存或 checkpoint 中获取指定 partition 的迭代器时,就需要调用 compute 方法来获取 RDD不仅包含有数据, 还有在数据上的计算, 每个RDD以分区为计算粒度, 每个RDD会实现compute函数, compute函数会和迭代器(RDD之间转换的迭代器)进行复合, 这样就不需要保存每次compute运行的结果. 对 key-value对数据类型 RDD 的分区器,控制分区策略和分区数。通过分区函数可以确定数据记录在各个分区和节点上的分配,减少分布不平衡。
- preferedLocations(优先分配节点列表) 对于分区而言返回数据本地化计算的节点列表。也就是说, 每个RDD会报出一个列表(Seq), 而这个列表保存着分片优先分配给哪个Worker节点计算,spark坚持移动计算而非移动数据的原则,也就是尽量在存储数据的节点上进行计算。 要注意的是,并不是每个 RDD 都有 preferedLocation,比如从 Scala 集合中创建的 RDD 就没有,而从 HDFS 读取的 RDD 就有。 每个数据分区的地址列表(如 HDFS 上的数据块的地址)。如果数据有副本,则通过地址列表可以获知单个数据块的所有副本地址,为负载均衡和容错提供支持。 (可选项) 一组最优的数据块的位置,这里涉及到数据的本地性和数据位置最优。spark后期在进行任务调度的时候,会优先考虑存有数据的worker节点来进行任务的计算。大大减少数据的网络传输,提升性能。 spark 本地化级别:PROCESS_LOCAL=>NODE_LOCAL=>NO_PREF=> RACK_LOCAL=>ANY
- PROCESS_LOCAL进程本地化:task要计算的数据在同一个Executor中
- NODE_LOCAL节点本地化:速度比PROCESS_LOCAL稍慢,因为数据需要在不同进程之间传递或从文件中读取 NODE_PREF没有最佳位置这一说:数据从哪里访问都一样快,不需要位置优先。比如说SparkSQL读取MySql中的数据 RACK_LOCAL机架本地化:数据在同一机架的不同节点上。需要通过网络传输数据及文件IO,比 NODE_LOCAL 慢 ANY跨机架:数据在非同一机架的网络上,速度最慢
RDD可以相互依赖。如果RDD的每个分区最多只能被一个子RDD的一个分区使用,则称之为narrow dependency;若多个子RDD分区都可以依赖,则称之为wide dependency。不同的操作依据其特性,可能会产生不同的依赖。例如map操作会产生narrow dependency,而join操作则产生wide dependency。 窄依赖是一对一的关系,所以可以直接从父分区中获取;宽依赖则不行
- 窄依赖 子RDD分区只由一个或多个父RDD中的一个分区转换而来。map操作就是一个父RDD的一个分区,union操作就是两个父RDD的一个分区。 父 RDD 的 partition 至多被一个子 RDD partition 依赖(OneToOneDependency,RangeDependency)
- 宽依赖: 子RDD的分区由父RDD的所有分区转换而来,即经过过shuffle操作。如reduceByKey,groupByKey等。 父 RDD 的 partition 被多个子 RDD partitions 依赖(ShuffleDependency)
依赖关系的特性: 窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据得到子RDD对应的某块数据;宽依赖则要等到父RDD所有数据都计算完成,且父RDD的计算结果进行hash并传到对方节点上之后才能计算子RDD。 数据丢失时,对于窄依赖,只要重新计算丢失的那一块数据就可以完成恢复;对于宽依赖,则要通过对祖先RDD中的所有数据块全部重新计算来恢复。
对于不同依赖关系要采取不同的任务调度机制和容错恢复机制。
SparkSessionSparkSession为用户提供了一个相对统一的切入点来使用Spark的各项功能(流除外),而不需要显式地创建SparkConf、SparkContext、SQLContext和HiveContext了,因为这些对象已经被封装在了SparkSession中。
SparkContext:
- Spark功能的主要入口点。代表到Spark集群的连接,可以创建RDD、累加器和广播变量
- 每个JVM只能激活一个SparkContext对象,在创建sc之前需要stop掉active的sc。 SparkConf:
- spark配置对象,设置Spark应用各种参数,kv形式。
示例:
//2.0版本之后
SparkSession sparkSession = SparkSession.builder()
.master("local")
.appName("Spark WordCount written by java!")
.getOrCreate();
SparkContext sparkContext = sparkSession.sparkContext();
JavaSparkContext sc = JavaSparkContext.fromSparkContext(sparkContext);
//2.0版本之前
SparkConf conf = new SparkConf()
.setAppName("Spark WordCount written by java!")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);