您当前的位置: 首页 >  大数据

庄小焱

暂无认证

  • 0浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

大数据云计算——SparkCore核心

庄小焱 发布时间:2020-11-04 20:14:03 ,浏览量:0

SparkCore的深入学习 RDD基本的理解

RDD:弹性式分布式数据集当做集合  例如列表 list

现在文档:http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html 

中文变量:http://spark.apachecn.org/#/

Flink的文档:https://flink.apache.org/zh/

SparkCore的核心框架的来说 三大抽象的概念:

数据集:RDD

共享变量:Shared Variables

累加器 / 广播变量

RDD的创建和使用

1.1 RDD 定义 RDD (Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,partitioned collection of elements that can be operated on in parallel.可以认为RDD是分布式的列表List或数组Array;抽象的数据结构,RDD是一个抽象类AbstractClass和泛型Generic Type:

如何将数据封装到RDD集合中,主要有两种方式:并行化本地集合(Driver Program中)和引用加载外部存储系统(如HDFS、Hive、HBase、Kafka、Elasticsearch等)数据集。

方式一:并行化本地集合:将本地集合(Scala中集合对象)数据存储到RDD中。通常用于测试开发:由一个已经存在的cala 集合创建,集合并行化,集合必须时Seq本身或者子类对象。

方式二:加载外部存储系统数据:比如HDFS、LocalFS、HBase、Elasticsearch、Kafka等等。重点掌握:从HDFS加载文本文件数据

其中文件路径:最好是全路径,可以指定文件名称,可以指定文件目录,可以使用通配符指定。实际项目中如果从HDFS读取海量数据,应用运行在YARN上,默认情况下,RDD分区数目等于HDFS bolck的数目的

package com.shanghaiuniversity

import org.apache.spark.{SparkConf, SparkContext}

/**
 * spark 采用并行化的方式构建scala集合seq中的数据的为RDD
 */
package object SparkParalizeTest {
  def main(args: Array[String]): Unit = {
    //构建一个spark Content 对象
    val sc: SparkContext = {
      // a 创建sparkConf对象
      val Sparkconf=new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // b 传递sparkConf对象 创建实例
      val  context=SparkContext.getOrCreate(Sparkconf)
      //c 返回实例对象
      context
    }
    //关闭的spark
    sc.stop()
  }
}

常见的RDD的两种的方式代码

package com.shanghaiuniversity.spark

import org.apache.spark.{SparkConf, SparkContext}

object SparkParalizeTest {
  def main(args: Array[String]): Unit = {
    //构建一个spark Content 对象
    val sc: SparkContext = {
      // a 创建sparkConf对象
      val Sparkconf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // b 传递sparkConf对象 创建实例
      val context = SparkContext.getOrCreate(Sparkconf)
      //c 返回实例对象
      context
    }

    //TODO :创建一个本地的集合 创建RDD
    val seq: Seq[Int] = Seq(1, 2, 3, 4, 5, 6, 7, 8)

    /**
     * def parallelize[T: ClassTag](
     * seq: Seq[T],
     * numslices: Int = defaultParallelism   表示的分区数
     * ): RDD[T]
     */
    val inputRDD = sc.parallelize(seq, numSlices = 2)

    //TODO:读取外部的文件的数据的例如HDFS MOngoDB ……
    /**
    def textFile(
       path: string,
       minPartitions: Int =defaultMinPartitions
    ): RDD[String]
    */
    val intputpath = ""
    sc.textFile(intputpath, minPartitions = 2)
    inputRDD.foreach(item => println(item))

    //关闭的spark
    sc.stop()
  }
}

小文件的读取

package com.shanghaiuniversity.source

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 采用的是SparkContextWholeTextFiles()方法读取小文件
 * 实际项目中,可以先使用wholeTextFiles方法读取数据T设置适当RDD分区,再将数据保存到文件系统,以便后续应用读取处理,大大提升性能。
 */
object SparkWholeTextFileTest {
  def main(args: Array[String]): Unit = {
    //构建一个spark Content 对象
    val sc: SparkContext = {
      // a 创建sparkConf对象
      val Sparkconf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      // b 传递sparkConf对象 创建实例
      val context = SparkContext.getOrCreate(Sparkconf)
      //c 返回实例对象
      context
    }

    /**
     * def wholeTextFiles(
     * path: String,
     * minPartitions: Int = defaultMinPartitions
     * ): RDD[(String,String)]
     */
    val filepath = ""
    //TODO :读取小文件的数据  wholetextFiles函数
    val inputRDD: RDD[(String, String)] = sc.wholeTextFiles(filepath, minPartitions = 2)
    println(s"RDD 分区数目="${
      inputRDD.getNumPartitions
    })

    //打印样本数据
    inputRDD.take(10).foreach(item => println(item))
    //关闭spark
    sc.stop()
  }
}

RDD分区数目

在讲解RDD属性时,多次提到了分区(partition)的概念。分区是一个偏物理层的概念,也是RDD并行计算的核心。数据在RDD内部被切分为多个子集合,每个子集合可以被认为是一个分区,运算逻辑最小会被应用在每一个分区上,每个分区是由一个单独的任务(task)来运行的,所以分区数越多,整个应用的并行度也会越高。

获取RDD分区数目两种方式:

1/方式一:直接获取           rdd . getNumPartitions 1/方式二:通过RDD获取   rdd.partitions.length

RDD分区的数据取决于哪些因素

第一点:RDD分区的原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以充分利用CPU的计算资源;

第二点:在实际中为了更加充分的压榨CPU的计算资源,会把并行度设置为cpu核数的2~3倍;

第三点:RDD分区数和启动时指定的核数、调用方法时指定的分区数、如文件本身分区数有关系,具体如下说明:

1)、启动的时候指定的CPU核数确定了一个参数值:spark.default.parallelism=指定的CPU核数(集群模式最小2)

2)、对于Scala集合调用parallelize(集合,分区数)方法如果没有指定分区数,就使用spark.default.parallelism如果指定了就使用指定的分区数(不要指定大于spark.default.parallelism)

3)、对于textFile(文件,分区数)defaultMinPartitions如果没有指定分区数sc.defaultMinPartitions=min(defaultParallelism,2)如果指定了就使用指定的分区数sc.defaultMinPartitions=指定的分区数rdd的分区数。

RDD的函数

有一定开发经验的读者应该都使用过多线程,利用多核CPU的并行能力来加快运算速率。在开发并行程序时,可以利用类似Fork/Join的框架将一个大的任务切分成细小的任务,每个小任务模块之间是相互独立的,可以并行执行,然后将所有小任务的结果汇总起来,得到最终的结果。一个非常好的例子便是归并排序。对整个序列进行排序时,可以将序列切分成多个子序列进行排序,然后将排好序的子序列归并起来得到最终的结果。

RDD的基本函数

RDD中map、filter、flatMap及foreach等函数为最基本函数,都是都RDD中每个元素进行操作,将元素传递到函数中进行转换。map函数:

               map(f:T=>U):RDD[T]=>RDD[U],表示将RDD经由某一函数f后,转变为另一个RDD。flatMap函数:

             flatMap(f:T=>Seq[u]): RDD[T]=>RDD[U]),表示将RDD经由某一函数f后,转变为一个新的 RDD,但是与map 不同,RDD中的每一个元素会被映射成新的О到多个元素(f 函数返回的是一个序列Seq)。filter函数:

             filter(f:T=>Bool): RDD[T]=>RDD[T],表示将RDD经由某一函数f后,只保留f返回为true的数据,组成新的RDD。foreach函数:

             foreach(func),将函数func应用在数据集的每一个元素上,通常用于更新一个累加器,或者和外部存储系统进行交互,例如Redis。关于 foreach,在后续章节中还会使用,到时会详细介绍它的使用方法及注意事项。

saveAsTextFile函数:

             saveAsTextFile(path:String),数据集内部的元素会调用其 toString方法,转换为字符串形式,然后根据传入的路径保存成文本文件,既可以是本地文件系统,也可以是HDFS等。

package com.shanghaiuniversity.rdd

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 使用的Spark实现的词频的统计 使用的是的scala语言
 */
object SparkWordCount {
  def main(args: Array[String]): Unit = {
    //TODO 构建一个spark的对象
    val sc: SparkContext = {
      //创建一个Spark对象设置应用信息
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setAppName("loacal[2]")
      //传递sparkConf对象 创建实例
      SparkContext.getOrCreate(sparkConf)
    }
    //TODO 业务操作
    val inputpath = ""
    //1 读取数据
    val inputRDD: RDD[String] = sc.textFile(inputpath)
    //2处理分析数据的 调用的RDD中的transformation函数
    val resultRDD: RDD[(String, Int)] = inputRDD
      //过滤空数据
      .filter(line => null != line && line.trim.length != 0)
      //分割单词
      .flatMap(line => line.trim.split("\\s+"))
      //转为二元组 表示的是每一个单词的出现的次数
      .map(word => word -> 1)
      //分组聚合
      .reduceByKey((tmp, item) => tmp + item)
    //3结果数据的输出
    resultRDD.foreach(tuple => println(tuple))

    //TODO 关闭spark
    sc.stop()
  }
}
RDD的重要函数

在Spark中的官方给的代码是基本子啊实际中的是不能使用的。

分区操作函数:mapPartitions foreachPartition

map和foreach函数的申明

//map和forach的函数都是的针对的是RDD集合的每一个元素的操作的

每个RDD由多分区组成的,实际开发建议对每个分区数据的进行操作,map函数使用mapPartitions代替、 foreache函数使用foreachPartition代替。

重分区操作函数:repairtition   coalesce PairRDDFunctions(调整RDD分区数目,要丕增加分区,要么减少分区,在实际开发中,非常重要。)

场景一:保存结果RDD时,数据量很少,可能分区很多,需要降低分区数目

场景二:从外部存储系统读取数据时,比如读取HBase表数据,默认情况下,一个Region对应一个分区Partition,加上Reion数据是为5GB时,需要增加分区数目

聚合函数 reduce  fold函数   无论何种聚合使用同一聚合函数 (分区聚合 /分区间聚合)

在数据分析领域中,对数据聚合操作是最为关键的,在Spark框架中各个模块使用时,主要就是其中聚合函数的使用。

package com.shanghaiuniversity.rdd

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

import scala.collection.mutable.ListBuffer

/**
 * RDD的聚合函数 如何进行使用以及底层原理
 * reduce /fold
 * aggregate
 * groupBykey / reduceBykey / foldByKey / aggregate  /combinBykey
 *
 */
object SparkAggTest {
  def main(args: Array[String]): Unit = {
    //构建Spark Application 应用的入口实例
    val sc: SparkContext = {
      val sparkConf: SparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
        .setMaster("local[2]")
      SparkContext.getOrCreate(sparkConf)
    }

    //TODO RDD中的reduce 和fold函数
    val dataRDD = sc.parallelize(seq = 1 to 10, numSlices = 2)
    dataRDD.foreachPartition { iter =>
      val paritionID: Int = TaskContext.getPartitionId()
      iter.foreach(iter => println(s"$paritionID"))
    }
    println("=====================================================")
    //使用的reduce函数
    val resultRDD = dataRDD.reduce { (tmp, item) =>
      val partiionID: Int = TaskContext.getPartitionId()
      println(s"$partiionID:tmp =$tmp,item=$item,sum=${tmp + item}")
      tmp + item
    }
    println(s"RDD reduce=$resultRDD")
    println("=====================================================")
    /**
     * def aggregateru: classTag]
     * // TOD:表示娶合函数中间临时变量初始值( zeroValue: U)
     * (zerovalue:U)
     * (
     * 分区内数据娶合时使用娶合函数
     * seq0p: (U,T) =>U,
     * 分区间娶合数据娶合时使用娶合函数
     * comb0p: (U,U)=>u
     * )
     * ):U
     */
    /**
     * 需求是获取两个分区的最大的数据
     *
     * 1、定义聚合中间临时变量个数,类型
     * ListBuffer
     * 2、初始化中间临时变量值
     * 空集合
     */
    val resultRDD1 = dataRDD.aggregate(new ListBuffer[Int]())(
      (tmp: ListBuffer[Int], item: Int) => {
        tmp += item
        tmp.sorted.takeRight(2)
      },
      (tmp: ListBuffer[Int], item: ListBuffer[Int]) => {
        tmp ++= item
        tmp.sorted.takeRight(2)
      }
    )

    println(s"top2:${resultRDD1.toList.mkString(",")}")

    //关闭应用
    sc.stop()
  }
}

PairRDDFunctions 聚合函数

在Spark中有一个object对象PairRDDFunctions,主要针对RDD的数据类型是Key/Value对的数据提供函数,方便数据分析处理。比如使用过的函数: reduceByKey、groupByKey等。*ByKey函数:将相同Key的Value进行聚合操作的,省去先分组再聚合。

关联函数 :对Key/Value类型RDD进行关联操作,按照key进行关联。

RDD得持久化

在实际开发中某些RDD的计算或转换可能会比较耗费时间,如果这些RDD后续还会频繁的被使用到,那么可以将这些RDD进行持久化/缓存,这样下次再使用到的时候就不用再重新计算了,提高了程序运行的效率。

缓存函数:cache ()  persist()函数 

将RDD数据缓存的时候,需要设置缓存级别StorageLevel,具体值如下:

释放缓存

当缓存的RDD数据,不再被使用时,考虑释资源,使用如下函数:

def unpersist(blocking: Boolean = true): this.type

此函数属于eager,立即执行。

何时缓存数据

第一点:当某个RDD被使用多次的时慎建议缓存此RDD数据。比如,从HDFS上读取网站行为日志数据,进行多维度的分析,最好缓存数据。

第二点:当某个RDD来之不易,并且使用不止一次,建议缓存此RDD数据。比如,从HBase表中读取历史订单数据,与从MySQL表中商品和用户维度信息数据,进行关联Join等聚合操作,获取RDD: etlRDD,后续的报表分析使用此RDD,此时建议缓存RDD数据 案例:etlRDD.persist(StoageLeval.MEMORY_AND_DISK_2)

复习总结

RDD的checkpoint

RDD数据可以持久化,但是持久化/缓存可以把数据放在内存中,虽然是快速的,但是也是最不可靠的;也可以把数据放在磁盘上,也不是完全可靠的!例如磁盘会损坏等。

Checkpoint的产生就是为了更加可靠的数据持久化,在Checkpoint的时候一般把数据放在在HDFS上,这就天然的借助了HDFS天生的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。

在Spark Core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;

RDD的持久化的和缓存的区别:

 

关注
打赏
1657692713
查看更多评论
立即登录/注册

微信扫码登录

0.0452s