LocalSchedulerBackend

宝哥大数据 · Published 2019-04-12 09:27:39

1.2 Startup

1) The corresponding initialization code lives in the createTaskScheduler method of the main SparkContext flow mentioned earlier: after the TaskScheduler instance is built (the concrete subclass here is TaskSchedulerImpl), the SchedulerBackend instance built alongside it (the concrete subclass here is LocalSchedulerBackend) is passed in when the scheduler is initialized.
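For reference, this is roughly what the local[N] branch of SparkContext.createTaskScheduler looks like (a paraphrased sketch of the Spark 2.x source; exact code differs across versions):

        // SparkContext.createTaskScheduler, local[N] branch (paraphrased from Spark 2.x)
        case LOCAL_N_REGEX(threads) =>
            def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
            // local[*] means one thread per available core on the machine
            val threadCount = if (threads == "*") localCpuCount else threads.toInt
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
            scheduler.initialize(backend)   // the backend built alongside is passed in here
            (backend, scheduler)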

2) Once the TaskScheduler instance has been built, its start method is invoked, and that method first calls the start method of the SchedulerBackend.
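TaskSchedulerImpl.start itself is short; an abbreviated sketch (paraphrased from Spark 2.x):

        // TaskSchedulerImpl.start (abbreviated): delegate to the backend first
        override def start() {
            backend.start()   // in local mode this is LocalSchedulerBackend.start, shown below

            // in non-local mode, a periodic speculative-execution check is also scheduled here
            if (!isLocal && conf.getBoolean("spark.speculation", false)) {
                speculationScheduler.scheduleWithFixedDelay(
                    new Runnable { override def run(): Unit = checkSpeculatableTasks() },
                    SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
            }
        }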

  • LauncherBackend establishes a connection to the LauncherServer
  • The SchedulerBackend starts

        // LauncherBackend connects to the LauncherServer
        launcherBackend.connect()

        // SchedulerBackend startup
        override def start() {
            val rpcEnv = SparkEnv.get.rpcEnv

            // create the LocalEndpoint
            val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)

            // setupEndpoint on the RpcEnv (a NettyRpcEnv in practice) registers the endpoint with
            // the dispatcher and returns a NettyRpcEndpointRef used to communicate with the endpoint
            localEndpoint = rpcEnv.setupEndpoint("LocalSchedulerBackendEndpoint", executorEndpoint)

            listenerBus.post(SparkListenerExecutorAdded(
                System.currentTimeMillis,
                executorEndpoint.localExecutorId,
                new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))

            // send the AppId to the LauncherServer and update the application state
            launcherBackend.setAppId(appId)
            launcherBackend.setState(SparkAppHandle.State.RUNNING)
        }
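To drive this whole path end to end, it is enough to run a job with a local master; a minimal sketch (the app name is illustrative):

        import org.apache.spark.{SparkConf, SparkContext}

        // "local[2]" takes the branch sketched above: createTaskScheduler builds a
        // TaskSchedulerImpl plus a LocalSchedulerBackend with totalCores = 2
        val conf = new SparkConf().setAppName("local-demo").setMaster("local[2]")
        val sc = new SparkContext(conf)
        // tasks run in the single in-process Executor created by LocalEndpoint
        println(sc.parallelize(1 to 100).reduce(_ + _))
        sc.stop()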

3) Inside the SchedulerBackend's start method, a LocalEndpoint instance is constructed, and that instance in turn creates an Executor; the Executor instance is responsible for actually executing tasks.

        /**
         * Calls to [[LocalSchedulerBackend]] are all serialized through LocalEndpoint. Using an
         * RpcEndpoint makes the calls on [[LocalSchedulerBackend]] asynchronous, which is necessary
         * to prevent deadlock between [[LocalSchedulerBackend]] and the [[TaskSchedulerImpl]].
         */
        private[spark] class LocalEndpoint(
                override val rpcEnv: RpcEnv,
                userClassPath: Seq[URL],
                scheduler: TaskSchedulerImpl,
                executorBackend: LocalSchedulerBackend,
                private val totalCores: Int)
                extends ThreadSafeRpcEndpoint with Logging {

            private var freeCores = totalCores

            // in local mode the executor runs inside the driver, so it reuses the driver's identifier
            val localExecutorId = SparkContext.DRIVER_IDENTIFIER
            val localExecutorHostname = "localhost"

            // create the single in-process Executor that runs all tasks
            private val executor = new Executor(
                localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)

            override def receive: PartialFunction[Any, Unit] = {
                // offer the free cores to the scheduler and launch whatever tasks it assigns
                case ReviveOffers =>
                    reviveOffers()

                // forward task status to the scheduler; a finished task returns its core
                // to the pool, and offers are revived again
                case StatusUpdate(taskId, state, serializedData) =>
                    scheduler.statusUpdate(taskId, state, serializedData)
                    if (TaskState.isFinished(state)) {
                        freeCores += scheduler.CPUS_PER_TASK
                        reviveOffers()
                    }

                case KillTask(taskId, interruptThread) =>
                    executor.killTask(taskId, interruptThread)
            }

            override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
                // StopExecutor is an ask, so the caller can block until the executor has stopped
                case StopExecutor =>
                    executor.stop()
                    context.reply(true)
            }

            def reviveOffers() {
                val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
                for (task <- scheduler.resourceOffers(offers).flatten) {
                    freeCores -= scheduler.CPUS_PER_TASK
                    // hand the task to the in-process executor (pre-Spark-2.2 launchTask signature)
                    executor.launchTask(executorBackend, taskId = task.taskId,
                        attemptNumber = task.attemptNumber, task.name, task.serializedTask)
                }
            }
        }
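For completeness, the sending side of these messages is LocalSchedulerBackend itself, which is a thin wrapper around the endpoint reference (a paraphrased sketch from Spark 2.x):

        // LocalSchedulerBackend forwards scheduler calls to LocalEndpoint (paraphrased)
        override def reviveOffers() {
            localEndpoint.send(ReviveOffers)
        }

        override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
            localEndpoint.send(StatusUpdate(taskId, state, serializedData))
        }

        override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
            localEndpoint.send(KillTask(taskId, interruptThread))
        }

Because each call goes through send (fire-and-forget), calls between LocalSchedulerBackend and TaskSchedulerImpl stay asynchronous, which is exactly the deadlock-avoidance point made in the LocalEndpoint class comment above.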