一、生产者命令行
#生产者
./bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092 --topic test
二、使用java生产
2.1、设置Producer的配置参数,3个必备的属性:bootstrap.servers、key.serializer、value.serializer。
来源: 实验楼 链接: https://www.shiyanlou.com/courses/859 本课程内容,由作者授权实验楼发布,未经允许,禁止转载、下载及非法传播
/**
* 初始化kafka参数配置
* @return
*/
public static Properties initConfig() {
Properties kafkaProps = new Properties();
//配置kafka集群地址
kafkaProps.put("bootstrap.servers", "master:9092, slave1:9092,slave2:9092");
//序列化器
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return kafkaProps;
}
2.2、创建Producer
//第二步:创建生成者
KafkaProducer producer = new KafkaProducer(kafkaProps);
2.3、创建一条信息
//第三步:创建一条消息
/**
* 使用ProducerRecord(String topic, String key, String value)构造函数创建消息对象
* 构造函数接受三个参数:
* topic--告诉kafkaProducer消息发送到哪个topic;
* key--告诉kafkaProducer,所发送消息的key值,注意:key值类型需与前面设置的key.serializer值匹配
* value--告诉kafkaProducer,所发送消息的value值,即消息内容。注意:value值类型需与前面设置的value.serializer值匹配
*/
ProducerRecord record =new ProducerRecord("test", "messageKey", "测试producer");
//向kafka集群发送消息,除了消息值本身,还包括key信息,key信息用于消息在partition之间均匀分布。
2.4、发送消息
try {
//第四步:发送消息
Future f = producer.send(record);
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
本实验的发送方式: Fire-and-forget(发送即忘记)–此方法用来发送消息到 broker,不关注消息是否成功到达。大部分情况下,消息会成功到达 broker,因为Kafka 是高可用的,并且 producer 会自动重试发送。但是,还是会有消息丢失的情况。
2.5、完整代码
package com.chb.producer;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class MyFirstProducer {
/**
* 初始化kafka参数配置
* @return
*/
public static Properties initConfig() {
Properties kafkaProps = new Properties();
//配置kafka集群地址
kafkaProps.put("bootstrap.servers", "master:9092, slave1:9092,slave2:9092");
//序列化器
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return kafkaProps;
}
public static void main(String[] args) {
//第一步:初始化kafka参数
Properties kafkaProps = initConfig();
//第二步:创建生成者
KafkaProducer producer = new KafkaProducer(kafkaProps);
//第三步:创建一条消息
/**
* 使用ProducerRecord(String topic, String key, String value)构造函数创建消息对象
* 构造函数接受三个参数:
* topic--告诉kafkaProducer消息发送到哪个topic;
* key--告诉kafkaProducer,所发送消息的key值,注意:key值类型需与前面设置的key.serializer值匹配
* value--告诉kafkaProducer,所发送消息的value值,即消息内容。注意:value值类型需与前面设置的value.serializer值匹配
*/
ProducerRecord record =new ProducerRecord("test", "messageKey", "测试producer");
//向kafka集群发送消息,除了消息值本身,还包括key信息,key信息用于消息在partition之间均匀分布。
try {
//第四步:发送消息
Future f = producer.send(record);
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
三、同步发送消息
3.1、前三步和上面相同
3.2、同步发送消息
try {
//第四步:同步发送消息
Future future = producer.send(record);
RecordMetadata rm = future.get();
//producer的send方法返回Future对象,我们使用Future对象的get方法来实现同步发送消息。
//Future对象的get方法会产生阻塞,直到获取kafka集群的响应,响应结果分两种:
//1、响应中有异常:此时get方法会抛出异常,我们可以捕获此异常进行相应的业务处理
//2、响应中无异常:此时get方法会返回RecordMetadata对象,此对象包含了当前发送成功的消息在Topic中的offset、partition等信息
long offset = rm.offset();
int partition = rm.partition();
System.out.println("the message offset: " + offset + ", partition:" + partition);
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
本实验发送方式:Synchronous Send(同步发送)–发送一个消息,send() 方法返回一个 Future 对象,使用此对象的 get() 阻塞方法,可以根据 send() 方法是否执行成功来做出不同的业务处理。此方法关注消息是否成功到达,但是由于使用了同步发送,消息的发送速度会很低,即吞吐量降低
###3.3、完整代码
package com.chb.producer;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* 同步
* @author 12285
*
*/
public class MySecondProducer {
/**
* 初始化kafka参数配置
* @return
*/
public static Properties initConfig() {
Properties kafkaProps = new Properties();
//配置kafka集群地址
kafkaProps.put("bootstrap.servers", "master:9092, slave1:9092,slave2:9092");
//序列化器
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return kafkaProps;
}
public static void main(String[] args) {
//第一步:初始化kafka参数
Properties kafkaProps = initConfig();
//第二步:创建生成者
KafkaProducer producer = new KafkaProducer(kafkaProps);
//第三步:创建一条消息
/**
* 使用ProducerRecord(String topic, String key, String value)构造函数创建消息对象
* 构造函数接受三个参数:
* topic--告诉kafkaProducer消息发送到哪个topic;
* key--告诉kafkaProducer,所发送消息的key值,注意:key值类型需与前面设置的key.serializer值匹配
* value--告诉kafkaProducer,所发送消息的value值,即消息内容。注意:value值类型需与前面设置的value.serializer值匹配
*/
ProducerRecord record =new ProducerRecord("test", "d", "tongbu");
//向kafka集群发送消息,除了消息值本身,还包括key信息,key信息用于消息在partition之间均匀分布。
try {
//第四步:同步发送消息
Future future = producer.send(record);
RecordMetadata rm = future.get();
//producer的send方法返回Future对象,我们使用Future对象的get方法来实现同步发送消息。
//Future对象的get方法会产生阻塞,直到获取kafka集群的响应,响应结果分两种:
//1、响应中有异常:此时get方法会抛出异常,我们可以捕获此异常进行相应的业务处理
//2、响应中无异常:此时get方法会返回RecordMetadata对象,此对象包含了当前发送成功的消息在Topic中的offset、partition等信息
long offset = rm.offset();
int partition = rm.partition();
System.out.println("the message offset: " + offset + ", partition:" + partition);
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
四、异步发送:
Asynchronous Send(异步发送)—以回调函数的形式调用 send() 方法,当收到 broker 的响应,会触发回调函数执行。此方法既关注消息是否成功到达,又提高了消息的发送速度
4.1、回调对象
package com.chb.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* 回调对象
* @author 12285
*/
public class ProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
//在回调函数中,对RecordMetadata进行操作
long offset = metadata.offset();
int partition = metadata.partition();
System.out.println("回调函数中 the message offset: " + offset + ", partition:" + partition);
}
}
4.2、在send方法中调用回调对象
//第四步:异步发送消息
producer.send(record, new ProducerCallback());
完整代码
package com.chb.producer;
import java.lang.invoke.CallSite;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* 异步
* @author 12285
*
*/
public class MyThirdProducer {
/**
* 初始化kafka参数配置
* @return
*/
public static Properties initConfig() {
Properties kafkaProps = new Properties();
//配置kafka集群地址
kafkaProps.put("bootstrap.servers", "master:9092, slave1:9092,slave2:9092");
//序列化器
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return kafkaProps;
}
public static void main(String[] args) {
//第一步:初始化kafka参数
Properties kafkaProps = initConfig();
//第二步:创建生成者
KafkaProducer producer = new KafkaProducer(kafkaProps);
//第三步:创建一条消息
/**
* 使用ProducerRecord(String topic, String key, String value)构造函数创建消息对象
* 构造函数接受三个参数:
* topic--告诉kafkaProducer消息发送到哪个topic;
* key--告诉kafkaProducer,所发送消息的key值,注意:key值类型需与前面设置的key.serializer值匹配
* value--告诉kafkaProducer,所发送消息的value值,即消息内容。注意:value值类型需与前面设置的value.serializer值匹配
*/
ProducerRecord record =new ProducerRecord("test", "key", "yibu");
//向kafka集群发送消息,除了消息值本身,还包括key信息,key信息用于消息在partition之间均匀分布。
try {
//第四步:异步发送消息
producer.send(record, new ProducerCallback());
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
五、多线程发送
5.1、第一、二步没有变化
/**
* 初始化kafka参数配置
* @return
*/
private Properties initConfig() {
Properties kafkaProps = new Properties();
//配置kafka集群地址
kafkaProps.put("bootstrap.servers", "master:9092, slave1:9092,slave2:9092");
//序列化器
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return kafkaProps;
}
/**
* 创建生产者
* @return
*/
private KafkaProducer createProducer() {
//第一步:初始化kafka参数
Properties kafkaProps = initConfig();
//第二步:创建生成者
KafkaProducer producer = new KafkaProducer(kafkaProps);
return producer;
}
5.2、在构造函数中创建producer
public MyMultiThreadProducer(String topic) {
this.topic = topic;
this.producer = createProducer();
}
5.3、发送消息
5.3.1、在线程体中,循环发送消息
/**
* 生产线程执行操作,循环产生消息
*/
@Override
public void run() {
System.out.println("开始发送消息");
//用于记录已经发送消息的条数, 同时作为消息的key
int messageNo = 0;
while(messageNo < messageNumToSend ) {
String key = messageNo + "";
//发送消息
sendMessage(key, "message_" + messageNo);
messageNo ++;
}
//刷新缓存,将消息发送到kafka集群
producer.flush();
}
注意: 发送消息后,需要刷新缓存,将消息push到broker集群中。
//刷新缓存,将消息发送到kafka集群
producer.flush();
5.3.2、异步发送
/**
* 生成消息, 发送到kafka集群中
* @param key
* @param message
*/
private void sendMessage(String key, String message) {
//第三步:创建一条消息
/**
* 使用ProducerRecord(String topic, String key, String value)构造函数创建消息对象
* 构造函数接受三个参数:
* topic--告诉kafkaProducer消息发送到哪个topic;
* key--告诉kafkaProducer,所发送消息的key值,注意:key值类型需与前面设置的key.serializer值匹配
* value--告诉kafkaProducer,所发送消息的value值,即消息内容。注意:value值类型需与前面设置的value.serializer值匹配
*/
ProducerRecord record =new ProducerRecord(topic, key, message);
//向kafka集群发送消息,除了消息值本身,还包括key信息,key信息用于消息在partition之间均匀分布。
try {
//第四步:异步发送消息
producer.send(record, new ProducerCallback());
//producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
启动主程序,创建两个线程
public static void main(String[] args) {
new MyMultiThreadProducer("test").start();
new MyMultiThreadProducer("test").start();
}
完整代码