您当前的位置: 首页 >  ar

宝哥大数据

暂无认证

  • 2浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

spark算子---mapPartitions

宝哥大数据 发布时间:2018-05-11 12:28:19 ,浏览量:2

mapPartitions算子

这里写图片描述

1、首先看下f: Iterator[T] => Iterator[U], Scala的解释器在解析函数参数(function arguments)时有两种方式:
  • 传值调用(call-by-value):先计算参数表达式的值,再应用到函数内部;
  • 传名调用(call-by-name):将未计算的参数表达式直接应用到函数内部

在进入函数内部前,传值调用方式就已经将参数表达式的值计算完毕,而传名调用是在函数内部进行参数表达式的值计算的。

这就造成了一种现象,每次使用传名调用时,解释器都会计算一次表达式的值。

object Test {
   def main(args: Array[String]) {
        delayed(time());
   }

   def time() = {
      println("获取时间,单位为纳秒")
      System.nanoTime
   }
   def delayed( t: => Long ) = {
      println("在 delayed 方法内")
      println("参数: " + t)
      t
   }
}

以上实例中我们声明了 delayed 方法, 该方法在变量名和变量类型使用 => 符号来设置传名调用。执行以上代码,输出结果如下:

$ scalac Test.scala 
$ scala Test
在 delayed 方法内
获取时间,单位为纳秒
参数: 241550840475831
获取时间,单位为纳秒

实例中 delay 方法打印了一条信息表示进入了该方法,接着 delay 方法打印接收到的值,最后再返回 t。

高阶函数
    //===============高阶函数start====================
    /**
     * 高阶函数
     * Scala允许定义高阶函数。它是将其他函数作为参数或其结果是函数的函数。
     * 尝试以下示例程序,apply()函数接受另一个函数f和值v,并将函数f应用于v。
     * 调用:  println( apply( layout, 10) )
     */
     def apply(f: Int => String, v: Int) = f(v)
     def layout[A](x: A) = "[" + x.toString() + "]"
     //===============高阶函数end====================

再来看mapPartitions算子

这里写图片描述

调用

这里写图片描述

传入的又是一个匿名函数

二、map与mapPartitions的区别
  • map: 比如一个partition中有1万条数据;那么你的function要执行和计算1万次。

  • MapPartitions:一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。

如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。

SparkSql或DataFrame默认会对程序进行mapPartition的优化。

MapPartitions的缺点:一定是有的。

如果是普通的map操作,一次function的执行就处理一条数据;那么如果内存不够用的情况下,比如处理了1千条数据了,那么这个时候内存不够了,那么就可以将已经处理完的1千条数据从内存里面垃圾回收掉,或者用其他方法,腾出空间来吧。

所以说普通的map操作通常不会导致内存的OOM异常。

但是MapPartitions操作,对于大量数据来说,比如甚至一个partition,100万数据,一次传入一个function以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就OOM,内存溢出。

关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0412s