您当前的位置: 首页 >  kafka

消息中间件系列教程(22) -Kafka- SpringBoot集成Kafka

杨林伟 发布时间:2019-12-16 16:59:28 ,浏览量:2

引言

代码已提交至Github,有兴趣的同学可以下载来看看:https://github.com/ylw-github/SpringBoot-Kafka-Demo

搭建教程上两篇博客又讲,可以参考:

  • 《消息中间件系列教程(20) -Kafka-集群搭建》
  • 《消息中间件系列教程(21) -Kafka- 集群搭建(自带Zookeeper)》

为了方便起见,建议参考: 《消息中间件系列教程(21) -Kafka- 集群搭建(自带Zookeeper)》

SpringBoot集成Kafka

1.新建Maven项目

2.添加Maven依赖:


	4.0.0
	ylw
	com.ylw.springboot.kafka
	0.0.1-SNAPSHOT
	
		org.springframework.boot
		spring-boot-starter-parent
		2.0.1.RELEASE
	
	
		
		
			org.springframework.kafka
			spring-kafka
		
		
		
			org.springframework.boot
			spring-boot-starter-web
		
	


3.添加application.yml

# kafka
spring:
  kafka:
    # kafka服务器地址(可以多个)
    bootstrap-servers: 192.168.162.131:9092,192.168.162.132:9092,192.168.162.133:9092
    consumer:
      # 指定一个默认的组名
      group-id: kafka2
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest
      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
      # 服务器地址
      bootstrap-servers: 192.168.162.131:9092,192.168.162.132:9092,192.168.162.133:9092

4.Controller代码:

package com.ylw;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaController {

	/**
	 * 注入kafkaTemplate
	 */
	@Autowired
	private KafkaTemplate kafkaTemplate;

	/**
	 * 发送消息的方法
	 *
	 * @param key
	 *            推送数据的key
	 * @param data
	 *            推送数据的data
	 */
	private void send(String key, String data) {
		// topic 名称 key data 消息数据
		kafkaTemplate.send("my_test", key, data);

	}
	// test 主题 1 my_test 3

	@RequestMapping("/kafka")
	public String testKafka() {
		int iMax = 6;
		for (int i = 1; i             
关注
打赏
1688896170
查看更多评论
0.8142s