第31课: Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
一:任务调度与资源调度的区别:
l 任务调度是通过DAGScheduler、TaskScheduler、SchedulerBackend等进行的作业调度;
l 资源调度是指应用程序如何获得资源;
l 任务调度是在资源调度的基础上进行的,没有资源调度那么任务调度就成为了无源之水无本之木!
二:资源调度内幕天机解密
(1)因为Master负责资源管理和调度,所以资源调度的方法shedule位于Master.scala这个类中,当注册程序或者资源发生改变的时候都会导致schedule的调用,例如注册程序的时候:
1. case RegisterApplication(description,driver) =>
2. // TODO Prevent repeated registrationsfrom some driver
3. if (state == RecoveryState.STANDBY) {
4. // ignore, don't send response
5. } else {
6. logInfo("Registering app " +description.name)
7. val app =createApplication(description, driver)
8. registerApplication(app)
9. logInfo("Registered app " +description.name + " with ID " + app.id)
10. persistenceEngine.addApplication(app)
11. driver.send(RegisteredApplication(app.id,self))
12. schedule()
13. }
(2)Schedule调用的时机:每次有新的应用程序提交或者集群资源状况发生改变的时候(包括Executor增加或者减少、Worker增加或者减少等);
进入schedule(),schedule为当前等待的应用程序分配可用的资源。每当一个新的应用程序进来的时候,schedule都会被调用。或者资源发生变化的时候(例如Executor挂掉,Worker挂掉,或者新增加机器),schedule都会被调用。schedule():源码如下:
1. privatedef schedule(): Unit = {
2. if (state != RecoveryState.ALIVE) {
3. return
4. }
5. // Drivers take strict precedence overexecutors
6. val shuffledAliveWorkers =Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
7. val numWorkersAlive =shuffledAliveWorkers.size
8. var curPos = 0
9. for (driver
= driver.desc.mem && worker.coresFree >= driver.desc.cores) {19. launchDriver(worker,driver)
20. waitingDrivers -=driver
21. launched = true
22. }
23. curPos = (curPos + 1)% numWorkersAlive
24. }
25. }
26. startExecutorsOnWorkers()
27. }
(3)当前Master必须是Alive的方式采用进行资源的调度,如果不是ALIVE的状态会直接返回,也就是StandbyMaster不会进行Application的资源调用!
1. if (state != RecoveryState.ALIVE) {
2. return
3. }
(4)接下来通过workers.toSeq.filter(_.state == WorkerState.ALIVE)过滤判断所有Worker中哪些是ALIVE级别的Worker,ALIVE才能够参与资源的分配工作:
1. valshuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state ==WorkerState.ALIVE))
(5)使用Random.shuffle把Master中保留的集群中所有ALIVE级别的Worker的信息随机打乱;Master的schedule()方法中:workers是一个数据结构,打乱workers有利于负载均衡,例如不是以固定的顺序启动launchDriver。WorkerInfo 是Worker注册的时候将信息注册过来。
1. val workers = newHashSet[WorkerInfo]
2. …….
3. val shuffledAliveWorkers =Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
WorkerInfo.scala的源码:
1. private[spark] class WorkerInfo(
2. val id: String,
3. val host: String,
4. val port: Int,
5. val cores: Int,
6. val memory: Int,
7. val endpoint: RpcEndpointRef,
8. val webUiAddress: String)
9. extends Serializable {
我们看一下随机打乱的算法:将Worker的信息传进来,先new出来一个ArrayBuffer,将所有的信息放进去。然后将两个索引位置的内容进行交换。例如:如果有4个Worker,依次分别为第一个Worker至第四个Worker,第一个位置是第1个Worker,第2个位置是第2个Worker,第3个位置是第3个Worker,第4个位置是第4个Worker;通过shuffle以后,现在第一个位置可能是第3个Worker,第2个位置可能是第1个Worker,第3个位置可能是第4个Worker,第4个位置可能是第2个Worker,位置信息打乱。
Random.scala中shuffle方法,其算法内部是循环随机交换所有Worker在Master缓存数据结构中的位置:
1. def shuffle[T, CC[X]
= app.desc.memoryPerExecutorMB &&9. worker.coresFree >=coresPerExecutor.getOrElse(1))
10. .sortBy(_.coresFree).reverse
11. val assignedCores =scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
12.
13. // Now that we'vedecided how many cores to allocate on each worker, let's allocate them
14. for (pos 0) {
15. allocateWorkerResourceToExecutors(
16. app,assignedCores(pos), coresPerExecutor, usableWorkers(pos))
17. }
18. }
19. }
(9)为应用程序具体分配Executor之前要判断应用程序是否还需要分配Core,如果不需要则不会为应用程序分配Executor;
startExecutorsOnWorkers中的coresLeft 是请求的requestedCores和可用的 coresGranted的相减值。例如如果整个程序要求1000个Cores,但是目前集群可用的只有100个Cores,如果coresLeft不为0,就放入等待队列中;如果coresLeft是0那么就不需要调度。
1. private[master]def coresLeft: Int = requestedCores - coresGranted
(10)Master.scala的startExecutorsOnWorkers中,具体分配Executor之前要对要求Worker必须是ALIVE的状态且必须满足Application对每个Executor的内存和Cores的要求,并且在此基础上进行排序产生计算资源由大到小的usableWorkers数据结构:
1. val usableWorkers =workers.toArray.filter(_.state == WorkerState.ALIVE)
2. .filter(worker => worker.memoryFree>= app.desc.memoryPerExecutorMB &&
3. worker.coresFree >=coresPerExecutor.getOrElse(1))
4. .sortBy(_.coresFree).reverse
5. val assignedCores =scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
然后是调用scheduleExecutorsOnWorkers, 在FIFO的情况下默认是spreadOutApps来让应用程序尽可能多的运行在所有的Node上:
1. private val spreadOutApps =conf.getBoolean("spark.deploy.spreadOut", true)
scheduleExecutorsOnWorker中,minCoresPerExecutor 每个Executor最小分配的core个数。scheduleExecutorsOnWorker源码如下:
1. private def scheduleExecutorsOnWorkers(
2. app: ApplicationInfo,
3. usableWorkers: Array[WorkerInfo],
4. spreadOutApps: Boolean): Array[Int] = {
5. val coresPerExecutor =app.desc.coresPerExecutor
6. val minCoresPerExecutor =coresPerExecutor.getOrElse(1)
7. val oneExecutorPerWorker =coresPerExecutor.isEmpty
8. val memoryPerExecutor =app.desc.memoryPerExecutorMB
9. val numUsable = usableWorkers.length
10. val assignedCores = newArray[Int](numUsable) // Number of cores to give to each worker
11. val assignedExecutors =new Array[Int](numUsable) // Number of new executors on each worker
12. var coresToAssign =math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
13. ……
(11) 为应用程序分配Executors有两种方式,第一种方式是尽可能在集群的所有Worker上分配Executor,这种方式往往会带来潜在的更好的数据本地性;第二种方式,尝试运行在尽可能少的Worker上。
(12)具体在集群上分配Cores的时候会尽可能的满足我们的要求:math.min计算最小值: coresToAssig是计算app.coresLeft与可用的Worker中可用的Cores的和的最小值。例如应用程序要求1000个Cores,但整个集群中只有100个Cores,所以只能先分配100个Cores。
scheduleExecutorsOnWorkers方法:
1. var coresToAssign = math.min(app.coresLeft,usableWorkers.map(_.coresFree).sum)
2. ……
(13)如果是每个Worker下面只能够为当前的应用程序分配一个Executor的话,每次是分配一个Core!scheduleExecutorsOnWorkers方法:
1. if (oneExecutorPerWorker) {
2. assignedExecutors(pos) = 1
3. } else {
4. assignedExecutors(pos) += 1
5. }
总结一下2种情况:一个情况是尽可能在一台机器上去运行程序的所有功能,另一种情况尽可能在所有的节点上。无论是哪种情况,每次给Executor增加Cores是增加一个,如果是spreadOutApps的方式,循环一轮再下一轮,例如有4个Worker,第一次为每个Executor启动一个线程,第二次循环分配一个线程,第三次循环再分配一个线程......;
scheduleExecutorsOnWorkers方法:
1. while(freeWorkers.nonEmpty) {
2. freeWorkers.foreach { pos =>
3. var keepScheduling = true
4. while (keepScheduling &&canLaunchExecutor(pos)) {
5. coresToAssign -= minCoresPerExecutor
6. assignedCores(pos) +=minCoresPerExecutor
7.
8. // If we are launching one executorper worker, then every iteration assigns 1 core
9. // to the executor. Otherwise, everyiteration assigns cores to a new executor.
10. if(oneExecutorPerWorker) {
11. assignedExecutors(pos) = 1
12. } else {
13. assignedExecutors(pos) += 1
14. }
15.
16. // Spreading out anapplication means spreading out its executors across as
17. // many workers aspossible. If we are not spreading out, then we should keep
18. // schedulingexecutors on this worker until we use all of its resources.
19. // Otherwise, justmove on to the next worker.
20. if (spreadOutApps) {
21. keepScheduling =false
22. }
23. }
24. }
回到Master.scala的startExecutorsOnWorkers,现在已经决定每个 worker分配多少个cores ,那进行资源分配:
1. for (pos 0) {
2. allocateWorkerResourceToExecutors(
3. app, assignedCores(pos), coresPerExecutor,usableWorkers(pos))
4. }
allocateWorkerResourceToExecutors源码如下:
1. privatedef allocateWorkerResourceToExecutors(
2. app: ApplicationInfo,
3. assignedCores: Int,
4. coresPerExecutor: Option[Int],
5. worker: WorkerInfo): Unit = {
6. // If the number of cores per executor isspecified, we divide the cores assigned
7. // to this worker evenly among theexecutors with no remainder.
8. // Otherwise, we launch a single executorthat grabs all the assignedCores on this worker.
9. val numExecutors = coresPerExecutor.map {assignedCores / _ }.getOrElse(1)
10. val coresToAssign =coresPerExecutor.getOrElse(assignedCores)
11. for (i
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?