一、默认分区器DefaultPartitioner
org.apache.kafka.clients.producer.internals.DefaultPartitioner
二、默认分区器获取分区
如果消息的 key 为 null,此时 producer 会使用默认的 partitioner 分区器将消息随机分布到 topic 的可用 partition 中。
如果 key 不为 null,并且使用了默认的分区器,kafka 会使用自己的 hash 算法对 key 取 hash 值,使用 hash 值与 partition 数量取模,从而确定发送到哪个分区。注意:此时 key 相同的消息会发送到相同的分区(只要 partition 的数量不变化)。
/**
* Compute the partition for the given record.
* 计算partition
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//获取指定topic的partitions
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
//key=null
if (keyBytes == null) {
int nextValue = nextValue(topic);
//可用分区
List availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
//消息随机分布到topic可用的partition中
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
//如果 key 不为 null,并且使用了默认的分区器,kafka 会使用自己的 hash 算法对 key 取 hash 值
} else {//通过hash获取partition
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
三、自定义分区器: 在这里我们规定:key值不允许为null。在实际项目中,key为null的消息*,可以发送到同一个分区
package com.chb.partitioner;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class MyPartitioner implements Partitioner {
public static void main(String[] args) {
//org.apache.kafka.clients.producer.internals.DefaultPartitioner
}
@Override
public void configure(Map configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
/**
*由于我们按key分区,在这里我们规定:key值不允许为null。在实际项目中,key为null的消息*,可以发送到同一个分区。
*/
if(keyBytes == null) {
throw new InvalidRecordException("key cannot be null");
}
if(((String)key).equals("1")) {
return 1;
}
//如果消息的key值不为1,那么使用hash值取模,确定分区。
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {
}
}
3.1、在生产者中使用自定义分区器
在kafka配置参数时设置分区器的类
//设置自定义分区
kafkaProps.put("partitioner.class", "com.chb.partitioner.MyPartitioner");