您当前的位置: 首页 >  kafka

段智华

暂无认证

  • 0浏览

    0关注

    1232博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

第四步:kafka中建立kafka-topics 源代码内幕解密

段智华 发布时间:2016-05-03 09:54:51 ,浏览量:0

第四步:kafka中建立kafka-topics 源代码内幕解密

执行 kafka-topics.sh --create 脚本命令创建 topic 时,会进入 kafka.admin.TopicCommand 类,在 main 方法中调用 createTopic 方法;沿着调用链一步一步跟踪下去,最终在 ZooKeeper 中持久化保存 topic 的元数据,从而建立一个 topic 主题。

 

1、kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181

--replication-factor 1 --partitions 1 --topic SparkStreamingDirected

2、kafka-topics.sh -〉    exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

3、kafka.admin.TopicCommand -〉     if(opts.options.has(opts.createOpt))         createTopic(zkUtils, opts)

4、createTopic方法

  def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {     val topic = opts.options.valueOf(opts.topicOpt)     val configs = parseTopicConfigsToBeAdded(opts)     if (Topic.hasCollisionChars(topic))       println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")     if (opts.options.has(opts.replicaAssignmentOpt)) {       val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))       warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length)       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)     } else {       CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)       val partitions = opts.options.valueOf(opts.partitionsOpt).intValue       val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue       warnOnMaxMessagesChange(configs, replicas)       AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)     }     println("Created topic \"%s\".".format(topic))   }

 

5、AdminUtils.scala

/**
 * Creates `topic` by computing a replica assignment over the sorted broker list and
 * persisting it through `createOrUpdateTopicPartitionAssignmentPathInZK`.
 *
 * @param zkUtils           source of the sorted broker list; also passed on to the write
 * @param topic             name of the topic to create
 * @param partitions        number of partitions handed to `assignReplicasToBrokers`
 * @param replicationFactor replication factor handed to `assignReplicasToBrokers`
 * @param topicConfig       topic-level config forwarded unchanged; defaults to empty
 */
def createTopic(zkUtils: ZkUtils,
                topic: String,
                partitions: Int,
                replicationFactor: Int,
                topicConfig: Properties = new Properties): Unit = {
  val sortedBrokers = zkUtils.getSortedBrokerList()
  val assignment = AdminUtils.assignReplicasToBrokers(sortedBrokers, partitions, replicationFactor)
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, topicConfig)
}

6、createOrUpdateTopicPartitionAssignmentPathInZK方法

 

/**
 * Validates a replica assignment for `topic` and persists it, together with the topic
 * config when the topic is being created.
 *
 * Steps, in order:
 *  1. validate the topic name and require that every partition lists the same number of replicas;
 *  2. when `update` is false, reject a topic whose path already exists, and - if the name
 *     contains collision characters - a topic that collides with an existing one;
 *  3. require that no partition lists the same replica more than once;
 *  4. when `update` is false, validate and write the topic config;
 *  5. write the partition assignment.
 *
 * @param zkUtils                    handle used for the existence check, topic listing and writes
 * @param topic                      topic name
 * @param partitionReplicaAssignment partition id -> replica ids
 * @param config                     topic config; only validated and written when `update` is false
 * @param update                     false to create the topic, true to update its assignment
 * @throws TopicExistsException     if `update` is false and the topic path already exists
 * @throws InvalidTopicException    if `update` is false and the name collides with existing topics
 * @throws IllegalArgumentException if one of the `require` checks on the assignment fails
 */
def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
                                                   topic: String,
                                                   partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                   config: Properties = new Properties,
                                                   update: Boolean = false): Unit = {
  // validate arguments
  Topic.validate(topic)
  require(partitionReplicaAssignment.values.map(_.size).toSet.size == 1, "All partitions should have the same number of replicas.")

  val topicPath = getTopicPath(topic)

  if (!update) {
    if (zkUtils.zkClient.exists(topicPath))
      throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))
    else if (Topic.hasCollisionChars(topic)) {
      val allTopics = zkUtils.getAllTopics()
      val collidingTopics = allTopics.filter(t => Topic.hasCollision(topic, t))
      if (collidingTopics.nonEmpty) {
        throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", ")))
      }
    }
  }

  partitionReplicaAssignment.values.foreach(reps => require(reps.size == reps.toSet.size, "Duplicate replica assignment found: "  + partitionReplicaAssignment))

  // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
  if (!update) {
    // write out the config if there is any, this isn't transactional with the partition assignments
    LogConfig.validate(config)
    writeEntityConfig(zkUtils, ConfigType.Topic, topic, config)
  }

  // create the partition assignment
  writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}

7、writeTopicPartitionAssignment方法

/**
 * Serializes `replicaAssignment` and stores it at the topic's path.
 *
 * When `update` is false the path is created with `createPersistentPath`; otherwise the
 * existing path is rewritten with `updatePersistentPath`.
 *
 * @param zkUtils           handle used to serialize the assignment and to write the path
 * @param topic             topic name, used to derive the path
 * @param replicaAssignment partition id -> replica ids
 * @param update            false to create the path, true to update it
 * @throws TopicExistsException    if the write fails with `ZkNodeExistsException`
 * @throws AdminOperationException for any other non-fatal failure
 */
private def writeTopicPartitionAssignment(zkUtils: ZkUtils, topic: String, replicaAssignment: Map[Int, Seq[Int]], update: Boolean): Unit = {
  import scala.util.control.NonFatal

  try {
    val zkPath = getTopicPath(topic)
    // Partition ids are converted to strings before serialization.
    val jsonPartitionData = zkUtils.replicaAssignmentZkData(
      replicaAssignment.map { case (partition, replicas) => partition.toString -> replicas })

    if (!update) {
      info("Topic creation " + jsonPartitionData.toString)
      zkUtils.createPersistentPath(zkPath, jsonPartitionData)
    } else {
      info("Topic update " + jsonPartitionData.toString)
      zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
    }
    debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
  } catch {
    case _: ZkNodeExistsException => throw new TopicExistsException("topic %s already exists".format(topic))
    // Wrap only non-fatal failures: fatal JVM errors and control throwables must propagate
    // untouched instead of being converted into an AdminOperationException.
    case NonFatal(e) => throw new AdminOperationException(e.toString)
  }
}

8、到了ZkUtils,更新保存topic

  def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls) = {     try {       zkClient.writeData(path, data)     } catch {       case e: ZkNoNodeException => {         createParentPath(path)         try {           ZkPath.createPersistent(zkClient, path, data, acls)         } catch {           case e: ZkNodeExistsException =>             zkClient.writeData(path, data)           case e2: Throwable => throw e2         }       }       case e2: Throwable => throw e2     }   }

 

关注
打赏
1659361485
查看更多评论
立即登录/注册

微信扫码登录

0.0405s