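1. MetricsSystem initialization
The MetricsSystem is created in SparkEnv. On the driver it is not started right away, because it has to wait for the task scheduler to provide an app ID. On an executor it is started immediately.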
val metricsSystem = if (isDriver) {
// driver side: created here, but NOT started yet
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
// Then we can start the metrics system.
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
} else { //executor
// We need to set the executor ID before the MetricsSystem is created because sources and
// sinks specified in the metrics configuration file will want to incorporate this executor's
// ID into the metrics they report.
conf.set("spark.executor.id", executorId)
val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
ms.start() // started immediately
ms
}
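The difference between the two branches can be reproduced with a small stand-in. This is only a sketch: ToyMetricsSystem and ToyEnv are hypothetical names, not Spark classes, and only the running flag is modelled.

```scala
// Hypothetical stand-in for Spark's MetricsSystem; only the running flag is modelled.
final class ToyMetricsSystem(val instance: String) {
  private var running = false

  def isRunning: Boolean = running

  def start(): Unit = {
    require(!running, "Attempting to start a MetricsSystem that is already running")
    running = true
  }
}

object ToyEnv {
  // Mirrors the branch above: the driver instance is only created,
  // the executor instance is created and started right away.
  def create(isDriver: Boolean): ToyMetricsSystem =
    if (isDriver) {
      new ToyMetricsSystem("driver")
    } else {
      val ms = new ToyMetricsSystem("executor")
      ms.start()
      ms
    }
}
```

Calling ToyEnv.create(isDriver = true) returns an instance that is not running yet, so the caller is responsible for invoking start() later, which is what SparkContext does for the real driver.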
2. Starting the MetricsSystem
The driver's MetricsSystem is started in SparkContext. spark.app.id has to be set first, and only then is the MetricsSystem started.
_applicationId = _taskScheduler.applicationId()
_applicationAttemptId = taskScheduler.applicationAttemptId()
_conf.set("spark.app.id", _applicationId)
if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
}
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
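One way to see why the start is deferred: if metric names embed the app ID, then anything registered before spark.app.id is set cannot carry it. The sketch below assumes a dotted appId.executorId.sourceName naming scheme. ToyNaming and registryName are hypothetical, and the real naming logic inside MetricsSystem may differ between Spark versions.

```scala
object ToyNaming {
  // Hypothetical helper: joins app ID, executor ID and source name with dots.
  // Falls back to the bare source name when either ID is missing from the conf.
  def registryName(conf: Map[String, String], sourceName: String): String =
    (conf.get("spark.app.id"), conf.get("spark.executor.id")) match {
      case (Some(appId), Some(execId)) => s"$appId.$execId.$sourceName"
      case _ => sourceName
    }
}
```

With an empty conf the helper can only return the bare source name. Once both IDs are present the name is fully qualified, which is the state the driver reaches after _conf.set("spark.app.id", _applicationId).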
3. What start() actually does
def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
StaticSources.allSources.foreach(registerSource)
// register the configured sources
registerSources()
// register the configured sinks
registerSinks()
// start every registered sink
sinks.foreach(_.start)
}
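3.1 Register sources: this tells the metrics system where to collect metric data from.
3.2 Register sinks: this tells the metrics system where to send the collected metric data.
3.3 Add Jetty ServletContextHandlers for the sinks.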
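The ordering inside start() (guard, register sources, register sinks, start sinks) can be sketched with toy types. ToySource, ToySink, RecordingSink and ToyLifecycle are all hypothetical names used for illustration. The real MetricsSystem reads its sources and sinks from the metrics configuration rather than from constructor arguments.

```scala
import scala.collection.mutable.ArrayBuffer

trait ToySource { def sourceName: String }
trait ToySink { def start(): Unit }

// A sink that only records that it was started.
final class RecordingSink(log: ArrayBuffer[String], name: String) extends ToySink {
  def start(): Unit = log += s"start sink $name"
}

final class ToyLifecycle(
    sourcesToAdd: Seq[ToySource],
    sinksToAdd: Seq[ToySink],
    log: ArrayBuffer[String]) {

  private var running = false
  private val sources = ArrayBuffer.empty[ToySource]
  private val sinks = ArrayBuffer.empty[ToySink]

  def start(): Unit = {
    // Same guard as in the real start(): a second call fails.
    require(!running, "Attempting to start a MetricsSystem that is already running")
    running = true
    // Step 1: register sources (where metric data comes from).
    sourcesToAdd.foreach { s => sources += s; log += s"register source ${s.sourceName}" }
    // Step 2: register sinks (where metric data goes).
    sinksToAdd.foreach { s => sinks += s; log += "register sink" }
    // Step 3: start every registered sink.
    sinks.foreach(_.start())
  }
}
```

Because the sinks are started only after both registration steps, a sink never begins reporting against a registry that is still missing sources.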