kafka2.10-0.10.2.1
在《kafka-producer 生产消息》一文中,我们学习了 Producer 发送消息的基本流程,共 5 步。第一步:初始化 kafka 参数;第二步:创建生产者;第三步:创建一条消息;第四步:发送消息;第五步:在异步发送的回调中处理消息是否发送成功的结果。
下面使用一个流程图来表示发送消息到 Kafka 集群的基本步骤:
/**
 * Implementation of asynchronously send a record to a topic.
 *
 * <p>Steps, in order: wait for topic metadata, serialize key and value, choose the partition,
 * validate the serialized size, append the record to the accumulator, and wake the sender
 * when the append filled a batch or created a new one.
 *
 * <p>Error handling: an {@code ApiException} is reported to the user callback (if any) and
 * returned inside a failed future; every other exception is rethrown to the caller. In all
 * cases the interceptors (if any) are notified through {@code onSendError}.
 *
 * @param record   the record to send
 * @param callback user-supplied callback, may be {@code null}
 * @return the future produced by the accumulator append, or a {@code FutureFailure} wrapping
 *         the {@code ApiException} that occurred
 */
private Future doSend(ProducerRecord record, Callback callback) {
    // Stays null until the partition is known, so the error handlers may pass null to the interceptors.
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        // Time spent waiting on metadata is deducted from the budget left for the accumulator append.
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;

        // Serialize the key and the value.
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            // Keep the original ClassCastException as the cause so its stack trace is not lost.
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            // Keep the original ClassCastException as the cause so its stack trace is not lost.
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }

        // Determine the target partition.
        int partition = partition(record, serializedKey, serializedValue, cluster);
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize);

        // TopicPartition holds the topic name and the partition number.
        tp = new TopicPartition(record.topic(), partition);
        // Use the record's own timestamp when it has one, otherwise the current time.
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);

        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback(callback, this.interceptors, tp);

        // Append the record to the accumulator. The append result carries the future for the
        // record plus flags telling whether the batch is full or a new batch was created.
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }

        // Hand the future from the append result back to the caller.
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
/**
 * Callback invoked once a producer request has completed. It first reports the outcome to the
 * producer interceptors (when present) and then forwards it to the user-supplied callback
 * (when present).
 */
private static class InterceptorCallback implements Callback {
    private final Callback userCallback;
    private final ProducerInterceptors interceptors;
    private final TopicPartition tp;

    public InterceptorCallback(Callback userCallback, ProducerInterceptors interceptors,
                               TopicPartition tp) {
        this.userCallback = userCallback;
        this.interceptors = interceptors;
        this.tp = tp;
    }

    /**
     * Reports completion to the interceptors first, then to the user callback.
     *
     * @param metadata  metadata of the completed record, or {@code null}
     * @param exception the exception passed along with the completion, forwarded unchanged
     */
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        acknowledgeToInterceptors(metadata, exception);
        if (this.userCallback == null)
            return;
        this.userCallback.onCompletion(metadata, exception);
    }

    /**
     * Forwards the acknowledgement to the interceptors, if any. When no metadata is available,
     * a placeholder carrying only the topic-partition is passed instead of {@code null}.
     */
    private void acknowledgeToInterceptors(RecordMetadata metadata, Exception exception) {
        if (this.interceptors == null)
            return;
        RecordMetadata reported = metadata != null
                ? metadata
                : new RecordMetadata(tp, -1, -1, Record.NO_TIMESTAMP, -1, -1, -1);
        this.interceptors.onAcknowledgement(reported, exception);
    }
}