序言
批量处理的添加域参数设置,也算是kafka调优的一种切入点cuiyaonan2000@163.com
- 添加批处理来提升任务处理效率。
- 具体针对批量处理的一些参数设置,来进一步优化处理任务的效率。(kafka的服务的参数配置种类很多,有很多参数配置的技术点还是挺深的,但是我们可以根据批处理的参数设置为切入点,熟悉了解全面的参数设置cuiyaonan2000@163.com)
spring:
kafka:
#kafka的访问地址,多个用","隔开 这个是生产者和消费者一起设置的kafka集群地址
#bootstrap-servers: 192.168.5.10:9092
listener:
#指定listener 容器中的线程数,用于提高并发量
#对于spring.kafka.listener.concurrency=3这个参数来说,它设置的是每个@KafkaListener的并发个数
#concurrency * @KafkaListener的数量(默认监听全部的partition)
#当concurrency < partition 的数量,会出现消费不均的情况,一个消费者的线程可能消费多个partition 的数据
#当concurrency = partition 的数量,最佳状态,一个消费者的线程消费一个 partition 的数据
#当concurrency > partition 的数量,会出现有的消费者的线程没有可消费的partition, 造成资源的浪费
#集群情况下,一个jvm多个consumer都算在一个group中,不会出现重复消费的情况。
concurrency: 3
#说明:使用批量消费需要将listener的type设置为batch,该值默认为single
#在这里是全局设置,所以在这里设置成batch好,单条消费就被影响了cuiyaonan2000@163.com
type: single
consumer:
#关闭自动提交 改由spring-kafka提交
#kafka.consumer.enable.auto.commit=false 意思是让spring自动去提交offset
#kafka.consumer.enable.auto.commit=true 意识是让kafka去自动提交offset
enable-auto-commit: false
#群组ID #默认组id 后面会配置多个消费者组
group-id: kafka
#earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
#none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: earliest
#批量消费 一次接收的最大数量
max-poll-records: 20
bootstrap-servers: 192.168.137.100:9092, 192.168.137.100:9093
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
手动创建批处理
package cui.yao.nan.highlevetest.kafka.consumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;
/**
* @Title: KafkaConsumer.java
* @Package nan.yao.cui.kafka.consumer
* @Description: TODO
* @author cuiyaonan2000@163.com
* @date 2019年8月27日 下午1:42:56
* @version V1.0
*/
@Component
public class KafkaConsumer {
@KafkaListener(topics = { "singleMessage" })
public void listen(String message) {
System.out.println("topic:singleMessage收到的信息是:" + message);
}
@KafkaListener(topics = { "batchMessage" },containerFactory = "myBatchFactory")
public void listenMore(List message) {
System.out.println("topic:batchMessage收到的信息个数:" + message.size());
}
@Bean(name="myBatchFactory")
KafkaListenerContainerFactory batchFactory() {
ConcurrentKafkaListenerContainerFactory factory = new
ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerConfigs()));
factory.setBatchListener(true); // 开启批量监听
factory.setConcurrency(2);//设置监听线程数
return factory;
}
/**
* @author 崔耀男
* @des: TODO
* @date 2021年7月1日 下午5:35:14
* @params: @return
* @return: Map
*/
@Bean
public Map consumerConfigs() {
Map props = new HashMap();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "IAmGroupId");
//配置文件中有针对该类型的说明,对应auto_offset_reset
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//这里就是kafka的链接地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.100:9092, 192.168.137.100:9093");
//设置每次接收Message的数量
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
//max.poll.interval.ms参数用于指定consumer两次poll的最大时间间隔(默认5分钟),
//如果超过了该间隔consumer client会主动向coordinator发起LeaveGroup请求,触发rebalance;
//然后consumer重新发送JoinGroup请求
//这里解释下,当两次poll的时间间隔超过MAX_POLL_INTERVAL_MS_CONFIG时,
//kafka服务认为该consume已经脱离了consumer group,会重新reblance 其它所有的consumer,
//让他们重新获取对应的topic内的分区。然后在该consumer处理完数据后又会重新加入刚才的Group consumer
//还有就是这批次的消息可能会被重复消费,因为迟迟没有commit,且在reblance后,其它的consumer会重复收到该数据
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 180000);
//自动提交的时间间隔,这个参数与enable.auto.commit 密切相关
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
//session.timeout.ms 消费组存活的时间--可以不设置
//props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
//消费者发送心跳的时间间隔,与上面的紧密相连
//props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 120000);
//consumer请求超时时间
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
Batch Process On Producer
KafkaTemplate没有提供批量发送的方法。事实上,Kafka的java producer自己也没有提供批量发送。不过由于消息发送是异步而且本身在内存中已经做了批量化处理,因此我们通常不需要关心发送时是否是批量的。
至于高效的发送消息,Kafka producer提供了一些参数帮助你调优它的性能。常见的参数包括但不限于:batch.size, linger.ms, compression.type, buffer.memory等
所以如何批量发送消息,spring的client会根据我们的参数配置来决定,不需要我们在代码中去体现批量发送了cuiyaonan2000@163.com
- batch.size:通过这个参数来设置批量提交的数据大小,默认是16k,当积压的消息达到这个值的时候就会统一发送-----针对的是某一个分区cuiyaonan2000@163.com
- linger.ms:这个设置是为发送设置一定是延迟来收集更多的消息,默认大小是0ms(就是有消息就立即发送)
- max.request.size:默认是1M,请求的最大字节数
当这batch.size和linger.ms 同时设置的时候,只要两个条件中满足一个就会发送。比如说batch.size设置16kb,linger.ms设置50ms,那么当消息积压达到16kb就会发送,如果没有到达16kb,那么在第一个消息到来之后的50ms之后消息将会发送。
max.request.size:默认是1M,请求的最大字节数
spring:
kafka:
#kafka的访问地址,多个用","隔开 这个是生产者和消费者一起设置的kafka集群地址
#bootstrap-servers: 192.168.5.10:9092
listener:
#指定listener 容器中的线程数,用于提高并发量
#对于spring.kafka.listener.concurrency=3这个参数来说,它设置的是每个@KafkaListener的并发个数
#concurrency * @KafkaListener的数量(默认监听全部的partition)
#当concurrency < partition 的数量,会出现消费不均的情况,一个消费者的线程可能消费多个partition 的数据
#当concurrency = partition 的数量,最佳状态,一个消费者的线程消费一个 partition 的数据
#当concurrency > partition 的数量,会出现有的消费者的线程没有可消费的partition, 造成资源的浪费
#集群情况下,一个jvm多个consumer都算在一个group中,不会出现重复消费的情况。
concurrency: 3
#说明:使用批量消费需要将listener的type设置为batch,该值默认为single
#在这里是全局设置,所以在这里设置成batch好,单条消费就被影响了cuiyaonan2000@163.com
type: single
producer:
bootstrap-servers: 192.168.137.100:9092, 192.168.137.100:9093
#生产者重试次数
retries: 1
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 每次批量发送消息的数量
batch-size: 1000
buffer-memory: 33554432