Spark视频王家林第119课: Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?
本节讲解Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?数据峰值及流量变化的不稳定有2个层面:1)第一个层面就是数据确实不稳定,例如晚上11点的时候访问流量特别高,相对其他时间而言表现为不稳定。2)第二个层面:数据是没问题的,数据流动的速度是匀速或接近于匀速,但是在处理的过程中发生了故障或者GC的时候耽误了时间,导致计算延迟,Web监控台上也会看到峰值的变化。这两种情况无论哪种情况,都不是一个稳定的系统能够避免的问题,解决这个问题有很多方式,本节提一个Spark本身自带的一个Backpressure机制,即数据的反压机制。
Backpressure机制的基本思想是根据上一次计算的Job信息来评估决定下一个Job数据接受的速度。根据上一个Job执行结束之后对Job的统计信息,例如Job延迟了多长时间等,基于这个统计信息,SparkStreaming的算法会评估下一个Batch Duration的时间窗口应该以什么样的速度接收数据。这里有一个问题,接收数据的速度怎么进行限制?接收数据的过程也是消费数据的过程,SparkStreaming在接收数据的时候必须把当前的数据接收完毕,才能接收下一条的数据,根据上一个作业执行的情况,根据自己的算法进行一个判断,下一个BatchDuration必须以什么样的速度接收数据,这就是所谓的Backpressure反压机制。本章节将透彻的讲解Backpressure反压机制如何实现。
在生产环境下,Spark Streaming有一个监控的平台,最简单的是使用SparkWeb View的原生页面,或者借助第三方的监控软件去监控,如监测到Spark Streaming以匀速的速度前进,表明SparkStreaming的处理是稳定的。如果在监控平台上监控Kafka剩余数据的情况,Kafka突然出现一个波峰,这个时候需考虑怎么去解决,如果SparkStreaming开启了Backpressure的机制,数据突然变大或者发生了GC,Backpressure能够动态改变消费的速度,对于Spark而言是非常有意义的。在过去的很多项目中,我们都会开启SparkStreaming的Backpressure反压机制。
从源码的角度,Backpressure反压机制从Driver来考虑,我们系统的看一下反压机制的内容。RateController继承至StreamingListener监听器,当一个Job完成的时候,StreamingListener是监听器的一种,肯定会进行注册,注册的时候会调用RateController。RateController有自己的子类,会调自己具体的业务。其中computeAndPublish方法计算新的限制速率并异步发布。
RateController.scala源代码:
1. private[streaming] abstractclass RateController(val streamUID: Int, rateEstimator:
2. RateEstimator) extendsStreamingListener with Serializable {
3. ……
4. private def computeAndPublish(time: Long,elems: Long, workDelay: Long, waitDelay: Long): Unit =
5. Future[Unit] {
6. val newRate = rateEstimator.compute(time,elems, workDelay, waitDelay)
7. newRate.foreach { s =>
8. rateLimit.set(s.toLong)
9. publish(getLatestRate())
10. }
11. },
我们先看一下RateController在什么时候启动和注册的?从原理的角度考虑什么时候启动RateController?RateController是在JobScheduler中启动的,为什么是在JobScheduler中,原因非常简单,JobScheduler知道作业什么时候完成,作业完成之后获得上一次作业的统计信息。
在Jobscheduler中查找RateController的实例化,每次作业完成的时候,Jobscheduler知道。这是Driver级别的,从start方法开始查找,从getInputStreams中把InputStreams关联上RateController,InputStreams有RateController。
JobScheduler.scala的start方法源代码:
1. def start(): Unit = synchronized {
2. if (eventLoop != null) return // schedulerhas already been started
3.
4. logDebug("Starting JobScheduler")
5. eventLoop = newEventLoop[JobSchedulerEvent]("JobScheduler") {
6. override protected def onReceive(event:JobSchedulerEvent): Unit = processEvent(event)
7.
8. override protected def onError(e:Throwable): Unit = reportError("Error in job scheduler", e)
9. }
10. eventLoop.start()
11.
12. // attach rate controllers of input streamsto receive batch completion updates
13. for {
14. inputDStream null
25. }
26.
27. executorAllocationManager =ExecutorAllocationManager.createIfEnabled(
28. executorAllocClient,
29. receiverTracker,
30. ssc.conf,
31. ssc.graph.batchDuration.milliseconds,
32. clock)
33. executorAllocationManager.foreach(ssc.addStreamingListener)
34. receiverTracker.start()
35. jobGenerator.start()
36. executorAllocationManager.foreach(_.start())
37. logInfo("Started JobScheduler")
38. }
我们先看一下InputStreams怎么来的,先看一下InputStreamsSuite的测试代码,通过ssc.socketTextStream创建一个networkStream:
InputStreamsSuite.scala源代码:
1. val networkStream = ssc.socketTextStream(
2. "localhost",testServer.port, StorageLevel.MEMORY_AND_DISK)
ssc.socketTextStream创建的是socketTextStream。
StreamingContext.scala的socketTextStream源代码:
1. defsocketTextStream(
2. hostname: String,
3. port: Int,
4. storageLevel: StorageLevel =StorageLevel.MEMORY_AND_DISK_SER_2
5. ): ReceiverInputDStream[String] =withNamedScope("socket text stream") {
6. socketStream[String](hostname, port,SocketReceiver.bytesToLines, storageLevel)
7. }
其中socketStream的源代码如下。
StreamingContext.scala的socketStream源代码:
1. defsocketStream[T: ClassTag](
2. hostname: String,
3. port: Int,
4. converter: (InputStream) =>Iterator[T],
5. storageLevel: StorageLevel
6. ): ReceiverInputDStream[T] = {
7. new SocketInputDStream[T](this, hostname,port, converter, storageLevel)
8. }
这里创建出SocketInputDStream,就是InputStreams的来源。SocketInputDStream中有一个getReceiver,在getReceiver方法中创建SocketReceiver。
SocketInputDStream.scala源代码:
1. private[streaming]
2. class SocketInputDStream[T:ClassTag](
3. _ssc: StreamingContext,
4. host: String,
5. port: Int,
6. bytesToObjects: InputStream =>Iterator[T],
7. storageLevel: StorageLevel
8. ) extends ReceiverInputDStream[T](_ssc) {
9.
10. def getReceiver(): Receiver[T] = {
11. new SocketReceiver(host, port,bytesToObjects, storageLevel)
12. }
13. }
SocketReceiver的onStart方法会一直不断的循环,循环进行 receive()。
SocketInputDStream.scala的SocketReceiver源代码:
1. private[streaming]
2. class SocketReceiver[T:ClassTag](
3. host: String,
4. port: Int,
5. bytesToObjects: InputStream =>Iterator[T],
6. storageLevel: StorageLevel
7. ) extends Receiver[T](storageLevel) withLogging {
8.
9. private var socket: Socket = _
10.
11. def onStart() {
12.
13. logInfo(s"Connecting to$host:$port")
14. try {
15. socket = new Socket(host, port)
16. } catch {
17. case e: ConnectException =>
18. restart(s"Error connecting to$host:$port", e)
19. return
20. }
21. logInfo(s"Connected to$host:$port")
22.
23. // Start the thread that receives data overa connection
24. new Thread("Socket Receiver") {
25. setDaemon(true)
26. override def run() { receive() }
27. }.start()
28. }
receive方法中socket.getInputStream()是接收到的数据,放入到iterator中,只要iterator不停止就会一直循环,将数据存储起来,数据存储不是一件简单的事情。注意:receive方法运行在Executor上。
SocketInputDStream.scala的receive方法源代码:
1. defreceive() {
2. try {
3. val iterator =bytesToObjects(socket.getInputStream())
4. while(!isStopped &&iterator.hasNext) {
5. store(iterator.next())
6. }
7. if (!isStopped()) {
8. restart("Socket data stream had nomore data")
9. } else {
10. logInfo("Stopped receiving")
11. }
12. } catch {
13. case NonFatal(e) =>
14. logWarning("Error receivingdata", e)
15. restart("Error receivingdata", e)
16. } finally {
17. onStop()
18. }
19. }
Receiver的store方法将单个接收到的数据存储到Spark内存中,这些单个数据在被放入Spark内存前将被合并到数据块。
Receiver.scala的源代码:
1. defstore(dataItem: T) {
2. supervisor.pushSingle(dataItem)
3. }
ReceiverSupervisor的pushSingle方法将单个数据项推到后端数据存储,这里无具体实现,需看ReceiverSupervisor具体子类ReceiverSupervisorImpl的实现。
ReceiverSupervisor.scala源代码:
1. def pushSingle(data: Any): Unit
子类ReceiverSupervisorImpl的pushSingle方法将一条记录放入defaultBlockGenerator中。
ReceiverSupervisorImpl.scala的源代码:
1. defpushSingle(data: Any) {
2. defaultBlockGenerator.addData(data)
3. }
defaultBlockGenerator的addData方法将单个数据项推入缓冲区。addData中有个关键的方法 waitToPush(), waitToPush()是关键点。
BlockGenerator.scala源代码:
1. def addData(data: Any): Unit = {
2. if (state == Active) {
3. waitToPush()
4. synchronized {
5. if (state == Active) {
6. currentBuffer += data
7. } else {
8. throw new SparkException(
9. "Cannot add data asBlockGenerator has not been started or has been stopped")
10. }
11. }
12. } else {
13. throw new SparkException(
14. "Cannot add data as BlockGeneratorhas not been started or has been stopped")
15. }
16. }
waitToPush方法里面调用rateLimiter.acquire()方法,如果addData的waitToPush方法不执行,则addData方法中在waitToPush方法之后的synchronized同步块代码将都不执行,数据不能存储。
RateLimiter.scala源代码:
1. private[receiver] abstract classRateLimiter(conf: SparkConf) extends Logging {
2.
3. // treated as an upper limit
4. private val maxRateLimit =conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
5. private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
6.
7. def waitToPush() {
8. rateLimiter.acquire()
9. }
RateLimiter.java位于com.google.common.util.concurrent包中,acquire方法从{@code RateLimiter}申请获取一个许可证,一直阻塞直到请求被授予。 其中调用acquire(1)方法,acquire(1)方法中reserve有个同步互斥信号量synchronized (mutex()),如果接收数据进行存储,需拿到一个许可证Ticket,如果没有这个令牌,就无法存储数据,也就限定了接收数据的速度。在receiver中有具体限定接收数据的方式。
RateLimiter.java的源代码:
1. public void acquire() {
2. acquire(1);
3. }
4. ......
5. public void acquire(int permits) {
6. checkPermits(permits);
7. long microsToWait;
8. synchronized (mutex) {
9. microsToWait =reserveNextTicket(permits, readSafeMicros());
10. }
11. ticker.sleepMicrosUninterruptibly(microsToWait);
12. }
回到Driver端的JobScheduler,JobScheduler在Start的时候,每个inputDStream都有一个rateController,for循环遍历获得rateController,然后将rateController交给上下文的ssc.addStreamingListener监听器,进行注册。这里listenerBus是scheduler级别的listenerBus,listenerBus收到相关的信息肯定会告诉监听器。
StreamingContext.scala的addStreamingListener源代码:
1. defaddStreamingListener(streamingListener: StreamingListener) {
2. scheduler.listenerBus.addListener(streamingListener)
3. }
4. ……
5. private[spark] val listeners =new CopyOnWriteArrayList[L]
6. …….
7. final def addListener(listener: L): Unit = {
8. listeners.add(listener)
9. }
其中listenerBus是StreamingListenerBus,StreamingListenerBus在什么时候启动的?在构建JobScheduler的时候会获取StreamingListenerBus的实例。
JobScheduler.scala源代码:
1. val listenerBus = newStreamingListenerBus(ssc.sparkContext.listenerBus)
StreamingListenerBus中有很多事件,包括receiverStarted、receiverError、receiverStopped等事件。其中batchStarted是batch处理开始,batchSubmitted是batch提交,而batchCompleted比较关键。
StreamingListenerBus.scala源代码:
1. private[streaming] classStreamingListenerBus(sparkListenerBus: LiveListenerBus)
2. extends SparkListener withListenerBus[StreamingListener, StreamingListenerEvent] {
3. ......
4. protected override defdoPostEvent(
5. listener: StreamingListener,
6. event: StreamingListenerEvent): Unit = {
7. event match {
8. case receiverStarted:StreamingListenerReceiverStarted =>
9. listener.onReceiverStarted(receiverStarted)
10. case receiverError:StreamingListenerReceiverError =>
11. listener.onReceiverError(receiverError)
12. case receiverStopped:StreamingListenerReceiverStopped =>
13. listener.onReceiverStopped(receiverStopped)
14. case batchSubmitted:StreamingListenerBatchSubmitted =>
15. listener.onBatchSubmitted(batchSubmitted)
16. case batchStarted:StreamingListenerBatchStarted =>
17. listener.onBatchStarted(batchStarted)
18. case batchCompleted: StreamingListenerBatchCompleted=>
19. listener.onBatchCompleted(batchCompleted)
20. case outputOperationStarted:StreamingListenerOutputOperationStarted =>
21. listener.onOutputOperationStarted(outputOperationStarted)
22. case outputOperationCompleted: StreamingListenerOutputOperationCompleted=>
23. listener.onOutputOperationCompleted(outputOperationCompleted)
24. case streamingStarted:StreamingListenerStreamingStarted =>
25. listener.onStreamingStarted(streamingStarted)
26. case _ =>
27. }
28. }
batchCompleted的时候,从StreamingListener的角度讲,StreamingListener是一个trait,onBatchCompleted方法中无具体实现。具体方法在子类实现,StreamingListener有很多具体的子类,其中一个子类是RateController。
StreamingListener.scala源代码:
1. trait StreamingListener {
2. ……
3. defonBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
子类RateController会调用onBatchCompleted方法,onBatchCompleted方法是个关键点。获取batchInfo的streamIdToInputInfo作为elements,for循环遍历获取workDelay、waitDelay等信息,这些信息对于第三方的监控也是非常重要的,然后通过computeAndPublish方法发布信息。在computeAndPublish方法中,rateEstimator根据传入的时间、elems、workDelay、waitDelay等信息评估新的更加合适的Rate,在newRate.foreach循环遍历中,将值赋值到rateLimit,然后publish进行发布,这里publish方法无具体实现,RateController的子类是ReceiverRateController,其publish方法将新的速率交给receiverTracker。
RateController.scala源代码:
1. override def onBatchCompleted(batchCompleted:StreamingListenerBatchCompleted) {
2. val elements =batchCompleted.batchInfo.streamIdToInputInfo
3.
4. for {
5. processingEnd 0) {
3. if (maxRateLimit > 0) {
4. rateLimiter.setRate(newRate.min(maxRateLimit))
5. } else {
6. rateLimiter.setRate(newRate)
7. }
8. }
RateLimiter.java 的setRate方法中加了一个互斥信号锁,获取每秒钟能接收多少条记录及处理多少条记录,然后调用doSetRate方法根据已有的数据和最新的数据设置速率,doSetRate交给具体的子类实现。
com.google.common.util.concurrent.RateLimiter.java的源代码:
1. public final voidsetRate(double permitsPerSecond) {
2. Preconditions.checkArgument(permitsPerSecond> 0.0
3. &&!Double.isNaN(permitsPerSecond), "rate must be positive");
4. synchronized (mutex) {
5. resync(readSafeMicros());
6. double stableIntervalMicros =TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;
7. this.stableIntervalMicros =stableIntervalMicros;
8. doSetRate(permitsPerSecond,stableIntervalMicros);
9. }
10. }
11.
12. abstract void doSetRate(doublepermitsPerSecond, double stableIntervalMicros);
doSetRate方法由具体的子类实现,Bursty类是RateLimiter的子类,com.google.common.util.concurrent.RateLimiter.java是Google提供的,源码在guava-14.0.1-sources中。在存储数据的时候,必须根据速率rate存储数据,这就是本章节谈的Backpressure机制。整体思路较简单,每次计算BatchDuration Job执行完成的时候,就有JobCompleted的统计信息发回来,根据统计信息计算新的Rate,将新的rate进行远程通信交给Executor,Executor根据接收到的信息重新设置Rate,每次接收数据的时候肯定根据Rate决定每秒接收多少数据。这样就动态改变了速度,每次计算上一个作业的执行情况。
com.google.common.util.concurrent.RateLimiter.java的源代码:
1. private static class Burstyextends RateLimiter {
2. Bursty(SleepingTicker ticker) {
3. super(ticker);
4. }
5.
6. @Override
7. void doSetRate(double permitsPerSecond,double stableIntervalMicros) {
8. double oldMaxPermits =this.maxPermits;
9. /*
10. * We allow the equivalent work ofup to one second to be granted with zero waiting, if the
11. * rate limiter has been unused foras much. This is to avoid potentially producing tiny
12. * wait interval between subsequentrequests for sufficiently large rates, which would
13. * unnecessarily overconstrain thethread scheduler.
14. */
15. maxPermits = permitsPerSecond; //one second worth of permits
16. storedPermits = (oldMaxPermits ==0.0)
17. ? 0.0 // initial state
18. : storedPermits *maxPermits / oldMaxPermits;
19. }
20.
21. @Override
22. long storedPermitsToWaitTime(doublestoredPermits, double permitsToTake) {
23. return 0L;
24. }
25. }