为什么谈到Heron,因为峰值的问题,很多流处理系统不能很好的进行处理。现在我们考虑Spark Streaming,在资源分配的时候如果按照高峰的峰值进行分配,是粗粒度的,在预分配的时候造成资源的浪费,在低峰值的时候导致大量的浪费。在另外一方面,随着Spark Streaming本身不断的运行,对资源的消耗管理也是我们要考虑的因素,这里我们谈Spark Streaming资源动态申请和动态控制消费速率是高级别的特性,特性的实现对Spark Streaming的运行非常重要的。Spark Streaming本身是基于Spark Core,Spark Core的核心是Spark Context。Spark现在支持资源的动态分配。这里有个配置参数spark.dynamicAllocation.enabled,是否需要开启资源的动态分配,在程序运行的时候进行设置。如果支持动态分配,使用ExecutorAllocationManager,传入参数有ExecutorAllocationClient、listenerBus、_conf
Spark现在支持资源的动态分配。这里有个配置参数spark.dynamicAllocation.enabled,是否需要开启资源的动态分配,在程序运行的时候进行设置。如果支持动态分配,使用ExecutorAllocationManager,传入参数有ExecutorAllocationClient、listenerBus、_conf。
SparkContext.scala的源代码:
1. val dynamicAllocationEnabled =Utils.isDynamicAllocationEnabled(_conf)
2. _executorAllocationManager =
3. if(dynamicAllocationEnabled) {
4. schedulerBackend match {
5. case b:ExecutorAllocationClient =>
6. Some(newExecutorAllocationManager(
7. schedulerBackend.asInstanceOf[ExecutorAllocationClient],listenerBus, _conf))
8. case _ =>
9. None
10. }
11. } else {
12. None
13. }
14. _executorAllocationManager.foreach(_.start())
15. ……
16. Utils.scala的源代码:
17. def isDynamicAllocationEnabled(conf:SparkConf): Boolean = {
18. val dynamicAllocationEnabled =conf.getBoolean("spark.dynamicAllocation.enabled", false)
19. dynamicAllocationEnabled &&
20. (!isLocalMaster(conf) ||conf.getBoolean("spark.dynamicAllocation.testing", false))
21. }
ExecutorAllocationManager是根据工作负载动态分配和删除executors 的代理。ExecutorAllocationManager保持目标数量的executors,周期性的同步到集群管理。以配置的初始值开始,根据挂起和正在运行的任务的数量进行变化。当当前目标超过当前处理负载的需要时,减少executors 执行器的目标数量,executors 执行器的目标数目减少,可以立即运行所有当前正在运行和正在等待的任务。如果积压任务等待调度响应,那么增加executors的目标数。如果调度队列在N秒内没有耗尽,则添加新的执行器executors。如果队列持续了M秒钟,需添加更多的executors等。每一轮增加的数量从上一轮的指数增长,直到达到上限。上限基于配置的属性和如上所描述的当前的运行和待处理的任务。指数增长的原因有两方面:
(1)在开始的情况下Executors 应缓慢增加,需要额外的Executors的数量变小。否则,
我们可以添加更多的Executors ,而不是需要稍后删除它们。
(2)Executors 应迅速增加。随着时间的推移,Executors 的最大数量非常高。否则,它将采取长时间的负载下进行繁重的工作。
删除策略比较简单:如果executor 空闲时间为k秒,则意味着它没有计划运行任何任务,然后删除它。在这两种情况下都没有重试逻辑,因为我们假设集群管理器最终将完成它异步接收的所有请求。
相关Spark的属性如下:
l spark.dynamicAllocation.enabled是否启用此功能
l spark.dynamicAllocation.minExecutors executors最小的数量
l spark.dynamicAllocation.maxExecutors executors最大的数量
l spark.dynamicAllocation.initialExecutorsexecutors初始化的数量
l spark.dynamicAllocation.schedulerBacklogTimeout(M) 如果有积压的任务持续时间,增加新的executors
l spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(N) 如果积压时间持续,增加更多的executors,仅在初始积压超时后才使用此选项
l spark.dynamicAllocation.executorIdleTimeout(K) 如果executors在此期间处于空闲状态,删除它。
ExecutorAllocationManager.scala的源代码:
1. private[spark] classExecutorAllocationManager(
2. client:ExecutorAllocationClient,
3. listenerBus: LiveListenerBus,
4. conf: SparkConf)
5. extends Logging {
6. …..
7. // Clock used to schedule when executorsshould be added and removed
8. private var clock: Clock = new SystemClock()
有个定时器,定时器不断的去扫描executor的情况:正在运行的Stage,Stage运行在不同的executor中,所谓动态就是指要么增加要么减少executor。例如,减少executor的情况, 判断一个时间如60秒中executor没有一个任务在运行,就把这个executor删掉。这是去掉executor的情况,因为当前的应用程序中运行的所有executor,在Driver中有数据结构对它进行保持引用,每次任务调度的时候循环遍历一下executor可用列表,看一下executor的可用资源,由于有个时钟Clock,有时钟就可以不断的循环,循环检查是否满足增加executor或者删除executor的条件,如果满足条件,就会触发executor的增加和删除。executor的增加和删除非常简单,因为Driver中的ExecutorBackend有对executor的管理关系,例如超时,可以设置一个add的时间,或者评估一下当前的作业资源,如果不够的话申请更多的资源。之所以动态起来,类似于有一个时钟,在固定的周期里检查,如果想删除,就发一个killExecutor的信息,如果想添加,就在具体的Work上启动Executor。
Master的schedule分配资源,是默认的资源分配方式。
Master.scala的源代码:
1. private def schedule(): Unit = {
2. if (state !=RecoveryState.ALIVE) {
3. return
4. }
5. // Drivers take strictprecedence over executors
6. val shuffledAliveWorkers =Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
7. val numWorkersAlive =shuffledAliveWorkers.size
8. var curPos = 0
9. for (driver
= driver.desc.mem && worker.coresFree >= driver.desc.cores) {19. launchDriver(worker,driver)
20. waitingDrivers -= driver
21. launched = true
22. }
23. curPos = (curPos + 1) %numWorkersAlive
24. }
25. }
26. startExecutorsOnWorkers()
27. }
在 ExecutorAllocationManager.scala的源代码中也有schedule。在一个固定的时间间隔被调用来调节请求的executor的数量和executor 的运行数量。首先,根据添加时间和当前需要调整我们请求的executor 。然后,如果现有executor 的移除时间已经过期,则Kill executor 。
schedule内部方法将会被周期性的触发,将会周期性的执行。其中 removeTimes 是一个HashMap[String, Long]数据结构。
ExecutorAllocationManager.scala的源代码:
1. private def schedule(): Unit =synchronized {
2. val now = clock.getTimeMillis
3.
4. updateAndSyncNumExecutorsTarget(now)
5.
6. val executorIdsToBeRemoved =ArrayBuffer[String]()
7. removeTimes.retain { case(executorId, expireTime) =>
8. val expired = now >=expireTime
9. if (expired) {
10. initializing = false
11. executorIdsToBeRemoved +=executorId
12. }
13. !expired
14. }
15. if(executorIdsToBeRemoved.nonEmpty) {
16. removeExecutors(executorIdsToBeRemoved)
17. }
18. }
在ExecutorAllocationManager运行scheduleTask的时候直接进行schedule,而scheduleTask的运行是在executor.scheduleWithFixedDelay调用的。其中executor是一个线程池。在这个池子中只有一条线程。scheduleWithFixedDelay做了一个定时器,不断的调用schedule。里面有个参数是intervalMillis,默认是100毫秒,每隔100毫秒调整1次。
ExecutorAllocationManager.scala的start源代码:
1. def start(): Unit = {
2. listenerBus.addListener(listener)
3.
4. val scheduleTask = newRunnable() {
5. override def run(): Unit = {
6. try {
7. schedule()
8. } catch {
9. case ct: ControlThrowable =>
10. throw ct
11. case t: Throwable =>
12. logWarning(s"Uncaughtexception in thread ${Thread.currentThread().getName}", t)
13. }
14. }
15. }
16. executor.scheduleWithFixedDelay(scheduleTask,0, intervalMillis, TimeUnit.MILLISECONDS)
17.
18. client.requestTotalExecutors(numExecutorsTarget,localityAwareTasks, hostToLocalTaskCount)
19. }
20. ……
21. private val executor =
22. ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
23. …..
24. // Polling loop interval (ms)
25. private val intervalMillis: Long = 100
调整资源的时候考虑一下资源的粒度,增加Executor或减少Executor。一般情况下生产环境下给每个Executor分配的Cores是3到5个,通常设置奇数个。如果动态资源调整,不需要申请很多的Cores。从Spark streaming的角度考虑,Executor动态调整巨大的挑战是Spark streaming是按照Batch Duration的方式运行的,可能这个Batch Duration需要很多的资源,下一个Batch Duration就不需要那么多的资源,言外之意是如果调整资源的时候,还没来得及调整完,这个Batch Duration运行已经过期了。这个确实是问题,那调整周期的时间间隔,如Batch Duration 10 秒钟考虑增加或减少Executor,这也是非常简单的方式。
对你的数据规模进行评估,对已有资源是否闲置进行评估, Batch Duration的数据流进来,每个数据流进行分片的时候,可以计算已有的Core,如果不够,申请增加Executor,运行任务的时候可能分发到新申请的Executor上。也可检查上一个Batch Duration的处理时间,也是动态资源调整的依据。参照检查上一个Batch Duration的处理时间、流量,可以自己搞一套算法。如果在Batch Duration的时间范围之内,那很轻松的处理完,如果不在Batch Duration的时间范围之内,那需要更多的资源。也可以参考一下Spark Streaming的代码。StreamingContext就是一个class,StreamingContext 可以自定义,如可以定义一个类继承至StreamingContext ,命名的时候注意一下包。
StreamingContext.scala的源代码
1. class StreamingContextprivate[streaming] (
2. _sc: SparkContext,
3. _cp: Checkpoint,
4. _batchDur: Duration
5. ) extends Logging {
关于动态控制消费速率,Sparkstreaming提供了一个弹性的机制,可以看一下流进来的数据和处理的数据的关系,是否来得及进行处理,如果不能来得及处理,会自己控制数据流进来的速度。这里有一个配置参数:spark.streaming.backpressure.enabled。建议打开这个参数。Sparkstreaming本身有一个关于rate的控制,在运行的时候手动调整流进的速度,如感知delay太严重了,就控制流进来的数据慢一点,如果能轻松的运行完,那就让Batch Duration流进更多的数据,既然我们自己可以观察,Spark streaming就提供一套算法,在数据流进来的数据和处理时间的比例关系,在某个特定比例下,让流进来的rate速率提高或者降低。
一个问题:如果可以动态控制消费速率,那资源是否不可以动态调整?这个不是的,可以同时开启资源的动态分配和速率的动态控制,都有自己的定时器,该怎么调整就会怎么调整。例如资源来不及分配,如果几次资源都来不及分配就可以动态控制数据消费的速率。为何专门搞一个动态的速率控制,一个很重要的原因,Spark streaming不太好控制的流式处理,不太适合做动态资源控制,因为一个Batch Duration变一下,下一个Batch Duration又变了,因此不合适。推荐使用backpressure动态速率控制,资源的动态控制比较适合于长时间耗时的任务,Spark streaming是一个又一个的微批处理,不适合资源的动态分配。流进的速度具体根据处理的延时和Batch或者windows的大小设定,不可能一下就调整得很完美的符合需要,这是不可能的。例如资源的动态分配,如果缺资源,不会一下就分配10个Executor,可能开始的时候分配一个Executor,再次运行发现还是不够,下次分配2个Executor,如果还是不够,下次就分配4个Executor,然后8个Executor,16个Executor,这个是算法。
具体内容可以看论文(AdaptiveStream Processing using Dynamic Batch Sizing 30多页)说的很清楚。