public class KafkaConfig {
public static final String TOPIC_NAME = "topic-test";
private KafkaProperties kafkaProperties;
public NewTopic topicHello(){
//创建Topic : topic名字, partition数量 , replicas副本数量
return TopicBuilder.name(TOPIC_NAME).build();
public ProducerFactory kafkaProducerFactory() {
Map props = kafkaProperties.buildProducerProperties();
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(props);
String transactionIdPrefix = this.kafkaProperties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
return factory;
public KafkaTemplate kafkaTemplate(
ProducerFactory kafkaProducerFactory,
ProducerListener kafkaProducerListener,
ObjectProvider messageConverter) {
KafkaTemplate kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
//设置默认的topic 可以在yml指定: spring.kafka.template.default-topic: 默认Topic
return kafkaTemplate;
- KafkaProperties :该properties是SpringBoot读取yaml中的kafka配置的。
- ProducerFactory : 这个是生成Producer实例的工厂类,使用的是DefaultKafkaProducerFactory,他需要一个
的配置对象,也就是对Kafka的配置,我这里把KafkaProperties转成了map扔给他 - KafkaTemplate : 用来发消息的模板对象
Producer 拦截器(interceptor )是 个相当新的功能,它和 Consumer Interceptor 是在 Kafka 版本中被引入的,主要用于实现 clients 端的定制化控制逻辑。对于 producer 而言, interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对,消息做些定制化需求,比如修改消息等, 同时 producer 允许用户指定多个 interceptor 按顺序作用于同一条消息,从而形成 个拦截链( interceptor chain )intercetpor 的实现接口是 org.apache.kafka.clients.producer.Producerlnterceptor 。下面我们定义一个拦截器
public class MYProducerlnterceptor implements ProducerInterceptor {
private int successCount = 0 ;
private int errorCount = 0 ;
public ProducerRecord onSend(ProducerRecord producerRecord) {
log.info("拦截器执行onSend , producerRecord = {}" ,producerRecord);
return producerRecord;
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
log.info("拦截器执行 onAcknowledgement ,recordMetadata = {} , e = {}",recordMetadata ,e);
if(e == null){
log.info("successCount = {} ; errorCount = {}" ,successCount , errorCount );
public void close() {
log.info("拦截器执行 close");
public void configure(Map map) {
log.info("拦截器执行 configure ;map = {}",map);
- onSend(Producer Record):该方法封装进 KafkaProducer.send 法中,即它运行在用户主线程中 ,producer 确保在消息被序列化以计算分区前调用该方法,用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
- onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,井且通常都是在 producer 回调逻辑触发之前 onAcknowledgement 运行在 producer I/0 线程中,因此不要在该方法中放入很“重”的逻辑,否则会拖慢producer 的消息发送效率。
- close :关闭 interceptor 主要用于执行一些资源清理工作。
- configure :该方法是在构建KafkaProducer时被调用,可以通过修改map中值来修改kafka的配置。
我们不妨思考一下,拦截器和监听器有什么作用呢?我们知道拦截器就是基于 AOP 思想实现,它的onSend方法是可以接受一个 ProducerRecord 对象的,该对象是发给Kafka的一个消息记录对象(topic,partition,key,value都在里面),那如果我要为所有的生产者发送的消息做统一处理,比如:在value中加上时间戳。那在onSend方法中实现是不是就很方便呢。
public ProducerRecord onSend(ProducerRecord record){
return new ProducerRecord( record.topic(), record.partition(), record.timestamp(), record. key(),
System.currentTimeMillis () +","+ record.value() .toString());
而在 onAcknowledgement 方法中我们可以根据Exception是否为空来判断请求是否发送成功。Close方法就可以做一些资源释放等扫尾工作。
接下来就是让拦截器起作用,我们需要修改KafkaConfig,在定义 ProducerFactory 的方法中增加这么一行
Map props = kafkaProperties.buildProducerProperties();
// 配置生产者拦截器
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MYProducerlnterceptor.class.getName());
如果要添加多个拦截器,那么只需要给一个 连接器class类名的集合即可,比如:props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList("拦截器","拦截器"));
Kafka提供了生产者监听器 ProducerListener
,他的作用类似于带回调的KafkaTemplate#send(callback) ;
可以监听到消息发送成功或者失败。ProducerListener 提供了onSuccess 成功回调,和 onError 失败回调,如下:
public interface ProducerListener {
/** brocker 确认之后触发onSuccess
* Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).
* @param producerRecord the actual sent record
* @param recordMetadata the result of the successful send operation
default void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
onSuccess(producerRecord.topic(), producerRecord.partition(),
producerRecord.key(), producerRecord.value(), recordMetadata);
* Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).
* If the method receiving the ProducerRecord is overridden, this method won't be called
* @param topic the destination topic
* @param partition the destination partition (could be null)
* @param key the key of the outbound message
* @param value the payload of the outbound message
* @param recordMetadata the result of the successful send operation
default void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata) {
* Invoked after an attempt to send a message has failed.
* @param producerRecord the failed record
* @param exception the exception thrown
default void onError(ProducerRecord producerRecord, Exception exception) {
onError(producerRecord.topic(), producerRecord.partition(),
producerRecord.key(), producerRecord.value(), exception);
* Invoked after an attempt to send a message has failed.
* If the method receiving the ProducerRecord is overridden, this method won't be called
* @param topic the destination topic
* @param partition the destination partition (could be null)
* @param key the key of the outbound message
* @param value the payload of the outbound message
* @param exception the exception thrown
default void onError(String topic, Integer partition, K key, V value, Exception exception) {
* Return true if this listener is interested in success as well as failure.
* @deprecated the result of this method will be ignored.
* @return true to express interest in successful sends.
default boolean isInterestedInSuccess() {
return false;
public class KafkaProducerListener implements ProducerListener {
public void onError(ProducerRecord producerRecord, Exception exception) {
log.info("监听器 onError 执行 ProducerRecord= {} , exception = {}",producerRecord,exception);
public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
log.info("KafkaProducerListener 发送失败"+"topic = "+topic +" ; partion = "+partition +"; key = "+key + " ; value="+value);
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("KafkaProducerListener 发送成功 "+producerRecord.toString());
public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
log.info("KafkaProducerListener 发送成功 topic = "+topic +" ; partion = "+partition +"; key = "+key + " ; value="+value);
public KafkaTemplate kafkaTemplate(
ProducerFactory kafkaProducerFactory,
ProducerListener kafkaProducerListener,
ObjectProvider messageConverter) {
KafkaTemplate kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
//设置默认的topic 可以在yml指定: spring.kafka.template.default-topic: 默认Topic
return kafkaTemplate;
然后就是测试,使用Postmain请求接口 控制台打印效果如下
拦截器可以用来对请求进行统一拦截器,对请求的数据做统一增强(增加内容前缀,时间戳什么的),而监听器是可以监听到消息发送成功或失败。都能拿到 ProducerRecord 消息记录对象。如果是发送失败,我是否可用根据ProducerRecord进行重试,或者把ProducerRecord写到数据库中,使用定时任务不停的去重试,超过一定重试次数就给指定邮箱发送消息重试失败日志。方便联系程序员进行抢修。