写点什么

Kafka 的生产者原理及重要参数说明,Java 码农是如何进入腾讯的

作者:Java高工P7
  • 2021 年 11 月 10 日
  • 本文字数:3586 字

    阅读完需:约 12 分钟

然后 controller 也会监听 ZooKeeper 集群的变化,在集群产生变化时更改自己的元数据信息。并且 follower 也会去它们的老大 controller 那里去同步元数据信息,所以一个 Kafka 集群中所有服务器上的元数据信息都是一致的。



上述准备完成后,我们正式开始我们生产者的内容。


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


名词 1——ProducerRecord


=======================================================================================


生产者需要往集群发送消息前,要先把每一条消息封装成 ProducerRecord 对象,这是生产者内部完成的。之后会经历一个序列化的过程。之前好几篇专栏也是有提到过了,需要经过网络传输的数据都是二进制的一些字节数据,需要进行序列化才能传输。


此时就会有一个问题,我们需要把消息发送到一个 Topic 下的一个 leader partition 中,可是生产者是怎样 get 到这个 topic 下哪个分区才是 leader partition 呢?


可能有些小伙伴忘了,提醒一下,controller 可以视作为 broker 的领导,负责管理集群的元数据,而 leader partition 是做负载均衡用的,它们会分布式地存储在不同的服务器上面。集群中生产数据也好,消费数据也好,都是针对 leader partition 而操作的。


名词 2——partitioner


====================================================================================


怎么知道哪个才是 leader partition,只需要获取到元数据不就好了嘛。


说来要怎么获取元数据也不难,只要随便找到集群下某一台服务器就可以了(因为集群中的每一台服务器元数据都是一样的)。



名词 3——缓冲区


============================================================================


此时生产者不着急把消息发送出去,而是先放到一个缓冲区。


名词 4——Sender


===============================================================================


把消息放进缓冲区之后,与此同时会有一个独立线程 Sender 去把消息分批次包装成一个个 Batch,不难想到如果 Kafka 真的是一条消息一条消息地传输,一条消息就是一个网络连接,那性能就会被拉得很差。为了提升吞吐量,所以采取了分批次的做法。


整好一个个 batch 之后,就开始发送给对应的主机上面。此时经过第一篇所提到的 Kakfa 的网络设计中的模型,然后再写到 os cache,再写到磁盘上面。



下图是当时我们已经说明过的 Kafka 网络设计模型。



生产者代码


=========================================================================


设置参数部分


// 创建配置文件对象


Properties props = new Properties();


// 这个参数目的是为了获取 kafka 集群的元数据


// 写一台主机也行,多个更加保险


// 这里使用的是主机名,要根据 server.properties 来决定


// 使用主机名的情况需要配置电脑的 hosts 文件(重点)


props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");


// 这个就是负责把发送的 key 从字符串序列化为字节数组


// 我们可以给每个消息设置 key,作用之后再阐述


props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");


// 这个就是负责把你发送的实际的 message 从字符串序列化为字节数组


props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


// 以下属于调优,之后再解释


props.put("acks", "-1");


props.put("retries", 3);


props.put("batch.size", 323840);


props.put("linger.ms", 10);


props.put("buffer.memory", 33554432);


props.put("max.block.ms", 3000);


创建生产者实例


// 创建一个 Producer 实例:线程资源,跟各个 broker 建立 socket 连接资源


KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);


创建消息


ProducerRecord<String, String> record = new ProducerRecord<>(


"test-topic", "test-value");


当然你也可以指定一个 key,作用之后会说明:


ProducerRecord<String, String> record = new ProducerRecord<>(


"test-topic", "test-key", "test-value")


发送消息


带有一个回调函数,如果没有异常就返回消息发送成功。


// 这是异步发送的模式


producer.send(record, new Callback() {


@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) { if(exception == null) {


// 消息发送成功


System.out.println("消息发送成功");


} else {


// 消息发送失败,需要重新发送


}}});Thread.sleep(10 * 1000);


// 这是同步发送的模式(是一般不会使用的,性能很差,测试可以使用)


// 你要一直等待人家后续一系列的步骤都做完,发送消息之后


// 有了消息的回应返回给你,你这个方法才会退出来


producer.send(record).get();


关闭连接


producer.close();


干货时间:调优部分的代码


================================================================================


区分是不是一个勤于思考的打字员的部分其实就是在 1 那里还没有讲到的那部分调优,一个个拿出来单独解释,就是下面这一大串。


props.put("acks", "-1");


props.put("retries", 3);


props.put("batch.size", 32384);


props.put("linger.ms", 100);


props.put("buffer.memory", 33554432);


props.put("max.block.ms", 3000);


acks 消息验证


=============================================================================


props.put("acks", "-1");



这个 acks 参数有 3 个值,分别是-1,0,1,设置这 3 个不同的值会成为 kafka 判断消息发送是否成功的依据。Kafka 里面的分区是有副本的,如果 acks 为-1.则说明消息在写入一个分区的 leader partition 后,这些消息还需要被另外所有这个分区的副本同步完成后,才算发送成功(对应代码就是输出 System.out.println(“消息发送成功”)),此时发送数据的性能降低。


如果设置 acks 为 1,需要发送的消息只要写入了 leader partition,即算发送成功,但是这个方式存在丢失数据的风险,比如在消息刚好发送成功给 leader partition 之后,这个 leader partition 立刻宕机了,此时剩余的 follower 无论选举谁成为 leader,都不存在刚刚发送的那一条消息。


如果设置 acks 为 0,消息只要是发送出去了,就默认发送成功了。啥都不管了。


retries 重试次数(重要)


====================================================================================


这个参数还是非常重要的,在生产环境中是必须设置的参数,为设置消息重发的次数。


props.put("retries", 3);


在 Kafka 中可能会遇到各种各样的异常(可以直接跳到下方的补充异常类型),但是无论是遇到哪种异常,消息发送此时都出现了问题,特别是网络突然出现问题,但是集群不可能每次出现异常都抛出,可能在下一秒网络就恢复了呢,所以我们要设置重试机制。


这里补充一句:设置了 retries 之后,集群中 95%的异常都会自己乘风飞去,我真没开玩笑!


代码中我配置了 3 次,其实设置 5~10 次都是合理的,补充说明一个,如果我们需要设置隔多久重试一次,也有参数,没记错的话是 retry.backoff.ms,下面我设置了 100 毫秒重试一次,也就是 0.1 秒。


props.put("retry.backoff.ms",100);


batch.size 批次大小


===================================================================================


批次的大小默认是 16K,这里设置了 32K,设置大一点可以稍微提高一下吞吐量,设置这个批次的大小还和消息的大小有关,假设一条消息的大小为 16K,一个批次也是 16K,这样的话批次就失去意义了。所以我们要事先估算一下集群中消息的大小,正常来说都会设置几倍的大小。


props.put("batch.size", 32384);


linger.ms 发送时间限制


====================================================================================


比如我现在设置了批次大小为 32K,而一条消息是 2K,此时已经有了 3 条消息发送过来,总大小为 6K,而生产者这边就没有消息过来了,那在没够 32K 的情况下就不发送过去集群了吗?显然不是,linger.ms 就是设置了固定多长时间,就算没塞满 Batch,也会发送,下面我设置了 100 毫秒,所以就算我的 Batch 迟迟没有满 32K,100 毫秒过后都会向集群发送 Batch。


props.put("linger.ms", 100);


buffer.memory 缓冲区大小


=======================================================================================


当我们的 Sender 线程处理非常缓慢,而生产数据的速度很快时,我们中间的缓冲区如果容量不够,生产者就无法再继续生产数据了,所以我们有必要把缓冲区的内存调大一点,缓冲区默认大小为 32M,其实基本也是合理的。


props.put("buffer.memory", 33554432);


那应该如何去验证我们这时候应该调整缓冲区的大小了呢,我们可以用一般 Java 计算结束时间减去开始时间的方式测试,当结束时间减去开始时间大于 100ms,我们认为此时 Sender 线程处理速度慢,需要调大缓冲区大小。


当然一般情况下我们是不需要去设置这个参数的,32M 在普遍情况下已经足以应付了。

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
Kafka的生产者原理及重要参数说明,Java码农是如何进入腾讯的