写点什么

Kafka 消费组核心 API 与核心参数运行机制剖析,java 银行面试题目及答案

用户头像
极客good
关注
发布于: 刚刚

[](

)1.3 核心工作参数


  • max.poll.records


每一次 poll 方法调用拉取的最大消息条数,默认为 500。


  • max.poll.interval.ms


两次 poll 方法调用的最大间隔时间,单位毫秒,默认为 5 分钟。如果消费端在该间隔内没有发起 poll 操作,该消费者将被剔除,触发重平衡,将该消费者分配的队列分配给其他消费者


  • session.timeout.ms


消费者与 broker 的心跳超时时间,默认 10s,broker 在指定时间内没有收到心跳请求,broker 端将会将该消费者移出,并触发重平衡


  • heartbeat.interval.ms


心跳间隔时间,消费者会以该频率向 broker 发送心跳,默认为 3s,主要是确保 session 不会失效。


  • fetch.min.bytes


一次拉取消息最小返回的字节数量,默认为 1 字节。


  • fetch.max.bytes


一次拉取消息最大返回的字节数量,默认为 1M,如果一个分区的第一批消息大小大于该值也会返回。


  • max.partition.fetch.bytes


一次拉取每一个分区最大拉取字节数,默认为 1M。


  • fetch.max.wait.ms


fetch 等待拉取数据符合 fetch.min.bytes 的最大等待时间。


  • metadata.max.age.ms


元数据在客户端的过期时间,过期后客户端会向 broker 重新拉取最新的元数据,默认为 5 分钟。


  • internal.leave.group.on.close


消费者关闭后是否立即离开订阅组,默认为 true,即当客户端断开后立即触发重平衡。如果设置为 false,则不会立即触发重平衡,而是要等 session 过期后才会触发。


[](


)2、KafkaConsumer 核心组件与 API




通过 KafkaConsumer 核心参数,我们基本可以窥探 Kafka 中的核心要点,接下来再介绍一下 KafkaConsumer 的核心组件,为后续深入研究 Kafka 消费者消费模型打下基础。

[](

)2.1 核心组件



KafkaConsumer 由如下几个核心组件构成:


  • ConsumerNetworkClient


消费端网络客户端,服务底层网络通讯,负责客户端与服务端的 RPC 通信。


  • ConsumerCoordinator


消费端协调器,在 Kafka 的设计中,每一个消费组在集群中会选举一个 broker 节点成为该消费组的协调器,负责消费组状态的状态管理,尤其是消费组重平衡(消费者的加入与退出),该类就是消费者与 broker 协调器进行交互。


  • Fetcher


消息拉取。


温馨提示:本文不打算对每一个组件进行详细解读,这里建议大家按照本文第一部分关于各个参数的含义,然后对照这些参数最终是传 resume 递给哪些组件,进行一个关联思考。

[](

)2.2 核心 API 概述


最后我们再来看一下消费者的核心 API。



  • Set< TopicPartition> assignment()


获取该消费者的队列分配列表。


  • Set< String> subscription()


获取该消费者的订阅信息。


  • void subscribe(Collection< String> topics)


订阅主题。


  • void subscribe(Collection< String> topics, ConsumerRebalanceListener callback)


订阅主题,并指定队列重平衡的监听器。


  • void assign(Collection< TopicPartition> partitions)


取代 subscription,手动指定消费哪些队列。


  • void unsubscribe()


取消订阅关系。


  • ConsumerRecords<K, V> poll(Duration timeout)


拉取消息,是 KafkaConsumer 的核心方法,将在下文详细介绍。


  • void commitSync()


同步提交消费进度,为本批次的消费提交,将在后续文章中详细介绍。


  • void commitSync(Duration timeout)


同步提交消费进度,可设置超时时间。


  • void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)


显示同步提交消费进度, offsets 指明需要提交消费进度的信息。


  • void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout)


显示同步提交消费进度,带超时间。


  • void seek(TopicPartition partition, long offset)


重置 consumer#poll 方法下一次拉消息的偏移量。


  • void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)


seek 方法重载方法。


  • void seekToBeginning(Collection< TopicPartition> partitions)


将 poll 方法下一次的拉取偏移量设置为队列的初始偏移量。


  • void seekToEnd(Collection< TopicPartition> partitions)


将 poll 方法下一次的拉取偏移量设置为队列的最大偏移量。


  • long position(TopicPartition partition)


获取将被拉取的偏移量。


  • long position(TopicPartition partition, final Duration timeout)


同上。


  • OffsetAndMetadata committed(TopicPartition partition)


获取指定分区已提交的偏移量。


  • OffsetAndMetadata committed(TopicPartition partition, final Duration timeout)


同上


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码



  • Map<MetricName, ? extends Metric> metrics()


统计指标。


  • List< PartitionInfo> partitionsFor(String topic)


获取主题的路由信息。


  • List< PartitionInfo> partitionsFor(String topic, Duration timeout)


同上。


  • Map<String, List< PartitionInfo>> listTopics()


获取所有 topic 的路由信息。


  • Map<String, List< PartitionInfo>> listTopics(Duration timeout)


同上。


  • Set< TopicPartition> paused()


获取已挂起的分区信息。


  • void pause(Collection< TopicPartition> partitions)


挂起分区,下一次 poll 方法将不会返回这些分区的消息。


  • void resume(Collection< TopicPartition> partitions)


恢复挂起的分区。


  • Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)


根据时间戳查找最近的一条消息的偏移量。


  • Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout)


同上。


  • Map<TopicPartition, Long> beginningOffsets(Collection< TopicPartition> partitions)


查询指定分区当前最小的偏移量。


  • Map<TopicPartition, Long> beginningOffsets(Collection< TopicPartition> partitions, Duration timeout)


同上。


  • Map<TopicPartition, Long> endOffsets(Collection< TopicPartition> partitions)


查询指定分区当前最大的偏移量。


  • Map<TopicPartition, Long> endOffsets(Collection< TopicPartition> partitions, Duration timeout)


同上。


  • void close()


关闭消费者。


  • void close(Duration timeout)


关闭消费者。


  • void wakeup()


唤醒消费者。


Kafka 提供的消费者并不像 RocketMQ 提供了 Push 模式,自动封装了消息队列负载、消息拉取、线程池消费、位点提交,而是提供了基础 API,需要应用程序自动组织这些 API。


值得注意的 kafka 消费者也支持位点自动提交机制,kafka 的消费者**(KafkaConsumer)对象是线程不安全的**。


基于 KafkaConsumer 的 pause(暂停某些分区的消费)与 resume(恢复某些分区的消费),可以轻松实现消费端限流机制。

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
Kafka消费组核心API与核心参数运行机制剖析,java银行面试题目及答案