一、生产者数据发送整体流程
kafka 生产者客户端核心的数据发送流程主要为三个部分:
主线程调用 KafkaProducer 发送数据,数据不是直接发送给 kafka broker 服务端,而是先缓冲起来。
有一个单独的线程(sender)专门负责将缓冲数据发往 kafka broker 服务端。
缓冲的目的是:为避免高并发请求造成的服务端压力,所以数据不是一条一条发给服务端,而是缓冲后批量发送。
单独线程负责数据发送的目的是:避免造成主线程发送数据时阻塞,造成核心业务响应延时。
我们来查一下 KafkaProducer,java 源码,其核心构造方法为:
KafkaProducer(Map<String, Object> configs,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
//1.记录累加器
this.accumulator = new RecordAccumulator(……)
//2. 数据发送线程
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
}
复制代码
我们只抽取了这个构造函数中的核心代码
记录累加器 RecordAccumulator,也就是生产者 KafkaProducer 生产的数据不是直接发送给 kafka broker。而是批量累加先放入 RecordAccumulator,然后分批次发送给 kafka broker。
数据发送有一个单独的线程来完成,这个线程为 sender,一个 KafkaProducer 对象对应一个 Sender 线程。
从上面的源码可以部分验证我们对生产者数据整体流程的概括,后文会继续进行解析说明。
二、ProducerRecord 与 ProducerBatch 与 RecordAccumulator
上文我们提到了数据累加器,也就是数据缓冲区。下面我们就来深入学习一下数据缓冲区的构造,了解数据缓冲区的构造对于理解 kafka 整个架构都会有很大的帮助。下文是 RecordAccumulator 类定义的一个成员变量 batches。
public final class RecordAccumulator {
……
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
……
}
复制代码
从这个定义中我们可以看到
下面我们再看看 RecordAccumulator 类的 append 方法的片段 ,该方法用于向缓冲区中追加数据。
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs) throws InterruptedException {
……
try {
//创建或获取已有的缓冲区(双端队列)
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
//synchronized同步队列,避免用户编程异步操作导致数据发送数据顺序错乱的问题
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
//tryAppend方法将一条消息数据的时间戳、key、value、
//header等信息追加到缓冲区中(Deque<ProducerBatch> )
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
return appendResult;
}
复制代码
最后我们再看一下一个生产者生产的一条消息数据包含哪些方面的信息?
public class ProducerRecord<K, V> {
private final String topic; //消息属于哪个的主题
private final Integer partition; //消息属于哪个分区
private final Headers headers; //消息的headers,可以理解为附加信息
private final K key; //消息的key
private final V value; //消息的数据值
private final Long timestamp; //消息时间戳
}
复制代码
看完上面的源码,我们再看下面的这张图
一个缓存区 RecordAccumulator,包含若干 Deque 双端队列
针对一个主题的一个分区,在 kafka 生产者客户端维护一个 Deque 双端队列
每个队列 Deque 里面放入若干个批次 ProducerBatch 的数据
每一个批次 ProducerBatch 包含若干条数据记录 ProducerRecord
具有相同的 key 的数据会被发往主题的同一个分区。
由此可以看出 kafka 生产者在数据发送给服务端之前,就已将数据分类、分批次的缓冲好,然后由单独线程将数据异步发送到 kafka 服务端,从而提升数据的发送效率。这就好像快递公司在快件配送之前,就将邮件按照区域、小区等信息分配好了。对于 kafka 的生产者而言,消息 value 是按照 topic、partition、key 进行分类的,按照 timestamp 的顺序进行投递。
三、定时发送与定量发送
从上文的介绍我们可以知道,kafka 生产者的数据先放入缓冲区,然后由单独的线程 sender 负责发往 kafka 服务端。这就涉及到一个问题:什么条件可以触发一次缓冲数据的批量发送?
所以 kafka 生产者采用的数据批量发送的方式是:定时或者定量,满足其中一个标准数据就会被发往服务端。
需要注意的是:linger.ms
的默认值是 0,也就是有数据就发送。有的朋友可能会提出疑问:有数据就发,数据是一条一条放入缓冲区的,那不就是一条一条发送么?所以 kafka 的批量数据发送机制就失效了啊。其实不是的,我们要考虑发送线程 sender 是单线程,生产者有很多缓冲队列 Deque,所以缓冲区里面的数据需要等待 sender 线程空闲后才能被发送。linger.ms
的默认值是 0 的灵活性就在这,数据量的时候 sender 线程忙,缓冲区机制可以保证吞吐量;数据量小的时候 sender 线程闲处理速度快,可以保证消息的低延时。
最后为大家介绍的参数buffer.memory
,他配置的是整个生产者缓冲区的大小,默认值 32MB。
这个buffer.memory
参数非常重要,特别是当你的 kafka 集群主题与分区非常多的时候,对应的生产者分区缓冲队列也就非常多。如果该值设置的比较小,消息数据产生的速率又比较快,就会导致缓冲区被填满,一旦填满就会发生线程阻塞,影响效率。
四、生产者数据发送流程环节
生产者数据发送的流程环节如下图所示,其中拦截器、序列化器、分区器是可以自定义的。
我们结合源码来理解上面的图示。
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
//调用拦截器对record进行预处理
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
//真实进行消息的发送操作
return doSend(interceptedRecord, callback);
}
复制代码
在 doSend 方法中,又进行了如下的一系列步骤,参考源码中的中文注释信息
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
//1、检测生产者是否已经关闭
throwIfProducerClosed();
//2、检查正要将数据发往的主题在kafka集群中的包含哪些分区
//获取集群中一些元数据信息
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
//为了避免阅读障碍,这里去掉了一些异常处理代码
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
//3、对消息的key进行序列化,序列化key的目的是进行网络传输
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
//为了避免阅读障碍,这里去掉了一些异常处理代码
}
//4、对消息的vlaue进行序列化
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
//为了避免阅读障碍,这里去掉了一些异常处理代码
}
//5、根据分区器决定此条数据发往哪个TopicPartition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
//6、预估发送消息的大小,内容包括key、value以及头部信息
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
//7、检查发送的消息大小没有超过设置标准
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
// 拦截器回调函数
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
//8、将消息添加到消息累加器(缓冲区)中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
//9、如果添加进的缓冲队列已经满了或者是首次创建的,那么唤醒sender线程进行数据发送
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
//10、返回异步对象
return result.future;
} catch (ApiException e) {
}
复制代码
当然上面所述的生产者数据发送流程中,我们的讲解内容省略了一些细节:比如消息发送失败之后的异常处理机制,重试机制等,会在后面介绍。
评论