您当前的位置: 首页 >  kafka

cuiyaonan2000

暂无认证

  • 0浏览

    0关注

    248博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Kafka batch processing

cuiyaonan2000 发布时间:2021-07-05 15:02:58 ,浏览量:0

序言

批量处理的添加域参数设置,也算是kafka调优的一种切入点cuiyaonan2000@163.com

  1. 添加批处理来提升任务处理效率。
  2. 具体针对批量处理的一些参数设置,来进一步优化处理任务的效率。(kafka的服务的参数配置种类很多,有很多参数配置的技术点还是挺深的,但是我们可以根据批处理的参数设置为切入点,熟悉了解全面的参数设置cuiyaonan2000@163.com)
Batch Processing On Consumer 批处理的参数设置
spring:
  kafka:
    #kafka的访问地址,多个用","隔开  这个是生产者和消费者一起设置的kafka集群地址
    #bootstrap-servers: 192.168.5.10:9092 
    listener:
       #指定listener 容器中的线程数,用于提高并发量
       #对于spring.kafka.listener.concurrency=3这个参数来说,它设置的是每个@KafkaListener的并发个数
       #concurrency * @KafkaListener的数量(默认监听全部的partition)
       #当concurrency < partition 的数量,会出现消费不均的情况,一个消费者的线程可能消费多个partition 的数据
       #当concurrency = partition 的数量,最佳状态,一个消费者的线程消费一个 partition 的数据
       #当concurrency > partition 的数量,会出现有的消费者的线程没有可消费的partition, 造成资源的浪费
       #集群情况下,一个jvm多个consumer都算在一个group中,不会出现重复消费的情况。
       concurrency: 3
       #说明:使用批量消费需要将listener的type设置为batch,该值默认为single
       #在这里是全局设置,所以在这里设置成batch好,单条消费就被影响了cuiyaonan2000@163.com
       type: single
    consumer:
      #关闭自动提交 改由spring-kafka提交
      #kafka.consumer.enable.auto.commit=false 意思是让spring自动去提交offset
      #kafka.consumer.enable.auto.commit=true 意识是让kafka去自动提交offset
      enable-auto-commit: false
      #群组ID #默认组id  后面会配置多个消费者组
      group-id: kafka
#earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
#latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
#none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      #批量消费 一次接收的最大数量
      max-poll-records: 20      
      bootstrap-servers: 192.168.137.100:9092, 192.168.137.100:9093
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
手动创建批处理

package cui.yao.nan.highlevetest.kafka.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.stereotype.Component;

/**
 * @Title: KafkaConsumer.java
 * @Package nan.yao.cui.kafka.consumer
 * @Description: TODO
 * @author cuiyaonan2000@163.com
 * @date 2019年8月27日 下午1:42:56
 * @version V1.0
 */
@Component
public class KafkaConsumer {

    @KafkaListener(topics = { "singleMessage" })
    public void listen(String message) {
	System.out.println("topic:singleMessage收到的信息是:" + message);
    }

    @KafkaListener(topics = { "batchMessage" },containerFactory = "myBatchFactory")
    public void listenMore(List message) {
	System.out.println("topic:batchMessage收到的信息个数:" + message.size());
    }
    
    @Bean(name="myBatchFactory")
    KafkaListenerContainerFactory batchFactory() {
      ConcurrentKafkaListenerContainerFactory factory = new 
            ConcurrentKafkaListenerContainerFactory();
      factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerConfigs()));
      factory.setBatchListener(true); // 开启批量监听
      factory.setConcurrency(2);//设置监听线程数
      
      return factory;
    }

    /**
     * @author 崔耀男
     * @des: TODO
     * @date 2021年7月1日 下午5:35:14
     * @params: @return
     * @return: Map
     */
    @Bean
    public Map consumerConfigs() {
      Map props = new HashMap();
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "IAmGroupId");
      
      //配置文件中有针对该类型的说明,对应auto_offset_reset
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      
      //这里就是kafka的链接地址
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.137.100:9092, 192.168.137.100:9093");
      
      //设置每次接收Message的数量
      props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); 
      //max.poll.interval.ms参数用于指定consumer两次poll的最大时间间隔(默认5分钟),
      //如果超过了该间隔consumer client会主动向coordinator发起LeaveGroup请求,触发rebalance;
      //然后consumer重新发送JoinGroup请求
      
      //这里解释下,当两次poll的时间间隔超过MAX_POLL_INTERVAL_MS_CONFIG时,
      //kafka服务认为该consume已经脱离了consumer group,会重新reblance 其它所有的consumer,
      //让他们重新获取对应的topic内的分区。然后在该consumer处理完数据后又会重新加入刚才的Group consumer
      //还有就是这批次的消息可能会被重复消费,因为迟迟没有commit,且在reblance后,其它的consumer会重复收到该数据
      props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 180000);

      
      //自动提交的时间间隔,这个参数与enable.auto.commit 密切相关
      props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
      
      //session.timeout.ms 消费组存活的时间--可以不设置
      //props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
      //消费者发送心跳的时间间隔,与上面的紧密相连
      //props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 120000);
      //consumer请求超时时间
      props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);

      
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      return props;
    }


}

Batch Process On Producer

KafkaTemplate没有提供批量发送的方法。事实上,Kafka的java producer自己也没有提供批量发送。不过由于消息发送是异步而且本身在内存中已经做了批量化处理,因此我们通常不需要关心发送时是否是批量的。

至于高效的发送消息,Kafka producer提供了一些参数帮助你调优它的性能。常见的参数包括但不限于:batch.size, linger.ms, compression.type, buffer.memory等

所以如何批量发送消息,spring的client会根据我们的参数配置来决定,不需要我们在代码中去体现批量发送了cuiyaonan2000@163.com

  1. batch.size:通过这个参数来设置批量提交的数据大小,默认是16k,当积压的消息达到这个值的时候就会统一发送-----针对的是某一个分区cuiyaonan2000@163.com
  2. linger.ms:这个设置是为发送设置一定是延迟来收集更多的消息,默认大小是0ms(就是有消息就立即发送)
  3. max.request.size:默认是1M,请求的最大字节数

      当这batch.size和linger.ms 同时设置的时候,只要两个条件中满足一个就会发送。比如说batch.size设置16kb,linger.ms设置50ms,那么当消息积压达到16kb就会发送,如果没有到达16kb,那么在第一个消息到来之后的50ms之后消息将会发送。

max.request.size:默认是1M,请求的最大字节数  

spring:
  kafka:
    #kafka的访问地址,多个用","隔开  这个是生产者和消费者一起设置的kafka集群地址
    #bootstrap-servers: 192.168.5.10:9092 
    listener:
       #指定listener 容器中的线程数,用于提高并发量
       #对于spring.kafka.listener.concurrency=3这个参数来说,它设置的是每个@KafkaListener的并发个数
       #concurrency * @KafkaListener的数量(默认监听全部的partition)
       #当concurrency < partition 的数量,会出现消费不均的情况,一个消费者的线程可能消费多个partition 的数据
       #当concurrency = partition 的数量,最佳状态,一个消费者的线程消费一个 partition 的数据
       #当concurrency > partition 的数量,会出现有的消费者的线程没有可消费的partition, 造成资源的浪费
       #集群情况下,一个jvm多个consumer都算在一个group中,不会出现重复消费的情况。
       concurrency: 3
       #说明:使用批量消费需要将listener的type设置为batch,该值默认为single
       #在这里是全局设置,所以在这里设置成batch好,单条消费就被影响了cuiyaonan2000@163.com
       type: single
    producer:
      bootstrap-servers: 192.168.137.100:9092, 192.168.137.100:9093
      #生产者重试次数
      retries: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 每次批量发送消息的数量
      batch-size: 1000 
      buffer-memory: 33554432

 

关注
打赏
1638267374
查看更多评论
立即登录/注册

微信扫码登录

0.0384s