作者:石臻臻,CSDN 博客之星 Top5、Kafka Contributor、nacos Contributor、华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家、 KnowStreaming PMC)。
KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,带你一起你参与开源! 。
那在上一篇文章中我们了解了 消费者偏移量__consumer_offsets_,知道了 消费者在消费了消息之后会把消费的 offset 更新到以 名称为__consumer_offsets_的内置 Topic 中; 每个消费组都有维护一个当前消费组的 offset; 那么就会有以下疑问
到底消费组什么时候把 offset 更新到 broker 中的分区中呢? 每次消费一条消息就提交到 broker 中去更新?那这样是不是会有一些效率的一些问题?
既然有了疑问 ,那么我们本篇文章就来好好分析一下这个问题!
通过查询 kafka消费者配置中找到有以下几个配置
1 自动提交
消费者端开启了自动提交之后,每隔auto.commit.interval.ms自动提交一次;
public static void consumer(){ Properties props = new Properties(); props.put("bootstrap.servers", "xxx1:9092,xxx2:9092,xxx3:9092"); props.put("group.id", "szz-local-consumer"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "5000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("szz1-test-topic")); while (true) { Duration duration = Duration.ofSeconds(5); ConsumerRecords<String, String> records = consumer.poll(duration); for (ConsumerRecord<String, String> record : records){ System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
复制代码
假如 Consumer 在获取了消息消费成功但是在提交之前服务挂掉了
如果发生这种情况会有什么影响? 答: 重复消费消费者消费之后 offset 并没有及时更新过去,那么在下次启动或者同组内其他消费者去消费的时候 取到的数据就是之前的数据;那么就会出现 重复消费的情况;所以auto.commit.interval.ms到底设置成多少就很有考究了
2 手动提交
虽然自动提交 offset 十分简介便利,但由于其是基于时间提交的,开发人员难以把握 offset 提交的时机。因此 Kafka 还提供了手动提交 offset 的 API。
手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步 提交)。两者的相同点是,都会将本次poll 的一批数据最高的偏移量提交;不同点是, commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致, 也会出现提交失败);而commitAsync则没有失败重试机制,故有可能提交失败。
同步提交 offset
public static void consumerCommitSync(){ Properties props = new Properties(); props.put("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092"); props.put("group.id", "szz-local-consumer"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("szz1-test-topic")); while (true) { Duration duration = Duration.ofSeconds(2); ConsumerRecords<String, String> records = consumer.poll(duration); for (ConsumerRecord<String, String> record : records){ System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } //同步提交,当前线程会阻塞直到 offset 提交成功 consumer.commitSync(); } }
复制代码
异步提交
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。
public static void consumerCommitAsync(){ Properties props = new Properties(); props.put("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092"); props.put("group.id", "szz-local-consumer"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("szz1-test-topic")); while (true) { Duration duration = Duration.ofSeconds(2); ConsumerRecords<String, String> records = consumer.poll(duration); for (ConsumerRecord<String, String> record : records){ System.out.printf("------offset-- = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
//异步提交 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) { System.err.println("异常....."); } } });
} }
复制代码
数据漏消费和重复消费分析
无论是同步提交还是异步提交 offset,都有可能会造成数据的漏消费或者重复消费。先提交 offset 后消费,有可能造成数据的漏消费;而先消费后提交 offset,有可能会造成数据 的重复消费
3 参考资料
kafka文档: 密码:hirykafka消费者配置
评论