上一章讲了一下Kafka在SpringBoot如何使用,本篇文章在上一篇文章的基础上讲解如何给生产者添加监听器和拦截器
自定义KafkaTemplate我这里是使用的是SpringBoot整合Kafka,SpringBoot对Kafka已经进行了自动配置,也就是我们在使用的是是否直接注入KafkaTemplate即可使用,如果需要给KafkaTemplte添加额外配置,则需要自定义KafkaTemplate的Bean。我这里仿照SpringBoot的自动配置源码写了一个Kafka的自定义配置类,你可以直接拷贝过去使用。
@Configuration
@EnableKafka
public class KafkaConfig {
//Topic的名字
public static final String TOPIC_NAME = "topic-test";
//读取yml中的kafka配置的属性对象
@Autowired
private KafkaProperties kafkaProperties;
//通过定义Bean的方式创建Topic
@Bean
public NewTopic topicHello(){
//创建Topic : topic名字, partition数量 , replicas副本数量
return TopicBuilder.name(TOPIC_NAME).build();
}
@Bean
public ProducerFactory kafkaProducerFactory() {
//kafka的配置
Map props = kafkaProperties.buildProducerProperties();
//创建生产者工厂
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(props);
//处理事务ID前缀
String transactionIdPrefix = this.kafkaProperties.getProducer().getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
@Bean
public KafkaTemplate kafkaTemplate(
//生产者工厂
ProducerFactory kafkaProducerFactory,
//生产者监听器
ProducerListener kafkaProducerListener,
//消息转换器
ObjectProvider messageConverter) {
//创建kafkaTemplate
KafkaTemplate kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
//设置消息转换器
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
//设置消息监听器
kafkaTemplate.setProducerListener(kafkaProducerListener);
//设置默认的topic 可以在yml指定: spring.kafka.template.default-topic: 默认Topic
kafkaTemplate.setDefaultTopic(this.kafkaProperties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
}
- KafkaProperties :该properties是SpringBoot读取yaml中的kafka配置的。
- ProducerFactory : 这个是生成Producer实例的工厂类,使用的是DefaultKafkaProducerFactory,他需要一个
Map
的配置对象,也就是对Kafka的配置,我这里把KafkaProperties转成了map扔给他 - KafkaTemplate : 用来发消息的模板对象
Producer 拦截器(interceptor )是 个相当新的功能,它和 Consumer Interceptor 是在 Kafka 0.10.0.0 版本中被引入的,主要用于实现 clients 端的定制化控制逻辑。对于 producer 而言, interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对,消息做些定制化需求,比如修改消息等, 同时 producer 允许用户指定多个 interceptor 按顺序作用于同一条消息,从而形成 个拦截链( interceptor chain )intercetpor 的实现接口是 org.apache.kafka.clients.producer.Producerlnterceptor 。下面我们定义一个拦截器
@Slf4j
public class MYProducerlnterceptor implements ProducerInterceptor {
private int successCount = 0 ;
private int errorCount = 0 ;
//发送消息的时候被执行,producerRecord:消息对象
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
log.info("拦截器执行onSend , producerRecord = {}" ,producerRecord);
//可以对producerRecord做修改
return producerRecord;
}
//会在消息被应答之前或消息发送失败时调用
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
log.info("拦截器执行 onAcknowledgement ,recordMetadata = {} , e = {}",recordMetadata ,e);
//统计成功或失败次数
if(e == null){
successCount++;
}else{
errorCount++;
}
log.info("successCount = {} ; errorCount = {}" ,successCount , errorCount );
}
@Override
public void close() {
log.info("拦截器执行 close");
}
@Override
public void configure(Map map) {
log.info("拦截器执行 configure ;map = {}",map);
}
}
- onSend(Producer Record):该方法封装进 KafkaProducer.send 法中,即它运行在用户主线程中 ,producer 确保在消息被序列化以计算分区前调用该方法,用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。
- onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,井且通常都是在 producer 回调逻辑触发之前 onAcknowledgement 运行在 producer I/0 线程中,因此不要在该方法中放入很“重”的逻辑,否则会拖慢producer 的消息发送效率。
- close :关闭 interceptor 主要用于执行一些资源清理工作。
- configure :该方法是在构建KafkaProducer时被调用,可以通过修改map中值来修改kafka的配置。
我们不妨思考一下,拦截器和监听器有什么作用呢?我们知道拦截器就是基于 AOP 思想实现,它的onSend方法是可以接受一个 ProducerRecord 对象的,该对象是发给Kafka的一个消息记录对象(topic,partition,key,value都在里面),那如果我要为所有的生产者发送的消息做统一处理,比如:在value中加上时间戳。那在onSend方法中实现是不是就很方便呢。
@Override
public ProducerRecord onSend(ProducerRecord record){
return new ProducerRecord( record.topic(), record.partition(), record.timestamp(), record. key(),
System.currentTimeMillis () +","+ record.value() .toString());
}
而在 onAcknowledgement 方法中我们可以根据Exception是否为空来判断请求是否发送成功。Close方法就可以做一些资源释放等扫尾工作。
接下来就是让拦截器起作用,我们需要修改KafkaConfig,在定义 ProducerFactory 的方法中增加这么一行
Map props = kafkaProperties.buildProducerProperties();
// 配置生产者拦截器
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MYProducerlnterceptor.class.getName());
如果要添加多个拦截器,那么只需要给一个 连接器class类名的集合即可,比如:props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList("拦截器","拦截器"));
Kafka提供了生产者监听器 ProducerListener
,他的作用类似于带回调的KafkaTemplate#send(callback) ;
可以监听到消息发送成功或者失败。ProducerListener 提供了onSuccess 成功回调,和 onError 失败回调,如下:
public interface ProducerListener {
/** brocker 确认之后触发onSuccess
* Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).
* @param producerRecord the actual sent record
* @param recordMetadata the result of the successful send operation
*/
default void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
onSuccess(producerRecord.topic(), producerRecord.partition(),
producerRecord.key(), producerRecord.value(), recordMetadata);
}
/**
* Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).
* If the method receiving the ProducerRecord is overridden, this method won't be called
* @param topic the destination topic
* @param partition the destination partition (could be null)
* @param key the key of the outbound message
* @param value the payload of the outbound message
* @param recordMetadata the result of the successful send operation
*/
default void onSuccess(String topic, Integer partition, K key, V value, RecordMetadata recordMetadata) {
}
/**
* Invoked after an attempt to send a message has failed.
* @param producerRecord the failed record
* @param exception the exception thrown
*/
default void onError(ProducerRecord producerRecord, Exception exception) {
onError(producerRecord.topic(), producerRecord.partition(),
producerRecord.key(), producerRecord.value(), exception);
}
/**
* Invoked after an attempt to send a message has failed.
* If the method receiving the ProducerRecord is overridden, this method won't be called
* @param topic the destination topic
* @param partition the destination partition (could be null)
* @param key the key of the outbound message
* @param value the payload of the outbound message
* @param exception the exception thrown
*/
default void onError(String topic, Integer partition, K key, V value, Exception exception) {
}
/**
* Return true if this listener is interested in success as well as failure.
* @deprecated the result of this method will be ignored.
* @return true to express interest in successful sends.
*/
@Deprecated
default boolean isInterestedInSuccess() {
return false;
}
}
注意:根据onSuccess方法上的注释我们可以了解到,该方法是在消息达到brocker,brocker应答之后才会触发。下面我们来编写一个监听器
@Component
@Slf4j
public class KafkaProducerListener implements ProducerListener {
@Override
public void onError(ProducerRecord producerRecord, Exception exception) {
log.info("监听器 onError 执行 ProducerRecord= {} , exception = {}",producerRecord,exception);
}
@Override
public void onError(String topic, Integer partition, Object key, Object value, Exception exception) {
log.info("KafkaProducerListener 发送失败"+"topic = "+topic +" ; partion = "+partition +"; key = "+key + " ; value="+value);
System.out.println(exception.getMessage());
}
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("KafkaProducerListener 发送成功 "+producerRecord.toString());
}
@Override
public void onSuccess(String topic, Integer partition, Object key, Object value, RecordMetadata recordMetadata) {
log.info("KafkaProducerListener 发送成功 topic = "+topic +" ; partion = "+partition +"; key = "+key + " ; value="+value);
}
}
接着在定义KafkaTemplate的时候需要添加ProducerListener
@Bean
public KafkaTemplate kafkaTemplate(
//生产者工厂
ProducerFactory kafkaProducerFactory,
//生产者监听器
ProducerListener kafkaProducerListener,
//消息转换器
ObjectProvider messageConverter) {
//创建kafkaTemplate
KafkaTemplate kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
//设置消息转换器
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
//设置消息监听器
kafkaTemplate.setProducerListener(kafkaProducerListener);
//设置默认的topic 可以在yml指定: spring.kafka.template.default-topic: 默认Topic
kafkaTemplate.setDefaultTopic(this.kafkaProperties.getTemplate().getDefaultTopic());
return kafkaTemplate;
}
然后就是测试,使用Postmain请求接口 控制台打印效果如下
可以看到拦截器是在监听器之前被执行的。那监听器可以用来做什么?
拦截器可以用来对请求进行统一拦截器,对请求的数据做统一增强(增加内容前缀,时间戳什么的),而监听器是可以监听到消息发送成功或失败。都能拿到 ProducerRecord 消息记录对象。如果是发送失败,我是否可用根据ProducerRecord进行重试,或者把ProducerRecord写到数据库中,使用定时任务不停的去重试,超过一定重试次数就给指定邮箱发送消息重试失败日志。方便联系程序员进行抢修。
就写到这里把,如果文章对你有所帮助请给个好评。