您当前的位置: 首页 >  ar

段智华

暂无认证

  • 0浏览

    0关注

    1232博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Spark视频王家林第119课: Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?

段智华 发布时间:2018-01-17 19:50:14 ,浏览量:0

Spark视频王家林第119课: Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?

本节讲解Spark Streaming性能优化:如何在生产环境下应对流数据峰值巨变?数据峰值及流量变化的不稳定有2个层面:1)第一个层面就是数据确实不稳定,例如晚上11点的时候访问流量特别高,相对其他时间而言表现为不稳定。2)第二个层面:数据是没问题的,数据流动的速度是匀速或接近于匀速,但是在处理的过程中发生了故障或者GC的时候耽误了时间,导致计算延迟,Web监控台上也会看到峰值的变化。这两种情况无论哪种情况,都不是一个稳定的系统能够避免的问题,解决这个问题有很多方式,本节提一个Spark本身自带的一个Backpressure机制,即数据的反压机制。

Backpressure机制的基本思想是根据上一次计算的Job信息来评估决定下一个Job数据接受的速度。根据上一个Job执行结束之后对Job的统计信息,例如Job延迟了多长时间等,基于这个统计信息,SparkStreaming的算法会评估下一个Batch Duration的时间窗口应该以什么样的速度接收数据。这里有一个问题,接收数据的速度怎么进行限制?接收数据的过程也是消费数据的过程,SparkStreaming在接收数据的时候必须把当前的数据接收完毕,才能接收下一条的数据,根据上一个作业执行的情况,根据自己的算法进行一个判断,下一个BatchDuration必须以什么样的速度接收数据,这就是所谓的Backpressure反压机制。本章节将透彻的讲解Backpressure反压机制如何实现。

在生产环境下,Spark Streaming有一个监控的平台,最简单的是使用SparkWeb View的原生页面,或者借助第三方的监控软件去监控,如监测到Spark Streaming以匀速的速度前进,表明SparkStreaming的处理是稳定的。如果在监控平台上监控Kafka剩余数据的情况,Kafka突然出现一个波峰,这个时候需考虑怎么去解决,如果SparkStreaming开启了Backpressure的机制,数据突然变大或者发生了GC,Backpressure能够动态改变消费的速度,对于Spark而言是非常有意义的。在过去的很多项目中,我们都会开启SparkStreaming的Backpressure反压机制。

从源码的角度,Backpressure反压机制从Driver来考虑,我们系统的看一下反压机制的内容。RateController继承至StreamingListener监听器,当一个Job完成的时候,StreamingListener是监听器的一种,肯定会进行注册,注册的时候会调用RateController。RateController有自己的子类,会调自己具体的业务。其中computeAndPublish方法计算新的限制速率并异步发布。

RateController.scala源代码:

1.         private[streaming] abstractclass RateController(val streamUID: Int, rateEstimator:

2.         RateEstimator) extendsStreamingListener with Serializable {

3.         ……

4.           private def computeAndPublish(time: Long,elems: Long, workDelay: Long, waitDelay: Long): Unit =

5.             Future[Unit] {

6.               val newRate = rateEstimator.compute(time,elems, workDelay, waitDelay)

7.               newRate.foreach { s =>

8.                 rateLimit.set(s.toLong)

9.                 publish(getLatestRate())

10.            }

11.          },

 

我们先看一下RateController在什么时候启动和注册的?从原理的角度考虑什么时候启动RateController?RateController是在JobScheduler中启动的,为什么是在JobScheduler中,原因非常简单,JobScheduler知道作业什么时候完成,作业完成之后获得上一次作业的统计信息。

在Jobscheduler中查找RateController的实例化,每次作业完成的时候,Jobscheduler知道。这是Driver级别的,从start方法开始查找,从getInputStreams中把InputStreams关联上RateController,InputStreams有RateController。

JobScheduler.scala的start方法源代码:

1.          def start(): Unit = synchronized {

2.             if (eventLoop != null) return // schedulerhas already been started

3.          

4.             logDebug("Starting JobScheduler")

5.             eventLoop = newEventLoop[JobSchedulerEvent]("JobScheduler") {

6.               override protected def onReceive(event:JobSchedulerEvent): Unit = processEvent(event)

7.          

8.               override protected def onError(e:Throwable): Unit = reportError("Error in job scheduler", e)

9.             }

10.          eventLoop.start()

11.       

12.          // attach rate controllers of input streamsto receive batch completion updates

13.          for {

14.            inputDStream  null

25.          }

26.       

27.          executorAllocationManager =ExecutorAllocationManager.createIfEnabled(

28.            executorAllocClient,

29.            receiverTracker,

30.            ssc.conf,

31.            ssc.graph.batchDuration.milliseconds,

32.            clock)

33.          executorAllocationManager.foreach(ssc.addStreamingListener)

34.          receiverTracker.start()

35.          jobGenerator.start()

36.          executorAllocationManager.foreach(_.start())

37.          logInfo("Started JobScheduler")

38.        }

 

我们先看一下InputStreams怎么来的,先看一下InputStreamsSuite的测试代码,通过ssc.socketTextStream创建一个networkStream:

InputStreamsSuite.scala源代码:

1.          val networkStream = ssc.socketTextStream(

2.                   "localhost",testServer.port, StorageLevel.MEMORY_AND_DISK)

 

ssc.socketTextStream创建的是socketTextStream。

StreamingContext.scala的socketTextStream源代码:

1.            defsocketTextStream(

2.               hostname: String,

3.               port: Int,

4.               storageLevel: StorageLevel =StorageLevel.MEMORY_AND_DISK_SER_2

5.             ): ReceiverInputDStream[String] =withNamedScope("socket text stream") {

6.             socketStream[String](hostname, port,SocketReceiver.bytesToLines, storageLevel)

7.           }

 

   其中socketStream的源代码如下。

   StreamingContext.scala的socketStream源代码:

1.            defsocketStream[T: ClassTag](

2.               hostname: String,

3.               port: Int,

4.               converter: (InputStream) =>Iterator[T],

5.               storageLevel: StorageLevel

6.             ): ReceiverInputDStream[T] = {

7.             new SocketInputDStream[T](this, hostname,port, converter, storageLevel)

8.           }

 

这里创建出SocketInputDStream,就是InputStreams的来源。SocketInputDStream中有一个getReceiver,在getReceiver方法中创建SocketReceiver。

SocketInputDStream.scala源代码:

1.          private[streaming]

2.         class SocketInputDStream[T:ClassTag](

3.             _ssc: StreamingContext,

4.             host: String,

5.             port: Int,

6.             bytesToObjects: InputStream =>Iterator[T],

7.             storageLevel: StorageLevel

8.           ) extends ReceiverInputDStream[T](_ssc) {

9.          

10.        def getReceiver(): Receiver[T] = {

11.          new SocketReceiver(host, port,bytesToObjects, storageLevel)

12.        }

13.      }

 

SocketReceiver的onStart方法会一直不断的循环,循环进行 receive()。

SocketInputDStream.scala的SocketReceiver源代码:

1.          private[streaming]

2.         class SocketReceiver[T:ClassTag](

3.             host: String,

4.             port: Int,

5.             bytesToObjects: InputStream =>Iterator[T],

6.             storageLevel: StorageLevel

7.           ) extends Receiver[T](storageLevel) withLogging {

8.          

9.           private var socket: Socket = _

10.       

11.        def onStart() {

12.       

13.          logInfo(s"Connecting to$host:$port")

14.          try {

15.            socket = new Socket(host, port)

16.          } catch {

17.            case e: ConnectException =>

18.              restart(s"Error connecting to$host:$port", e)

19.              return

20.          }

21.          logInfo(s"Connected to$host:$port")

22.       

23.          // Start the thread that receives data overa connection

24.          new Thread("Socket Receiver") {

25.            setDaemon(true)

26.            override def run() { receive() }

27.          }.start()

28.        }

 

receive方法中socket.getInputStream()是接收到的数据,放入到iterator中,只要iterator不停止就会一直循环,将数据存储起来,数据存储不是一件简单的事情。注意:receive方法运行在Executor上。

SocketInputDStream.scala的receive方法源代码:

1.            defreceive() {

2.             try {

3.               val iterator =bytesToObjects(socket.getInputStream())

4.               while(!isStopped &&iterator.hasNext) {

5.                 store(iterator.next())

6.               }

7.               if (!isStopped()) {

8.                 restart("Socket data stream had nomore data")

9.               } else {

10.              logInfo("Stopped receiving")

11.            }

12.          } catch {

13.            case NonFatal(e) =>

14.              logWarning("Error receivingdata", e)

15.              restart("Error receivingdata", e)

16.          } finally {

17.            onStop()

18.          }

19.        }

 

 Receiver的store方法将单个接收到的数据存储到Spark内存中,这些单个数据在被放入Spark内存前将被合并到数据块。

 Receiver.scala的源代码:

1.            defstore(dataItem: T) {

2.             supervisor.pushSingle(dataItem)

3.           }

 

ReceiverSupervisor的pushSingle方法将单个数据项推到后端数据存储,这里无具体实现,需看ReceiverSupervisor具体子类ReceiverSupervisorImpl的实现。

ReceiverSupervisor.scala源代码:

1.          def pushSingle(data: Any): Unit

 

子类ReceiverSupervisorImpl的pushSingle方法将一条记录放入defaultBlockGenerator中。

ReceiverSupervisorImpl.scala的源代码:

1.            defpushSingle(data: Any) {

2.             defaultBlockGenerator.addData(data)

3.           }

 

defaultBlockGenerator的addData方法将单个数据项推入缓冲区。addData中有个关键的方法 waitToPush(), waitToPush()是关键点。

BlockGenerator.scala源代码:

1.          def addData(data: Any): Unit = {

2.             if (state == Active) {

3.               waitToPush()

4.               synchronized {

5.                 if (state == Active) {

6.                   currentBuffer += data

7.                 } else {

8.                   throw new SparkException(

9.                     "Cannot add data asBlockGenerator has not been started or has been stopped")

10.              }

11.            }

12.          } else {

13.            throw new SparkException(

14.              "Cannot add data as BlockGeneratorhas not been started or has been stopped")

15.          }

16.        }

 

waitToPush方法里面调用rateLimiter.acquire()方法,如果addData的waitToPush方法不执行,则addData方法中在waitToPush方法之后的synchronized同步块代码将都不执行,数据不能存储。

RateLimiter.scala源代码:

1.           private[receiver] abstract classRateLimiter(conf: SparkConf) extends Logging {

2.          

3.           // treated as an upper limit

4.           private val maxRateLimit =conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)

5.           private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)

6.          

7.           def waitToPush() {

8.             rateLimiter.acquire()

9.           }

 

RateLimiter.java位于com.google.common.util.concurrent包中,acquire方法从{@code RateLimiter}申请获取一个许可证,一直阻塞直到请求被授予。 其中调用acquire(1)方法,acquire(1)方法中reserve有个同步互斥信号量synchronized (mutex()),如果接收数据进行存储,需拿到一个许可证Ticket,如果没有这个令牌,就无法存储数据,也就限定了接收数据的速度。在receiver中有具体限定接收数据的方式。

RateLimiter.java的源代码:

1.         public void acquire() {

2.                 acquire(1);

3.             }

4.         ......

5.             public void acquire(int permits) {

6.                 checkPermits(permits);

7.                 long microsToWait;

8.                 synchronized (mutex) {

9.                     microsToWait =reserveNextTicket(permits, readSafeMicros());

10.              }

11.              ticker.sleepMicrosUninterruptibly(microsToWait);

12.          }

 

 回到Driver端的JobScheduler,JobScheduler在Start的时候,每个inputDStream都有一个rateController,for循环遍历获得rateController,然后将rateController交给上下文的ssc.addStreamingListener监听器,进行注册。这里listenerBus是scheduler级别的listenerBus,listenerBus收到相关的信息肯定会告诉监听器。

 

 StreamingContext.scala的addStreamingListener源代码:

1.            defaddStreamingListener(streamingListener: StreamingListener) {

2.             scheduler.listenerBus.addListener(streamingListener)

3.           }

4.         ……

5.         private[spark] val listeners =new CopyOnWriteArrayList[L]

6.         …….

7.           final def addListener(listener: L): Unit = {

8.             listeners.add(listener)

9.           }

 

其中listenerBus是StreamingListenerBus,StreamingListenerBus在什么时候启动的?在构建JobScheduler的时候会获取StreamingListenerBus的实例。

JobScheduler.scala源代码:

1.         val listenerBus = newStreamingListenerBus(ssc.sparkContext.listenerBus)

 

StreamingListenerBus中有很多事件,包括receiverStarted、receiverError、receiverStopped等事件。其中batchStarted是batch处理开始,batchSubmitted是batch提交,而batchCompleted比较关键。

StreamingListenerBus.scala源代码:

1.          private[streaming] classStreamingListenerBus(sparkListenerBus: LiveListenerBus)

2.           extends SparkListener withListenerBus[StreamingListener, StreamingListenerEvent] {

3.         ......

4.         protected override defdoPostEvent(

5.               listener: StreamingListener,

6.               event: StreamingListenerEvent): Unit = {

7.             event match {

8.               case receiverStarted:StreamingListenerReceiverStarted =>

9.                 listener.onReceiverStarted(receiverStarted)

10.            case receiverError:StreamingListenerReceiverError =>

11.              listener.onReceiverError(receiverError)

12.            case receiverStopped:StreamingListenerReceiverStopped =>

13.              listener.onReceiverStopped(receiverStopped)

14.            case batchSubmitted:StreamingListenerBatchSubmitted =>

15.              listener.onBatchSubmitted(batchSubmitted)

16.            case batchStarted:StreamingListenerBatchStarted =>

17.              listener.onBatchStarted(batchStarted)

18.            case batchCompleted: StreamingListenerBatchCompleted=>

19.              listener.onBatchCompleted(batchCompleted)

20.            case outputOperationStarted:StreamingListenerOutputOperationStarted =>

21.              listener.onOutputOperationStarted(outputOperationStarted)

22.            case outputOperationCompleted: StreamingListenerOutputOperationCompleted=>

23.              listener.onOutputOperationCompleted(outputOperationCompleted)

24.            case streamingStarted:StreamingListenerStreamingStarted =>

25.              listener.onStreamingStarted(streamingStarted)

26.            case _ =>

27.          }

28.        }

 

batchCompleted的时候,从StreamingListener的角度讲,StreamingListener是一个trait,onBatchCompleted方法中无具体实现。具体方法在子类实现,StreamingListener有很多具体的子类,其中一个子类是RateController。

StreamingListener.scala源代码:

1.         trait StreamingListener {

2.         ……

3.         defonBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

 

子类RateController会调用onBatchCompleted方法,onBatchCompleted方法是个关键点。获取batchInfo的streamIdToInputInfo作为elements,for循环遍历获取workDelay、waitDelay等信息,这些信息对于第三方的监控也是非常重要的,然后通过computeAndPublish方法发布信息。在computeAndPublish方法中,rateEstimator根据传入的时间、elems、workDelay、waitDelay等信息评估新的更加合适的Rate,在newRate.foreach循环遍历中,将值赋值到rateLimit,然后publish进行发布,这里publish方法无具体实现,RateController的子类是ReceiverRateController,其publish方法将新的速率交给receiverTracker。

RateController.scala源代码:

1.            override def onBatchCompleted(batchCompleted:StreamingListenerBatchCompleted) {

2.             val elements =batchCompleted.batchInfo.streamIdToInputInfo

3.          

4.             for {

5.               processingEnd  0) {

3.               if (maxRateLimit > 0) {

4.                 rateLimiter.setRate(newRate.min(maxRateLimit))

5.               } else {

6.                 rateLimiter.setRate(newRate)

7.               }

8.             }

 

RateLimiter.java 的setRate方法中加了一个互斥信号锁,获取每秒钟能接收多少条记录及处理多少条记录,然后调用doSetRate方法根据已有的数据和最新的数据设置速率,doSetRate交给具体的子类实现。

com.google.common.util.concurrent.RateLimiter.java的源代码:

1.         public final voidsetRate(double permitsPerSecond) {

2.                 Preconditions.checkArgument(permitsPerSecond> 0.0

3.                         &&!Double.isNaN(permitsPerSecond), "rate must be positive");

4.                 synchronized (mutex) {

5.                     resync(readSafeMicros());

6.                     double stableIntervalMicros =TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;

7.                     this.stableIntervalMicros =stableIntervalMicros;

8.                     doSetRate(permitsPerSecond,stableIntervalMicros);

9.                 }

10.          }

11.       

12.          abstract void doSetRate(doublepermitsPerSecond, double stableIntervalMicros);

 

doSetRate方法由具体的子类实现,Bursty类是RateLimiter的子类,com.google.common.util.concurrent.RateLimiter.java是Google提供的,源码在guava-14.0.1-sources中。在存储数据的时候,必须根据速率rate存储数据,这就是本章节谈的Backpressure机制。整体思路较简单,每次计算BatchDuration Job执行完成的时候,就有JobCompleted的统计信息发回来,根据统计信息计算新的Rate,将新的rate进行远程通信交给Executor,Executor根据接收到的信息重新设置Rate,每次接收数据的时候肯定根据Rate决定每秒接收多少数据。这样就动态改变了速度,每次计算上一个作业的执行情况。

com.google.common.util.concurrent.RateLimiter.java的源代码:

1.         private static class Burstyextends RateLimiter {

2.                 Bursty(SleepingTicker ticker) {

3.                     super(ticker);

4.                 }

5.          

6.                 @Override

7.                 void doSetRate(double permitsPerSecond,double stableIntervalMicros) {

8.                     double oldMaxPermits =this.maxPermits;

9.                     /*

10.                   * We allow the equivalent work ofup to one second to be granted with zero waiting, if the

11.                   * rate limiter has been unused foras much. This is to avoid potentially producing tiny

12.                   * wait interval between subsequentrequests for sufficiently large rates, which would

13.                   * unnecessarily overconstrain thethread scheduler.

14.                   */

15.                  maxPermits = permitsPerSecond; //one second worth of permits

16.                  storedPermits = (oldMaxPermits ==0.0)

17.                          ? 0.0 // initial state

18.                          : storedPermits *maxPermits / oldMaxPermits;

19.              }

20.       

21.              @Override

22.              long storedPermitsToWaitTime(doublestoredPermits, double permitsToTake) {

23.                  return 0L;

24.              }

25.          }

 

 

关注
打赏
1659361485
查看更多评论
立即登录/注册

微信扫码登录

0.0604s