
ContextCleaner

宝哥大数据, published 2019-04-13 15:59:24

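ContextCleaner asynchronously cleans up the state behind RDDs, ShuffleDependencies, Broadcast variables, accumulators, and RDD checkpoints once the driver-side objects that reference them go out of scope in the application. Each kind of cleanup is represented by a CleanupTask case class: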
/**
 * Classes that represent cleaning tasks.
 */
private sealed trait CleanupTask
private case class CleanRDD(rddId: Int) extends CleanupTask
private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
private case class CleanAccum(accId: Long) extends CleanupTask
private case class CleanCheckpoint(rddId: Int) extends CleanupTask
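1.1. Creation and startup

SparkContext creates a ContextCleaner only when spark.cleaner.referenceTracking is true (the default), and then starts it: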

        _cleaner =
                if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
                    Some(new ContextCleaner(this))
                } else {
                    None
                }
        _cleaner.foreach(_.start())
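Here is a minimal sketch of the same "create only if enabled, then start" pattern. It assumes a plain Map[String, String] in place of SparkConf and a hypothetical SimpleCleaner class in place of ContextCleaner. As in SparkContext, a missing key means reference tracking is on.

```scala
// Hypothetical stand-in for ContextCleaner: it only records whether start() ran.
class SimpleCleaner {
  var started = false
  def start(): Unit = { started = true }
}

object CleanerFactory {
  // Mirrors _conf.getBoolean("spark.cleaner.referenceTracking", true):
  // when the key is absent, tracking defaults to enabled.
  def create(conf: Map[String, String]): Option[SimpleCleaner] = {
    val tracking = conf.getOrElse("spark.cleaner.referenceTracking", "true").toBoolean
    val cleaner = if (tracking) Some(new SimpleCleaner) else None
    cleaner.foreach(_.start()) // does nothing when the Option is None
    cleaner
  }
}

val enabled = CleanerFactory.create(Map.empty)
val disabled = CleanerFactory.create(Map("spark.cleaner.referenceTracking" -> "false"))
```

Wrapping the cleaner in an Option keeps every later call site (for example `_cleaner.foreach(_.stop())`) safe when tracking is disabled. In a real application, setting spark.cleaner.referenceTracking to false means no cleaner is created and nothing is cleaned up automatically.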
1.2. Components of ContextCleaner

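ContextCleaner relies on weak references rather than scanning objects. It wraps each object registered for cleanup (an RDD, ShuffleDependency, Broadcast, accumulator, or checkpointed RDD) in a CleanupTaskWeakReference, which is a WeakReference that also carries the CleanupTask to run, and registers that reference with referenceQueue. Once the driver no longer holds a strong reference to the object and the object is garbage collected, the JVM places the reference on the queue. A dedicated daemon thread, cleaningThread, takes references off the queue and performs the cleanup. Every registered CleanerListener is notified after each cleanup completes. The relevant fields are: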

        /**
         * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
         * have not been handled by the reference queue.
         * It keeps the weak-reference wrappers (not the tracked objects) strongly reachable.
         */
        private val referenceBuffer =
                Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)

        // The GC enqueues a CleanupTaskWeakReference here once its referent has been collected
        private val referenceQueue = new ReferenceQueue[AnyRef]

        // Listeners that are notified after each cleanup completes
        private val listeners = new ConcurrentLinkedQueue[CleanerListener]()

        // Thread that performs the actual cleanup by running keepCleaning()
        private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() }
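The interplay between referenceQueue and referenceBuffer can be sketched in plain Scala. Tracker, Task, and TaskWeakReference are hypothetical stand-ins for ContextCleaner, CleanupTask, and CleanupTaskWeakReference. The register method plays the role of ContextCleaner's register*ForCleanup methods. Garbage-collection timing is not deterministic, so the usage below calls Reference.enqueue() by hand to simulate the JVM enqueueing the reference after the tracked object has been collected.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.ConcurrentHashMap

sealed trait Task
final case class CleanRDD(rddId: Int) extends Task

// A weak reference that also remembers which cleanup to run for its referent.
final class TaskWeakReference(val task: Task, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

class Tracker {
  // After the referent is collected, the GC appends the reference here.
  val referenceQueue = new ReferenceQueue[AnyRef]
  // Strongly holds the reference objects: a WeakReference that is itself
  // garbage collected is never enqueued, so its cleanup would be lost.
  val referenceBuffer =
    Collections.newSetFromMap[TaskWeakReference](new ConcurrentHashMap[TaskWeakReference, java.lang.Boolean])

  def register(obj: AnyRef, task: Task): Unit =
    referenceBuffer.add(new TaskWeakReference(task, obj, referenceQueue))

  // Non-blocking: returns the task of the next collected referent, if any,
  // and drops the wrapper from the buffer once it has been handled.
  def pollTask(): Option[Task] =
    Option(referenceQueue.poll()).map { r =>
      val ref = r.asInstanceOf[TaskWeakReference]
      referenceBuffer.remove(ref)
      ref.task
    }
}

val tracker = new Tracker
val rddLike = new Object
tracker.register(rddLike, CleanRDD(7))
```

In a real driver the reference is enqueued only after the object becomes unreachable and a GC cycle actually runs. Because the driver heap may be collected rarely, Spark also schedules a periodic System.gc() (spark.cleaner.periodicGC.interval, default 30min).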
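1.2.1. The keepCleaning method

cleaningThread loops until stop() is called. In each pass it waits up to REF_QUEUE_POLL_TIMEOUT for a reference to appear in referenceQueue. When one arrives, the thread removes it from referenceBuffer and dispatches on the task type to the matching cleanup method. The work runs inside synchronized so that stop() cannot interrupt a cleanup halfway through. blockOnCleanupTasks and blockOnShuffleCleanupTasks control whether a cleanup waits for the removal to complete. They are set by spark.cleaner.referenceTracking.blocking (default true) and spark.cleaner.referenceTracking.blocking.shuffle (default false).
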
        /** Keep cleaning RDD, shuffle, and broadcast state. */
        private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
            while (!stopped) {
                try {
                    val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
                            .map(_.asInstanceOf[CleanupTaskWeakReference])
                    // Synchronize here to avoid being interrupted on stop()
                    synchronized {
                        reference.foreach { ref =>
                            logDebug("Got cleaning task " + ref.task)
                            referenceBuffer.remove(ref)
                            ref.task match {
                                case CleanRDD(rddId) =>
                                    doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
                                case CleanShuffle(shuffleId) =>
                                    doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
                                case CleanBroadcast(broadcastId) =>
                                    doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
                                case CleanAccum(accId) =>
                                    doCleanupAccum(accId, blocking = blockOnCleanupTasks)
                                case CleanCheckpoint(rddId) =>
                                    doCleanCheckpoint(rddId)
                            }
                        }
                    }
                } catch {
                    case ie: InterruptedException if stopped => // ignore
                    case e: Exception => logError("Error in cleaning thread", e)
                }
            }
        }
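Below is a simplified, self-contained version of this loop. It uses hypothetical names (MiniCleaner, TaskRef) and replaces the doCleanup* calls with strings recorded in a queue. It keeps three details from keepCleaning:

* remove(timeout) returns null on timeout, so the result is wrapped in Option.
* The cleanup runs inside synchronized.
* stop() takes the same lock before interrupting, so the thread is never interrupted in the middle of a cleanup.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

sealed trait CleanupTask
final case class CleanRDD(rddId: Int) extends CleanupTask
final case class CleanShuffle(shuffleId: Int) extends CleanupTask
final case class CleanBroadcast(broadcastId: Long) extends CleanupTask

final class TaskRef(val task: CleanupTask, referent: AnyRef, q: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, q)

// Hypothetical, simplified cleaner: "cleaning" just records a description.
class MiniCleaner(pollTimeoutMs: Long = 100) {
  private val queue = new ReferenceQueue[AnyRef]
  private val buffer =
    Collections.newSetFromMap[TaskRef](new ConcurrentHashMap[TaskRef, java.lang.Boolean])
  val cleaned = new ConcurrentLinkedQueue[String]()
  @volatile private var stopped = false

  private val thread = new Thread() { override def run(): Unit = keepCleaning() }
  thread.setDaemon(true)

  def start(): Unit = thread.start()

  def stop(): Unit = {
    stopped = true
    // Take the loop's lock so an in-progress cleanup finishes before the interrupt.
    synchronized { thread.interrupt() }
    thread.join()
  }

  def register(obj: AnyRef, task: CleanupTask): TaskRef = {
    val ref = new TaskRef(task, obj, queue)
    buffer.add(ref)
    ref
  }

  def pending: Int = buffer.size

  private def keepCleaning(): Unit = {
    while (!stopped) {
      try {
        // Blocks for at most pollTimeoutMs, then returns null if nothing arrived.
        val reference = Option(queue.remove(pollTimeoutMs)).map(_.asInstanceOf[TaskRef])
        synchronized {
          reference.foreach { ref =>
            buffer.remove(ref)
            ref.task match {
              case CleanRDD(id)       => cleaned.add(s"rdd $id")
              case CleanShuffle(id)   => cleaned.add(s"shuffle $id")
              case CleanBroadcast(id) => cleaned.add(s"broadcast $id")
            }
          }
        }
      } catch {
        case _: InterruptedException if stopped => // expected during stop()
      }
    }
  }
}

val cleaner = new MiniCleaner(pollTimeoutMs = 50)
cleaner.start()
val holder = new Object
cleaner.register(holder, CleanShuffle(3)).enqueue() // simulate the GC enqueueing it
val deadline = System.currentTimeMillis() + 5000
while (cleaner.cleaned.isEmpty && System.currentTimeMillis() < deadline) Thread.sleep(10)
cleaner.stop()
```

Because CleanupTask is sealed, the compiler can warn if a new task type is added without a matching case in the dispatch.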