您当前的位置: 首页 >  kafka

段智华

暂无认证

  • 0浏览

    0关注

    1232博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

第三步:kafka的server启动过程 源代码运行内幕机制

段智华 发布时间:2016-05-03 08:02:16 ,浏览量:0

 kafka分布式集群启动比spark分布式集群启动相对简化的思考, 1、spark分布式集群节点的启动,只在spark master节点执行start-all.sh,master节点会自动ssh到分布式的其他worker节点,将cmd命令发送过去,在worker节点jvm中自动就加载了main类来启动,如CorseGrainExecutorBackend,从而完成了master与worker的通信。

2、kafka就不一样,kafka在每台broker节点上都需手工启动kafka-server-start.sh,这样就简化了许多,在每个broker节点上就可以通过socket来通信了。

这么理解吧: 1、spark相当于一个小区的一个基站,基站起来了,分布式节点(手机)就自动带起来了,手机可以上网、打电话 2、kafka相当于固定电话,要每户每户进行安装,每一户人家(分布式broker)人工安装好了,就可以打电话了。电话调度executor使用FIFO,先来的电话先通,后来的电话就等待。

以上个人想的,可能描述不一定准确。

 

 

1、 kafka.kafka ->main ->kafkaServerStartable.startup

2、进入 kafkaServerStartable.scala -〉def startup() -〉kafka.server.startup()

3、进入kafka.server.scala: canStartup启动

 

if (canStartup) {         metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime,

true)

        brokerState.newState(Starting)

        /* start scheduler */ //启动kafka调度器         kafkaScheduler.startup()

        /* setup zookeeper */ //初始化zookeeper         zkUtils = initZk()

        /* start log manager */ //启动日志管理         logManager = createLogManager(zkUtils.zkClient, brokerState)         logManager.startup()

        /* generate brokerId */ //生成 brokerId         config.brokerId =  getBrokerId         this.logIdent = "[Kafka Server " + config.brokerId + "], "

        socketServer = new SocketServer(config, metrics, kafkaMetricsTime)         socketServer.startup()  //socketServer启动

        /* start replica manager */ 启动副本管理         replicaManager = new ReplicaManager(config, metrics, time,

kafkaMetricsTime, zkUtils, kafkaScheduler, logManager,           isShuttingDown)         replicaManager.startup()

        /* start kafka controller */ //启动kafka控制器         kafkaController = new KafkaController(config, zkUtils, brokerState,

kafkaMetricsTime, metrics, threadNamePrefix)         kafkaController.startup()

        /* start kafka coordinator */ //启动kafka协调器         consumerCoordinator = GroupCoordinator.create(config, zkUtils,

replicaManager)         consumerCoordinator.startup()

        /* Get the authorizer and initialize it if one is specified.*///启动

安全认证         authorizer = Option(config.authorizerClassName).filter

(_.nonEmpty).map { authorizerClassName =>           val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)           authZ.configure(config.originals())           authZ         }

        /* start processing requests */ //启动kakfa的请求线程池         apis = new KafkaApis(socketServer.requestChannel, replicaManager,

consumerCoordinator,           kafkaController, zkUtils, config.brokerId, config, metadataCache,

metrics, authorizer)         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId,

socketServer.requestChannel, apis, config.numIoThreads)         brokerState.newState(RunningAsBroker)

        Mx4jLoader.maybeLoad()

        /* start dynamic config manager */ //kafka动态配置管理         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic

-> new TopicConfigHandler(logManager),                                                            ConfigType.Client

-> new ClientIdConfigHandler(apis.quotaManagers))

        // Apply all existing client configs to the ClientIdConfigHandler to

bootstrap the overrides         // TODO: Move this logic to DynamicConfigManager         AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach

{           case (clientId, properties) => dynamicConfigHandlers

(ConfigType.Client).processConfigChanges(clientId, properties)         }

        // Create the config manager. start listening to notifications         dynamicConfigManager = new DynamicConfigManager(zkUtils,

dynamicConfigHandlers)         dynamicConfigManager.startup()

        /* tell everyone we are alive */ // endpoint节点监听         val listeners = config.advertisedListeners.map {case(protocol,

endpoint) =>           if (endpoint.port == 0)             (protocol, EndPoint(endpoint.host, socketServer.boundPort

(protocol), endpoint.protocolType))           else             (protocol, endpoint)         }         kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners,

zkUtils)         kafkaHealthcheck.startup()

        /* register broker metrics */ 监控         registerStats()

        shutdownLatch = new CountDownLatch(1)         startupComplete.set(true)         isStartingUp.set(false)         AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString)         info("started")       }     }

4、broker的状态探秘:  brokerState.newState(Starting)->trait BrokerStates  进入trait BrokerStates.scala

 

**  * Broker states are the possible state that a kafka broker can be in.  * A broker should be only in one state at a time.  * The expected state transition with the following defined states is:  *  *                +-----------+  *                |Not Running|  *                +-----+-----+  *                      |  *                      v  *                +-----+-----+  *                |Starting   +--+  *                +-----+-----+  | +----+------------+  *                      |        +>+RecoveringFrom   |  *                      v          |UncleanShutdown  |  * +----------+     +-----+-----+  +-------+---------+  * |RunningAs |     |RunningAs  |            |  * |Controller++Broker     + |PendingControlled |  *                |Shutdown          |  *                +-----+------------+  *                      |  *                      v  *               +-----+----------+  *               |BrokerShutting  |  *               |Down            |  *               +-----+----------+  *                     |  *                     v  *               +-----+-----+  *               |Not Running|  *               +-----------+  *

case object NotRunning extends BrokerStates { val state: Byte = 0 } case object Starting extends BrokerStates { val state: Byte = 1 } case object RecoveringFromUncleanShutdown extends BrokerStates { val state:

Byte = 2 } case object RunningAsBroker extends BrokerStates { val state: Byte = 3 } case object RunningAsController extends BrokerStates { val state: Byte = 4 } case object PendingControlledShutdown extends BrokerStates { val state: Byte

= 6 } case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }

5、kafkaScheduler调度器 kafkaScheduler.startup()->kafkaScheduler.scala 进入kafkaScheduler.scala

 

 override def startup() {     debug("Initializing task scheduler.")     this synchronized {       if(isStarted)         throw new IllegalStateException("This scheduler has already been started!")       executor = new ScheduledThreadPoolExecutor(threads)       executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)       executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)       executor.setThreadFactory(new ThreadFactory() {                                   def newThread(runnable: Runnable): Thread =                                     Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)                                 })     }   }

 

 

 

 

关注
打赏
1659361485
查看更多评论
立即登录/注册

微信扫码登录

0.0439s