您当前的位置: 首页 > 

宝哥大数据

暂无认证

  • 3浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

reduceByKey和groupByKey的区别

宝哥大数据 发布时间:2018-08-04 10:19:17 ,浏览量:3

reduceByKey

reduceByKey,相较于普通的shuffle操作(比如groupByKey),它的一个特点,就是说,会进行map端的本地聚合。

对map端给下个stage每个task创建的输出文件中,写数据之前,就会进行本地的combiner操作,也就是说对每一个key,对应的values,都会执行你的算子函数(_+_)

这里写图片描述

1.1、用reduceByKey对性能的提升:

1、在本地进行聚合以后,在map端的数据量就变少了,减少磁盘IO。而且可以减少磁盘空间的占用。 2、下一个stage,拉取数据的量,也就变少了。减少网络的数据传输的性能消耗。 3、在reduce端进行数据缓存的内存占用变少了。 4、reduce端,要进行聚合的数据量也变少了。

1.2、PairRDDFunctions.scala文件中reduceByKey源码
/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
  reduceByKey(defaultPartitioner(self), func)
}
groupByKey

groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。同时如果数据量十分大,可能还会造成OutOfMemoryError。


/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKey[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0964s