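Thoughts on why starting a Kafka cluster is simpler than starting a Spark cluster

1. To start a Spark cluster, you run start-all.sh on the Spark master node only. The script on the master node automatically SSHes to the other worker nodes and sends the launch command; the JVM on each worker node then loads its main class (org.apache.spark.deploy.worker.Worker) and starts, which establishes communication between master and workers. Executor processes, whose main class is CoarseGrainedExecutorBackend, are launched later, when an application runs.
2. Kafka is different: kafka-server-start.sh has to be run by hand on every broker node. That keeps the startup path much simpler, and once each broker is up, the brokers can talk to one another over sockets.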
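Think of it this way:
1. Spark is like a cell tower serving a neighborhood: once the tower is up, the distributed nodes (the phones) come up automatically and can go online and make calls.
2. Kafka is like landline telephones: they have to be installed household by household, and once each household (a broker) has been set up by hand, it can make calls. The call-dispatching executor uses FIFO: the first call in is connected first, and later calls wait.

These are my own thoughts, and the description may not be entirely accurate.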
1. kafka.Kafka -> main -> kafkaServerStartable.startup
2. In KafkaServerStartable.scala: def startup() -> server.startup() (server is a KafkaServer)
3. In KafkaServer.scala: startup() runs the following block when canStartup is true
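The delegation described in steps 1 and 2 can be sketched as follows. This is only an illustration of the call order: `SketchServer`, `SketchServerStartable`, and `trace` are hypothetical stand-ins, not the real Kafka classes, and they do nothing except record which layer was entered.

```scala
import scala.collection.mutable.ListBuffer

object StartupChainSketch {
  // Records the order in which each layer is entered.
  val trace: ListBuffer[String] = ListBuffer.empty[String]

  // Hypothetical stand-in for KafkaServer.
  class SketchServer {
    def startup(): Unit = trace += "KafkaServer.startup"
  }

  // Hypothetical stand-in for KafkaServerStartable: it only delegates.
  class SketchServerStartable(server: SketchServer) {
    def startup(): Unit = {
      trace += "KafkaServerStartable.startup"
      server.startup()
    }
  }

  // Hypothetical stand-in for kafka.Kafka.main.
  def main(args: Array[String]): Unit = {
    trace += "Kafka.main"
    new SketchServerStartable(new SketchServer).startup()
    println(trace.mkString(" -> "))
  }
}
```

The point of the extra startable layer is that the entry point stays tiny while all real work sits in the server's own startup().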
if (canStartup) {
  metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)

  brokerState.newState(Starting)

  /* start scheduler */
  kafkaScheduler.startup()

  /* setup zookeeper */
  zkUtils = initZk()

  /* start log manager */
  logManager = createLogManager(zkUtils.zkClient, brokerState)
  logManager.startup()

  /* generate brokerId */
  config.brokerId = getBrokerId
  this.logIdent = "[Kafka Server " + config.brokerId + "], "

  /* start the socket server */
  socketServer = new SocketServer(config, metrics, kafkaMetricsTime)
  socketServer.startup()

  /* start replica manager */
  replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkUtils,
    kafkaScheduler, logManager, isShuttingDown)
  replicaManager.startup()

  /* start kafka controller */
  kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime,
    metrics, threadNamePrefix)
  kafkaController.startup()

  /* start kafka coordinator */
  consumerCoordinator = GroupCoordinator.create(config, zkUtils, replicaManager)
  consumerCoordinator.startup()

  /* Get the authorizer and initialize it if one is specified. */
  authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
    val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
    authZ.configure(config.originals())
    authZ
  }

  /* start processing requests (the Kafka request handler thread pool) */
  apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
    kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
  requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel,
    apis, config.numIoThreads)
  brokerState.newState(RunningAsBroker)

  Mx4jLoader.maybeLoad()

  /* start dynamic config manager */
  dynamicConfigHandlers = Map[String, ConfigHandler](
    ConfigType.Topic -> new TopicConfigHandler(logManager),
    ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers))

  // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides
  // TODO: Move this logic to DynamicConfigManager
  AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
    case (clientId, properties) =>
      dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties)
  }

  // Create the config manager. start listening to notifications
  dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers)
  dynamicConfigManager.startup()

  /* tell everyone we are alive */
  // Resolve the advertised listener endpoints; port 0 means "use the port the socket server actually bound"
  val listeners = config.advertisedListeners.map { case (protocol, endpoint) =>
    if (endpoint.port == 0)
      (protocol, EndPoint(endpoint.host, socketServer.boundPort(protocol), endpoint.protocolType))
    else
      (protocol, endpoint)
  }
  kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkUtils)
  kafkaHealthcheck.startup()

  /* register broker metrics */
  registerStats()

  shutdownLatch = new CountDownLatch(1)
  startupComplete.set(true)
  isStartingUp.set(false)
  AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)
  info("started")
} // end of if (canStartup)
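The excerpt above does not show where canStartup comes from. A minimal sketch of one way such a guard can work, assuming it is the result of an atomic compare-and-set on isStartingUp (which would be consistent with the isStartingUp.set(false) and startupComplete.set(true) calls at the end of the block), looks like this. `GuardedServer` and `initCount` are hypothetical names used only for illustration.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object StartupGuardSketch {
  // Hypothetical server that only demonstrates the startup guard.
  class GuardedServer {
    private val isStartingUp = new AtomicBoolean(false)
    private val startupComplete = new AtomicBoolean(false)

    // Counts how many times the initialization body actually ran.
    var initCount = 0

    def startup(): Unit = {
      // Already started: nothing to do.
      if (startupComplete.get) return

      // Assumption: only the caller that flips false -> true may proceed.
      val canStartup = isStartingUp.compareAndSet(false, true)
      if (canStartup) {
        initCount += 1 // stands in for starting the scheduler, ZooKeeper client, and so on
        startupComplete.set(true)
        isStartingUp.set(false)
      }
    }
  }
}
```

With a guard of this shape, calling startup() twice on the same instance runs the initialization body only once.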
4. Exploring the broker states: brokerState.newState(Starting) -> trait BrokerStates. In BrokerStates.scala:
/**
 * Broker states are the possible state that a kafka broker can be in.
 * A broker should be only in one state at a time.
 * The expected state transition with the following defined states is:
 *
 *                +-----------+
 *                |Not Running|
 *                +-----+-----+
 *                      |
 *                      v
 *                +-----+-----+
 *                |Starting   +--+
 *                +-----+-----+  | +----+------------+
 *                      |        +>+RecoveringFrom   |
 *                      v          |UncleanShutdown  |
 * +----------+     +-----+-----+  +-------+---------+
 * |RunningAs |     |RunningAs  |            |
 * |Controller+<--->+Broker     +<-----------+
 * +----------+     +-----+-----+
 *        |              |
 *        |              v
 *        |       +-----+------------+
 *        |-----> |PendingControlled |
 *                |Shutdown          |
 *                +-----+------------+
 *                      |
 *                      v
 *               +-----+----------+
 *               |BrokerShutting  |
 *               |Down            |
 *               +-----+----------+
 *                     |
 *                     v
 *               +-----+-----+
 *               |Not Running|
 *               +-----------+
 *
 */
case object NotRunning extends BrokerStates { val state: Byte = 0 }
case object Starting extends BrokerStates { val state: Byte = 1 }
case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 }
case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }
case object RunningAsController extends BrokerStates { val state: Byte = 4 }
case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 }
case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
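To make the brokerState.newState(...) calls in step 3 concrete, here is a rough sketch of a holder that records the current state as a byte. `StateHolder` is a hypothetical class written for this post, not Kafka's own BrokerState implementation, and only three of the states are repeated to keep it short.

```scala
object BrokerStateSketch {
  sealed trait BrokerStates { def state: Byte }
  case object NotRunning extends BrokerStates { val state: Byte = 0 }
  case object Starting extends BrokerStates { val state: Byte = 1 }
  case object RunningAsBroker extends BrokerStates { val state: Byte = 3 }

  // Hypothetical holder: keeps the latest state in a volatile field
  // so that other threads (for example a metrics reporter) see updates.
  class StateHolder {
    @volatile private var current: Byte = NotRunning.state

    def newState(s: BrokerStates): Unit = {
      current = s.state
    }

    def currentState: Byte = current
  }
}
```

A startup sequence like the one in step 3 would then call newState(Starting) first and newState(RunningAsBroker) once the request handler pool exists.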
5. The KafkaScheduler: kafkaScheduler.startup() -> KafkaScheduler.scala. In KafkaScheduler.scala:
override def startup() {
  debug("Initializing task scheduler.")
  this synchronized {
    if(isStarted)
      throw new IllegalStateException("This scheduler has already been started!")
    // A JDK scheduled thread pool does the actual work
    executor = new ScheduledThreadPoolExecutor(threads)
    // Drop periodic and delayed tasks once shutdown has been requested
    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
    // Name each thread with the prefix plus an increasing id
    executor.setThreadFactory(new ThreadFactory() {
      def newThread(runnable: Runnable): Thread =
        Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
    })
  }
}
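The startup() method above is essentially a configured java.util.concurrent.ScheduledThreadPoolExecutor. A self-contained sketch of the same setup, plus one periodic task, might look like this. `SchedulerSketch`, `newExecutor`, and `runPeriodic` are hypothetical names, and a plain `new Thread` with `setDaemon(true)` stands in for Kafka's Utils.newThread helper.

```scala
import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object SchedulerSketch {
  // Builds an executor configured like the startup() excerpt:
  // pending tasks are dropped on shutdown and threads get numbered names.
  def newExecutor(threads: Int, threadNamePrefix: String): ScheduledThreadPoolExecutor = {
    val schedulerThreadId = new AtomicInteger(0)
    val executor = new ScheduledThreadPoolExecutor(threads)
    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
    executor.setThreadFactory(new ThreadFactory {
      def newThread(runnable: Runnable): Thread = {
        // Assumption: a plain daemon thread replaces Utils.newThread here.
        val t = new Thread(runnable, threadNamePrefix + schedulerThreadId.getAndIncrement())
        t.setDaemon(true)
        t
      }
    })
    executor
  }

  // Schedules a task every 10 ms and waits until it has fired `times` times.
  // Returns true if that happened within 5 seconds.
  def runPeriodic(times: Int): Boolean = {
    val executor = newExecutor(1, "sketch-scheduler-")
    val latch = new CountDownLatch(times)
    executor.scheduleAtFixedRate(new Runnable {
      def run(): Unit = latch.countDown()
    }, 0L, 10L, TimeUnit.MILLISECONDS)
    val done = latch.await(5, TimeUnit.SECONDS)
    executor.shutdownNow()
    done
  }
}
```

Daemon threads matter here because they do not keep the JVM alive on their own, so a forgotten scheduler cannot block process exit.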