第四步:kafka中建立kafka-topics 源代码内幕解密
kafka-topics.sh --create 脚本命令执行创建 topic 的操作:先进入 kafka.admin.TopicCommand 类,在 main 方法中调用 createTopic 方法,然后一步一步跟踪下去,最终把 topic 的元数据持久化保存到 ZooKeeper 中,从而建立一个 topic 主题。
1、kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181
--replication-factor 1 --partitions 1 --topic SparkStreamingDirected
2、kafka-topics.sh -> exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
3、kafka.admin.TopicCommand -> if (opts.options.has(opts.createOpt)) createTopic(zkUtils, opts)
/**
 * Creates the topic named by the `--topic` option.
 *
 * If a replica assignment was supplied on the command line it is parsed and
 * written as-is; otherwise the partition count and replication factor options
 * are required and `AdminUtils.createTopic` computes the assignment.
 *
 * @param zkUtils handle passed through to the `AdminUtils` calls
 * @param opts    parsed command-line options
 */
def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions): Unit = {
  val topic = opts.options.valueOf(opts.topicOpt)
  val configs = parseTopicConfigsToBeAdded(opts)

  if (Topic.hasCollisionChars(topic)) {
    println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
  }

  if (!opts.options.has(opts.replicaAssignmentOpt)) {
    // No explicit assignment: partitions and replication factor are mandatory.
    CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
    val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
    val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
    warnOnMaxMessagesChange(configs, replicas)
    AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs)
  } else {
    // Explicit assignment: the replica count is taken from the first partition's replica list.
    val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
    val replicasPerPartition = assignment.valuesIterator.next().length
    warnOnMaxMessagesChange(configs, replicasPerPartition)
    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
  }

  println("Created topic \"%s\".".format(topic))
}
/**
 * Creates a topic with an automatically computed replica assignment.
 *
 * Fetches the sorted broker list from `zkUtils`, asks
 * `assignReplicasToBrokers` for an assignment covering `partitions`
 * partitions at the given replication factor, and hands the result to
 * `createOrUpdateTopicPartitionAssignmentPathInZK`.
 *
 * @param zkUtils           handle used to read brokers and write the topic
 * @param topic             topic name
 * @param partitions        number of partitions to assign
 * @param replicationFactor replicas per partition
 * @param topicConfig       topic-level configuration; defaults to empty properties
 */
def createTopic(zkUtils: ZkUtils,
                topic: String,
                partitions: Int,
                replicationFactor: Int,
                topicConfig: Properties = new Properties): Unit = {
  val assignment =
    AdminUtils.assignReplicasToBrokers(zkUtils.getSortedBrokerList(), partitions, replicationFactor)
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, topicConfig)
}
/**
 * Validates a topic's replica assignment and writes it out via
 * `writeTopicPartitionAssignment`.
 *
 * When `update` is false (creation) it additionally rejects a topic whose path
 * already exists or whose name collides with an existing topic, and validates
 * and writes the topic config before the assignment.
 *
 * @param zkUtils                    handle used for existence checks and writes
 * @param topic                      topic name; validated with `Topic.validate`
 * @param partitionReplicaAssignment partition id -> replica broker ids
 * @param config                     topic-level configuration, only written on creation
 * @param update                     true to update an existing assignment, false to create
 * @throws TopicExistsException     on creation, if the topic path already exists
 * @throws InvalidTopicException    on creation, if the name collides with existing topics
 * @throws IllegalArgumentException (via `require`) if replica counts differ between
 *                                  partitions or a partition lists a replica twice
 */
def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
                                                   topic: String,
                                                   partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                   config: Properties = new Properties,
                                                   update: Boolean = false): Unit = {
  // validate arguments
  Topic.validate(topic)
  val distinctReplicaCounts = partitionReplicaAssignment.values.map(_.size).toSet
  require(distinctReplicaCounts.size == 1, "All partitions should have the same number of replicas.")

  val topicPath = getTopicPath(topic)

  if (!update) {
    if (zkUtils.zkClient.exists(topicPath))
      throw new TopicExistsException("Topic \"%s\" already exists.".format(topic))

    if (Topic.hasCollisionChars(topic)) {
      val collidingTopics = zkUtils.getAllTopics().filter(existing => Topic.hasCollision(topic, existing))
      if (collidingTopics.nonEmpty)
        throw new InvalidTopicException("Topic \"%s\" collides with existing topics: %s".format(topic, collidingTopics.mkString(", ")))
    }
  }

  // Each partition must list every replica at most once.
  for (replicas <- partitionReplicaAssignment.values)
    require(replicas.size == replicas.toSet.size, "Duplicate replica assignment found: " + partitionReplicaAssignment)

  // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
  if (!update) {
    // write out the config if there is any, this isn't transactional with the partition assignments
    LogConfig.validate(config)
    writeEntityConfig(zkUtils, ConfigType.Topic, topic, config)
  }

  // create the partition assignment
  writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}
/**
 * Serializes the replica assignment with `zkUtils.replicaAssignmentZkData`
 * and stores it at the topic's path.
 *
 * Creation uses `createPersistentPath`; update uses `updatePersistentPath`.
 *
 * @param zkUtils           handle used to serialize and write the assignment
 * @param topic             topic name, used to derive the path
 * @param replicaAssignment partition id -> replica broker ids
 * @param update            true to overwrite an existing path, false to create it
 * @throws TopicExistsException    if the write reports that the node already exists
 * @throws AdminOperationException wrapping any other non-fatal failure
 */
private def writeTopicPartitionAssignment(zkUtils: ZkUtils,
                                          topic: String,
                                          replicaAssignment: Map[Int, Seq[Int]],
                                          update: Boolean): Unit = {
  try {
    val zkPath = getTopicPath(topic)
    // Partition ids are converted to strings before serialization.
    val jsonPartitionData = zkUtils.replicaAssignmentZkData(
      replicaAssignment.map { case (partition, replicas) => partition.toString -> replicas })

    if (!update) {
      info("Topic creation " + jsonPartitionData.toString)
      zkUtils.createPersistentPath(zkPath, jsonPartitionData)
    } else {
      info("Topic update " + jsonPartitionData.toString)
      zkUtils.updatePersistentPath(zkPath, jsonPartitionData)
    }
    debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionData))
  } catch {
    case _: ZkNodeExistsException =>
      throw new TopicExistsException("topic %s already exists".format(topic))
    // Only wrap non-fatal failures: fatal JVM errors (e.g. OutOfMemoryError) and
    // control throwables must propagate unchanged instead of being repackaged.
    case scala.util.control.NonFatal(e) =>
      throw new AdminOperationException(e.toString)
  }
}
9、到了 ZkUtils,更新保存 topic
/**
 * Writes `data` to `path`, creating the node if it does not exist yet.
 *
 * The write is attempted first. If it fails with `ZkNoNodeException`, the
 * parent path is created and the node is created through
 * `ZkPath.createPersistent`; if that creation reports `ZkNodeExistsException`,
 * the data is written to the now-existing node instead. Any other exception
 * propagates to the caller unchanged.
 *
 * @param path node path to write
 * @param data payload to store
 * @param acls ACLs applied when the node has to be created; defaults to `DefaultAcls`
 */
def updatePersistentPath(path: String, data: String, acls: java.util.List[ACL] = DefaultAcls) = {
  try {
    zkClient.writeData(path, data)
  } catch {
    case _: ZkNoNodeException =>
      createParentPath(path)
      try {
        ZkPath.createPersistent(zkClient, path, data, acls)
      } catch {
        // The node appeared between the failed write and the create: write again.
        case _: ZkNodeExistsException => zkClient.writeData(path, data)
      }
  }
}