您当前的位置: 首页 >  ar

段智华

暂无认证

  • 3浏览

    0关注

    1232博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

第31课: Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结

段智华 发布时间:2017-06-05 07:29:13 ,浏览量:3

第31课: Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结

一:任务调度与资源调度的区别:

l  任务调度是通过DAGScheduler、TaskScheduler、SchedulerBackend等进行的作业调度;

l  资源调度是指应用程序如何获得资源;

l  任务调度是在资源调度的基础上进行的,没有资源调度那么任务调度就成为了无源之水无本之木!

二:资源调度内幕天机解密

(1)因为Master负责资源管理和调度,所以资源调度的方法shedule位于Master.scala这个类中,当注册程序或者资源发生改变的时候都会导致schedule的调用,例如注册程序的时候:

1.             case RegisterApplication(description,driver) =>

2.             // TODO Prevent repeated registrationsfrom some driver

3.             if (state == RecoveryState.STANDBY) {

4.               // ignore, don't send response

5.             } else {

6.               logInfo("Registering app " +description.name)

7.               val app =createApplication(description, driver)

8.               registerApplication(app)

9.               logInfo("Registered app " +description.name + " with ID " + app.id)

10.             persistenceEngine.addApplication(app)

11.             driver.send(RegisteredApplication(app.id,self))

12.             schedule()

13.           }

 

(2)Schedule调用的时机:每次有新的应用程序提交或者集群资源状况发生改变的时候(包括Executor增加或者减少、Worker增加或者减少等);

进入schedule(),schedule为当前等待的应用程序分配可用的资源。每当一个新的应用程序进来的时候,schedule都会被调用。或者资源发生变化的时候(例如Executor挂掉,Worker挂掉,或者新增加机器),schedule都会被调用。schedule():源码如下:

1.             privatedef schedule(): Unit = {

2.           if (state != RecoveryState.ALIVE) {

3.             return

4.           }

5.           // Drivers take strict precedence overexecutors

6.           val shuffledAliveWorkers =Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))

7.           val numWorkersAlive =shuffledAliveWorkers.size

8.           var curPos = 0

9.           for (driver

= driver.desc.mem && worker.coresFree >= driver.desc.cores) {

19.               launchDriver(worker,driver)

20.               waitingDrivers -=driver

21.               launched = true

22.             }

23.             curPos = (curPos + 1)% numWorkersAlive

24.           }

25.         }

26.         startExecutorsOnWorkers()

27.       }

 

(3)当前Master必须是Alive的方式采用进行资源的调度,如果不是ALIVE的状态会直接返回,也就是StandbyMaster不会进行Application的资源调用!

1.              if (state != RecoveryState.ALIVE) {

2.             return

3.           }

 

(4)接下来通过workers.toSeq.filter(_.state == WorkerState.ALIVE)过滤判断所有Worker中哪些是ALIVE级别的Worker,ALIVE才能够参与资源的分配工作:

1.         valshuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state ==WorkerState.ALIVE))

 

(5)使用Random.shuffle把Master中保留的集群中所有ALIVE级别的Worker的信息随机打乱;Master的schedule()方法中:workers是一个数据结构,打乱workers有利于负载均衡,例如不是以固定的顺序启动launchDriver。WorkerInfo 是Worker注册的时候将信息注册过来。

1.       val workers = newHashSet[WorkerInfo]

2.       …….   

3.        val shuffledAliveWorkers =Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))

 WorkerInfo.scala的源码:

1.        private[spark] class WorkerInfo(

2.           val id: String,

3.           val host: String,

4.           val port: Int,

5.           val cores: Int,

6.           val memory: Int,

7.           val endpoint: RpcEndpointRef,

8.           val webUiAddress: String)

9.         extends Serializable {

 我们看一下随机打乱的算法:将Worker的信息传进来,先new出来一个ArrayBuffer,将所有的信息放进去。然后将两个索引位置的内容进行交换。例如:如果有4个Worker,依次分别为第一个Worker至第四个Worker,第一个位置是第1个Worker,第2个位置是第2个Worker,第3个位置是第3个Worker,第4个位置是第4个Worker;通过shuffle以后,现在第一个位置可能是第3个Worker,第2个位置可能是第1个Worker,第3个位置可能是第4个Worker,第4个位置可能是第2个Worker,位置信息打乱。

Random.scala中shuffle方法,其算法内部是循环随机交换所有Worker在Master缓存数据结构中的位置:

1.        def shuffle[T, CC[X]

= app.desc.memoryPerExecutorMB &&

9.                 worker.coresFree >=coresPerExecutor.getOrElse(1))

10.             .sortBy(_.coresFree).reverse

11.           val assignedCores =scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

12.      

13.           // Now that we'vedecided how many cores to allocate on each worker, let's allocate them

14.           for (pos 0) {

15.             allocateWorkerResourceToExecutors(

16.               app,assignedCores(pos), coresPerExecutor, usableWorkers(pos))

17.           }

18.         }

19.       }

 

(9)为应用程序具体分配Executor之前要判断应用程序是否还需要分配Core,如果不需要则不会为应用程序分配Executor;

startExecutorsOnWorkers中的coresLeft 是请求的requestedCores和可用的 coresGranted的相减值。例如如果整个程序要求1000个Cores,但是目前集群可用的只有100个Cores,如果coresLeft不为0,就放入等待队列中;如果coresLeft是0那么就不需要调度。

1.         private[master]def coresLeft: Int = requestedCores - coresGranted

 

(10)Master.scala的startExecutorsOnWorkers中,具体分配Executor之前要对要求Worker必须是ALIVE的状态且必须满足Application对每个Executor的内存和Cores的要求,并且在此基础上进行排序产生计算资源由大到小的usableWorkers数据结构:

1.             val usableWorkers =workers.toArray.filter(_.state == WorkerState.ALIVE)

2.               .filter(worker => worker.memoryFree>= app.desc.memoryPerExecutorMB &&

3.                 worker.coresFree >=coresPerExecutor.getOrElse(1))

4.               .sortBy(_.coresFree).reverse

5.       val assignedCores =scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

 

然后是调用scheduleExecutorsOnWorkers, 在FIFO的情况下默认是spreadOutApps来让应用程序尽可能多的运行在所有的Node上:

1.           private val spreadOutApps =conf.getBoolean("spark.deploy.spreadOut", true)

 

 

scheduleExecutorsOnWorker中,minCoresPerExecutor 每个Executor最小分配的core个数。scheduleExecutorsOnWorker源码如下:

1.          private def scheduleExecutorsOnWorkers(

2.             app: ApplicationInfo,

3.             usableWorkers: Array[WorkerInfo],

4.             spreadOutApps: Boolean): Array[Int] = {

5.           val coresPerExecutor =app.desc.coresPerExecutor

6.           val minCoresPerExecutor =coresPerExecutor.getOrElse(1)

7.           val oneExecutorPerWorker =coresPerExecutor.isEmpty

8.           val memoryPerExecutor =app.desc.memoryPerExecutorMB

9.           val numUsable = usableWorkers.length

10.         val assignedCores = newArray[Int](numUsable) // Number of cores to give to each worker

11.         val assignedExecutors =new Array[Int](numUsable) // Number of new executors on each worker

12.         var coresToAssign =math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

13.     ……

 

 

(11) 为应用程序分配Executors有两种方式,第一种方式是尽可能在集群的所有Worker上分配Executor,这种方式往往会带来潜在的更好的数据本地性;第二种方式,尝试运行在尽可能少的Worker上。

(12)具体在集群上分配Cores的时候会尽可能的满足我们的要求:math.min计算最小值: coresToAssig是计算app.coresLeft与可用的Worker中可用的Cores的和的最小值。例如应用程序要求1000个Cores,但整个集群中只有100个Cores,所以只能先分配100个Cores。

scheduleExecutorsOnWorkers方法:

1.            var coresToAssign = math.min(app.coresLeft,usableWorkers.map(_.coresFree).sum)

2.       ……

 

(13)如果是每个Worker下面只能够为当前的应用程序分配一个Executor的话,每次是分配一个Core!scheduleExecutorsOnWorkers方法:

1.              if (oneExecutorPerWorker) {

2.                   assignedExecutors(pos) = 1

3.                 } else {

4.                   assignedExecutors(pos) += 1

5.                 }

 

总结一下2种情况:一个情况是尽可能在一台机器上去运行程序的所有功能,另一种情况尽可能在所有的节点上。无论是哪种情况,每次给Executor增加Cores是增加一个,如果是spreadOutApps的方式,循环一轮再下一轮,例如有4个Worker,第一次为每个Executor启动一个线程,第二次循环分配一个线程,第三次循环再分配一个线程......;

scheduleExecutorsOnWorkers方法:

1.                    while(freeWorkers.nonEmpty) {

2.             freeWorkers.foreach { pos =>

3.               var keepScheduling = true

4.               while (keepScheduling &&canLaunchExecutor(pos)) {

5.                 coresToAssign -= minCoresPerExecutor

6.                 assignedCores(pos) +=minCoresPerExecutor

7.        

8.                 // If we are launching one executorper worker, then every iteration assigns 1 core

9.                 // to the executor. Otherwise, everyiteration assigns cores to a new executor.

10.               if(oneExecutorPerWorker) {

11.                 assignedExecutors(pos) = 1

12.               } else {

13.                 assignedExecutors(pos) += 1

14.               }

15.      

16.               // Spreading out anapplication means spreading out its executors across as

17.               // many workers aspossible. If we are not spreading out, then we should keep

18.               // schedulingexecutors on this worker until we use all of its resources.

19.               // Otherwise, justmove on to the next worker.

20.               if (spreadOutApps) {

21.                 keepScheduling =false

22.               }

23.             }

24.           }

 

回到Master.scala的startExecutorsOnWorkers,现在已经决定每个 worker分配多少个cores ,那进行资源分配:

1.                   for (pos 0) {

2.               allocateWorkerResourceToExecutors(

3.                 app, assignedCores(pos), coresPerExecutor,usableWorkers(pos))

4.             }

allocateWorkerResourceToExecutors源码如下:

1.            privatedef allocateWorkerResourceToExecutors(

2.             app: ApplicationInfo,

3.             assignedCores: Int,

4.             coresPerExecutor: Option[Int],

5.             worker: WorkerInfo): Unit = {

6.           // If the number of cores per executor isspecified, we divide the cores assigned

7.           // to this worker evenly among theexecutors with no remainder.

8.           // Otherwise, we launch a single executorthat grabs all the assignedCores on this worker.

9.           val numExecutors = coresPerExecutor.map {assignedCores / _ }.getOrElse(1)

10.         val coresToAssign =coresPerExecutor.getOrElse(assignedCores)

11.         for (i

关注
打赏
1659361485
查看更多评论
立即登录/注册

微信扫码登录

0.1267s