您当前的位置: 首页 >  ar

宝哥大数据

暂无认证

  • 1浏览

    0关注

    1029博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

kafka---partitioner及自定义

宝哥大数据 发布时间:2018-01-04 17:18:16 ,浏览量:1

一、默认分区器DefaultPartitioner
org.apache.kafka.clients.producer.internals.DefaultPartitioner
二、默认分区器获取分区 如果消息的 key 为 null,此时 producer 会使用默认的 partitioner 分区器将消息随机分布到 topic 的可用 partition 中。 如果 key 不为 null,并且使用了默认的分区器,kafka 会使用自己的 hash 算法对 key 取 hash 值,使用 hash 值与 partition 数量取模,从而确定发送到哪个分区。注意:此时 key 相同的消息会发送到相同的分区(只要 partition 的数量不变化)。
    /**
     * Compute the partition for the given record.
     * 计算partition
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取指定topic的partitions
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //key=null 
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            //可用分区
            List availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                //消息随机分布到topic可用的partition中
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
            //如果 key 不为 null,并且使用了默认的分区器,kafka 会使用自己的 hash 算法对 key 取 hash 值
        } else {//通过hash获取partition
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
三、自定义分区器: 在这里我们规定:key值不允许为null。在实际项目中,key为null的消息*,可以发送到同一个分区
package com.chb.partitioner;

import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class MyPartitioner implements Partitioner {
    public static void main(String[] args) {
        //org.apache.kafka.clients.producer.internals.DefaultPartitioner
    }

    @Override
    public void configure(Map configs) {
    }


    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
            Object value, byte[] valueBytes, Cluster cluster) {
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        /**
         *由于我们按key分区,在这里我们规定:key值不允许为null。在实际项目中,key为null的消息*,可以发送到同一个分区。
         */
        if(keyBytes == null) {
            throw new InvalidRecordException("key cannot be null");
        }
        if(((String)key).equals("1")) {
            return 1;
        }
        //如果消息的key值不为1,那么使用hash值取模,确定分区。
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }

}
3.1、在生产者中使用自定义分区器 在kafka配置参数时设置分区器的类
        //设置自定义分区
        kafkaProps.put("partitioner.class", "com.chb.partitioner.MyPartitioner");
关注
打赏
1587549273
查看更多评论
立即登录/注册

微信扫码登录

0.0432s