1.2 Startup
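1) The initialization code lives in the createTaskScheduler method, which is part of the main flow of the SparkContext class discussed earlier. That method builds a TaskScheduler instance (here the concrete subclass is TaskSchedulerImpl) together with a SchedulerBackend instance (here the concrete subclass is LocalSchedulerBackend, called LocalBackend in older Spark versions), and passes the backend to the TaskScheduler when initializing it.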
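2) Once the TaskScheduler instance exists, its start method is called, and the first thing that method does is call the SchedulerBackend's start method. On the backend side, two things happen: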
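The wiring in steps 1) and 2) can be sketched with plain Scala stand-ins. This is a minimal sketch, not Spark's code: BackendLike, SimpleTaskScheduler, SimpleLocalBackend and LocalWiring are hypothetical names. Only the order of operations mirrors what is described above: build both objects, call initialize(backend), then have start() start the backend first.

```scala
// Hypothetical stand-in for SchedulerBackend; not a Spark class.
trait BackendLike {
  def start(): Unit
}

// Hypothetical stand-in for TaskSchedulerImpl.
class SimpleTaskScheduler {
  // Set by initialize(), analogous to the scheduler keeping a reference to its backend.
  private var backend: BackendLike = null
  var started = false

  def initialize(b: BackendLike): Unit = { backend = b }

  // Like TaskSchedulerImpl.start(): the backend is started first.
  def start(): Unit = {
    require(backend != null, "initialize(backend) must be called before start()")
    backend.start()
    started = true
  }
}

// Hypothetical stand-in for LocalSchedulerBackend.
class SimpleLocalBackend(val scheduler: SimpleTaskScheduler, val totalCores: Int)
    extends BackendLike {
  var started = false
  override def start(): Unit = { started = true }
}

object LocalWiring {
  // Rough analogue of the local-mode branch of createTaskScheduler:
  // build scheduler and backend, link them, return both.
  def createTaskScheduler(totalCores: Int): (SimpleLocalBackend, SimpleTaskScheduler) = {
    val scheduler = new SimpleTaskScheduler
    val backend = new SimpleLocalBackend(scheduler, totalCores)
    scheduler.initialize(backend)
    (backend, scheduler)
  }
}
```

Calling scheduler.start() on the returned pair flips backend.started to true, which is the delegation described in step 2).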
- LauncherBackend connects to the LauncherServer
- SchedulerBackend starts
// LauncherBackend connects to the LauncherServer
launcherBackend.connect()

// SchedulerBackend startup
override def start() {
  val rpcEnv = SparkEnv.get.rpcEnv
  // Create the LocalEndpoint
  val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
  // Register the endpoint through RpcEnv. With NettyRpcEnv this registers it with the
  // Dispatcher and returns a NettyRpcEndpointRef used to communicate with the endpoint.
  localEndpoint = rpcEnv.setupEndpoint("LocalSchedulerBackendEndpoint", executorEndpoint)
  listenerBus.post(SparkListenerExecutorAdded(
    System.currentTimeMillis,
    executorEndpoint.localExecutorId,
    new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
  // Send the appId to the LauncherServer and update the application state
  launcherBackend.setAppId(appId)
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
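The setupEndpoint call above registers the LocalEndpoint under a name and hands back a reference; messages sent through that reference are queued and delivered to the endpoint one at a time. The sketch below models that register-then-send-through-a-ref pattern. MiniDispatcher, EndpointRef, Endpoint and CountingEndpoint are hypothetical names, and the sketch assumes a single-threaded drain loop instead of NettyRpcEnv's thread pool, so it illustrates the idea rather than Spark's implementation.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of endpoint registration; not Spark's RpcEnv API.
trait Endpoint {
  def receive: PartialFunction[Any, Unit]
}

// Handle returned on registration, in the spirit of an RpcEndpointRef.
final class EndpointRef(val name: String, dispatcher: MiniDispatcher) {
  // Fire-and-forget: the message is only queued here, not processed.
  def send(msg: Any): Unit = dispatcher.post(name, msg)
}

final class MiniDispatcher {
  private val endpoints = mutable.LinkedHashMap.empty[String, Endpoint]
  private val inboxes = mutable.LinkedHashMap.empty[String, mutable.Queue[Any]]

  // Register an endpoint under a unique name and return a ref for talking to it.
  def registerEndpoint(name: String, endpoint: Endpoint): EndpointRef = {
    require(!endpoints.contains(name), s"endpoint '$name' is already registered")
    endpoints(name) = endpoint
    inboxes(name) = mutable.Queue.empty[Any]
    new EndpointRef(name, this)
  }

  def post(name: String, msg: Any): Unit = inboxes(name).enqueue(msg)

  // Deliver queued messages one at a time, in FIFO order per endpoint.
  // Messages the endpoint does not handle are silently dropped in this sketch.
  def processAll(): Unit =
    for ((name, inbox) <- inboxes) {
      val endpoint = endpoints(name)
      while (inbox.nonEmpty) {
        endpoint.receive.applyOrElse(inbox.dequeue(), (_: Any) => ())
      }
    }
}

// Example endpoint that counts "inc" messages.
final class CountingEndpoint extends Endpoint {
  var count = 0
  def receive: PartialFunction[Any, Unit] = { case "inc" => count += 1 }
}
```

Sending through the ref does nothing visible until the dispatcher drains the inbox, which is the asynchrony the LocalEndpoint doc comment relies on to avoid deadlocks.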
3) The SchedulerBackend's start method builds a LocalEndpoint instance, and that instance creates an Executor, which is responsible for actually running the tasks.
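LocalEndpoint keeps a freeCores counter: offering the free cores launches tasks that consume CPUS_PER_TASK cores each, and a StatusUpdate with a finished state returns those cores and triggers another round of offers. A minimal sketch of that bookkeeping, assuming a plain FIFO queue of task IDs in place of TaskSchedulerImpl deciding which tasks to launch (CoreAccounting is a hypothetical name, not a Spark class):

```scala
import scala.collection.mutable

// Hypothetical model of LocalEndpoint's freeCores bookkeeping; not Spark's API.
// A FIFO queue of task IDs stands in for the scheduler choosing which tasks to launch.
final class CoreAccounting(totalCores: Int, cpusPerTask: Int, pending: mutable.Queue[Long]) {
  require(cpusPerTask > 0 && totalCores >= cpusPerTask)

  private var freeCores = totalCores
  private val running = mutable.Set.empty[Long]

  def free: Int = freeCores
  def runningTasks: Set[Long] = running.toSet

  // Analogue of reviveOffers(): launch pending tasks while enough free cores remain.
  def reviveOffers(): Unit =
    while (pending.nonEmpty && freeCores >= cpusPerTask) {
      val taskId = pending.dequeue()
      freeCores -= cpusPerTask
      running += taskId
    }

  // Analogue of a StatusUpdate whose state is finished: return the cores, then re-offer.
  def taskFinished(taskId: Long): Unit =
    if (running.remove(taskId)) {
      freeCores += cpusPerTask
      reviveOffers()
    }
}
```

With two cores and one CPU per task, the first reviveOffers() launches two tasks; when one finishes, its core is returned and the third task starts right away. That is why the receive handler calls reviveOffers() after a finished StatusUpdate.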
/**
 * Calls to [[LocalSchedulerBackend]] are all serialized through LocalEndpoint. Using an
 * RpcEndpoint makes the calls on [[LocalSchedulerBackend]] asynchronous, which is necessary
 * to prevent deadlock between [[LocalSchedulerBackend]] and the [[TaskSchedulerImpl]].
 */
private[spark] class LocalEndpoint(
    override val rpcEnv: RpcEnv,
    userClassPath: Seq[URL],
    scheduler: TaskSchedulerImpl,
    executorBackend: LocalSchedulerBackend,
    private val totalCores: Int)
  extends ThreadSafeRpcEndpoint with Logging {

  private var freeCores = totalCores

  // In local mode the executor ID is the driver identifier
  val localExecutorId = SparkContext.DRIVER_IDENTIFIER
  val localExecutorHostname = "localhost"

  // Create the Executor that runs tasks in-process
  private val executor = new Executor(
    localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)

  override def receive: PartialFunction[Any, Unit] = {
    case ReviveOffers =>
      reviveOffers()

    case StatusUpdate(taskId, state, serializedData) =>
      scheduler.statusUpdate(taskId, state, serializedData)
      if (TaskState.isFinished(state)) {
        freeCores += scheduler.CPUS_PER_TASK
        reviveOffers()
      }

    case KillTask(taskId, interruptThread) =>
      executor.killTask(taskId, interruptThread)
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case StopExecutor =>
      executor.stop()
      context.reply(true)
  }

  def reviveOffers() {
    val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
    for (task