您当前的位置: 首页 >  kafka

顧棟

暂无认证

  • 2浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

kafka 1.1 创建Topic时分区分配分析

顧棟 发布时间:2021-06-23 23:30:55 ,浏览量:2

文章目录
  • kafka 1.1 创建Topic时 分区分配分析
    • 分区副本分配方式
      • 不考虑机架因素进行分区分配
        • 主要方法`assignReplicasToBrokersRackUnaware`代码
        • 分区下标计算方法
      • 情况模拟
    • 考虑机架因素进行分区分配

kafka 1.1 创建Topic时 分区分配分析

分区分配指的是为集群创建Topic时的partition的副本分配,就是Topic的partition分配在哪些broker。

分区副本分配方式 不考虑机架因素进行分区分配 主要方法assignReplicasToBrokersRackUnaware代码
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  // 根据brokers长度随机产生一个数 作为开始下标
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  // 代表副本之间的broker间隔数,为了将副本分片更均匀的分配到brokers中
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  // 轮询所有分区,将每个分区的副本分配到broker中
  for (_  0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    // 第一个副本的下标是 当前分区编号+startIndex 后 与broker的个数取余数
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    // 确定了分区的第一个副本的broker之后 通过 replicaIndex获取其余副本的broker
    for (j = 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  var currentPartitionId = math.max(0, startPartitionId)
  //  代表副本之间的broker间隔数,为了将副本分片更均匀的分配到brokers中
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(arrangedBrokerList.size)
  for (_  0 && (currentPartitionId % arrangedBrokerList.size == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % arrangedBrokerList.size
    // 找到分区的第一个副本的broker编号
    val leader = arrangedBrokerList(firstReplicaIndex)
    val replicaBuffer = mutable.ArrayBuffer(leader)
    // 找到已分配的broker的机架的信息
    val racksWithReplicas = mutable.Set(brokerRackMap(leader))
    val brokersWithReplicas = mutable.Set(leader)
    var k = 0
    // 继续为分区分配剩余的副本
    for (_             
关注
打赏
1663402667
查看更多评论
0.0419s