写点什么

深入学习 Kafka 数据消费大致流程(如何创建并使用 Kafka 消费者)

  • 2021 年 11 月 12 日
  • 本文字数:4232 字

    阅读完需:约 14 分钟

props.put("group.id", groupId);// 指定 KafkaConsumer 对应的客户端 ID,默认为空,如果不设置 KafkaConsumer 会自动生成一个非空字符串 props.put("client.id", "consumer.client.id.demo");return props;}

2.订阅主题和分区

创建完消费者后我们便可以订阅主题了,只需要通过调用 subscribe()方法即可,这个方法接收一个主题列表


KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));


另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接 Kafka 与其他系统时非常有用。比如订阅所有的测试主题:


consumer.subscribe(Pattern.compile("heima*"));


指定订阅的分区


// 指定订阅的分区 consumer.assign(Arrays.asList(new TopicPartition("topic0701", 0)));

3.反序列化

// 与 KafkaProducer 中设置保持一致 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

4.位移提交

对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中的位置。


当我们调用 poll()时,该方法会返回我们没有消费的消息。当消息从 broker 返回消费者时,broker 并不跟踪这些消息是否被消费者接收到;Kafka 让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。


重复消费



消息丢失



自动提交


这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为 true,那么消费者会在 poll 方法调用后每隔 5 秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由 poll()方法来驱动的;在调用 poll()时,消费者判断是否到达提交时间,如果是则提交上一次 poll 返回的最大位移。


需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者 poll 消息后,应用正在处理消息,在 3 秒后 Kafka 进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。


同步提交


见代码库:com.heima.kafka.chapter3.CheckOffsetAndCommit


public static Properties initConfig() {Properties props = new Properties();props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");


// 手动提交开启 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);return props;}


while (true) {ConsumerRecords<String, String> records = consumer.poll(1000);if (records.isEmpty()) {break;}List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();consumer.commitSync();//同步提交消费位移}


异步提交


手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的 API。


见代码:com.heima.kafka.chapter3.OffsetCommitAsyncCallback


但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交 commitA,此时的提交位移为 2000,随后又发起了一个异步提交 commitB 且位移为 3000;commitA 提交失败但 commitB 提交成功,此时 commitA 进行重试并成功的话,会将实际上将已经提交的位移从 3000 回滚到 2000,导致消息重复消费。


异步回调


try {while (running.get()) {ConsumerRecords<String, String> records = consumer.poll(1000);for (ConsumerRecord<String, String> record : records) {//do some logical processing.}// 异步回调 consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,Exception exception) {if (exception == null) {System.out.println(offsets);} else {log.error("fail to commit offsets {}", offsets, exception);}}});}} finally {consumer.close();}

5.指定位移消费

到目前为止,我们知道消息的拉取是根据 poll()方法中的逻辑来处理的,但是这个方法对于普通开发人员来说就是个黑盒处理,无法精确掌握其消费的起始位置。


seek()方法正好提供了这个功能,让我们得以追踪以前的消费或者回溯消费。


见代码库:com.heima.kafka.chapter3.SeekDemo


/**


  • 指定位移消费*/public class SeekDemo extends ConsumerClientConfig {public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));// timeout 参数设置多少合适?太短会使分区分配失败,太长又有可能造成一些不必要的等待 consumer.poll(Duration.ofMillis(2000));// 获取消费者所分配到的分区 Set<TopicPartition> assignment = consumer.assignment();System.out.println(assignment);for (TopicPartition tp : assignment) {// 参数 partition 表示分区,offset 表示指定从分区的哪个位置开始消费 consumer.seek(tp, 10);}consumer.seek(new TopicPartition(topic,0),10);while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//consume the record.for (ConsumerRecord<String, String> record : records) {System.out.println(record.offset() + ":" + record.value());}}}


增加判断是否分配到了分区,见代码库:com.heima.kafka.chapter3.SeekDemoAssignment


public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));long start = System.currentTimeMillis();Set<TopicPartition> assignment = new HashSet<>();while (assignment.size() == 0) {consumer.poll(Duration.ofMillis(100));assignment = consumer.assignment();}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, 10);}while (tru


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


e) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));//consume the record.for (ConsumerRecord<String, String> record : records) {System.out.println(record.offset() + ":" + record.value());}}}


  • 指定从分区末尾开始消费 ,见代码库:com.heima.kafka.chapter3.SeekToEnd


// 指定从分区末尾开始消费 Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);for (TopicPartition tp : assignment) {consumer.seek(tp, offsets.get(tp));}


  • 演示位移越界操作,修改代码如下:


for (TopicPartition tp : assignment) {//consumer.seek(tp, offsets.get(tp));consumer.seek(tp, offsets.get(tp) + 1);}


会通过 auto.offset.reset 参数的默认值将位置重置,效果如下:


INFO [Consumer clientId=consumer-1, groupId=group.heima] Fetch offset 1 is out of range for partition heima-0, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:967)INFO [Consumer clientId=consumer-1, groupId=group.heima] Fetch offset 10 is out of range for partition heima-1, resetting offset (org.apache.kafka.clients.consumer.internals.Fetcher:967)INFO [Consumer clientId=consumer-1, groupId=group.heima] Resetting offset for partition heima-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher:583)INFO [Consumer clientId=consumer-1, groupId=group.heima] Resetting offset for partition heima-1 to offset 9. (org.apache.kafka.clients.consumer.internals.Fetcher:583)

6.再均衡监听器

再均衡是指分区的所属从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使得我们既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过再均衡发生期间,消费者是无法拉取消息的。


见代码库:com.heima.kafka.chapter3.CommitSyncInRebalance


public static void main(String[] args) {Properties props = initConfig();KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {


@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 劲量避免重复消费 consumer.commitSync(currentOffsets);}


@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {//do nothing.}});


try {while (isRunning.get()) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.offset() + ":" + record.value());// 异步提交消费位移,在发生再均衡动作之前可以通过再均衡监听器的 onPartitionsRevoked 回调执行 commitSync 方法同步提交位移。currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1));}consumer.commitAsync(currentOffsets, null);}} finally {consumer.close();

评论

发布
暂无评论
深入学习Kafka数据消费大致流程(如何创建并使用Kafka消费者)