写点什么

消息队列:Kafka Consumer 源码解读

用户头像
正向成长
关注
发布于: 刚刚
消息队列:Kafka Consumer源码解读

Kafka的JavaDoc如何使用 Consumer 为入口,了解消费者消费过程以及相关接口的源码解读来了解其实现。


// 设置必要的配置信息 Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建Consumer实例 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅Topic consumer.subscribe(Arrays.asList("foo", "bar"));
// 循环拉消息 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }
复制代码

消息的消费过程主要是:

  1. 设置必要的配置信息

  2. 创建消费者实例并订阅主题

  3. 拉取消息


接下来对相关的主题进行源码的解读。

消费流程实现

消息消费主要关注如下几个方面:

  1. 订阅过程是如何实现的?

  2. Consumer 是如何与 Coordinator 协商,确定消费哪些 Partition 的?

  3. 拉取消息的过程是如何实现的?

订阅过程源码解读


    @Override    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {        /**         *  Consumer不是线程安全的,在此获取一个轻量级的锁,确保只是单线程调用         *  具体的实现是通过acquire()和release()接口,通过currentThread原子量进行一个状态的判断         *      currentThread初始化为NO_CURRENT_THREAD,在release接口再次设置为NO_CURRENT_THREAD         *         *  acquireAndEnsureOpen()接口如果是多线程调用则抛出异常         */        acquireAndEnsureOpen();        try {            // 如果groupId不存在会抛出异常            maybeThrowInvalidGroupIdException();            if (topics == null)                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");            if (topics.isEmpty()) {                // treat subscribing to empty topic list as the same as unsubscribing                this.unsubscribe();            } else {                for (String topic : topics) {                    if (Utils.isBlank(topic))                        throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");                }
throwIfNoAssignorsConfigured(); fetcher.clearBufferedDataForUnassignedTopics(topics); log.info("Subscribed to topic(s): {}", Utils.join(topics, ", ")); /** * 更新订阅状态subscriptions,该订阅状态主要是维护: * 1、订阅类型subscriptionType、订阅主题subscription * 2、订阅的 topic和partition 的消费位置等状态信息 */ if (this.subscriptions.subscribe(new HashSet<>(topics), listener)) /** * 更新元数据metadata,该元数据主要是维护: * 1、Kafka集群元数据的一个子集,包括集群Broker节点、Topic和Partition在节点上分布 */ metadata.requestUpdateForNewTopics(); } } finally { // 释放轻量级的锁,也就是将currentThread设置为NO_CURRENT_THREAD release(); } }
复制代码

Kafka 在文档中明确地注明了 Consumer 不是线程安全的,意味着 Consumer 被并发调用时会出现不可预期的结果。为了避免这种情况发生,Kafka 做了主动的检测并抛出异常,而不是放任系统产生不可预期的情况。Kafka“主动检测不支持的情况并抛出异常,避免系统产生不可预期的行为”这种模式,对于增强的系统的健壮性是一种非常有效的做法[1],具体实现可以参考 Kafka 的 acquireAndEnsureOpen 的实现。这对于我们的编程实现有很好的参考价值,如果系统不支持用户的某种操作,正确的做法是,检测不支持的操作,直接拒绝用户操作,并给出明确的错误提示,而不应该只是在文档中写上“不要这样做”,却放任用户错误的操作,产生一些不可预期的、奇怪的错误结果[1]。


接下来,对订阅过程中重要的步骤:更新订阅状态和更新元数据进行源码解读。


更新订阅状态源码解读


更新元数据源码解读

    public synchronized int requestUpdateForNewTopics() {        // Override the timestamp of last refresh to let immediate update.        this.lastRefreshMs = 0;        /**         * 设置更新元数据标志位         *  Kafka在pull数据之前需要对元数据进行更新         *  否则 Consumer 就不知道它应该去哪个Broker上拉哪个Partition的消息。         */        this.needPartialUpdate = true;        this.requestVersion++;        return this.updateVersion;    }
复制代码

那么元数据是如何进行更新的呢?


拉取消息实现

李玥老师给出的拉取消息的时序图:


参考资料

  • 极客时间:李玥的《消息队列高手专栏》

发布于: 刚刚阅读数: 2
用户头像

正向成长

关注

正向成长 2018.08.06 加入

想要坚定地做大规模数据处理(流数据方向),希望结合结合批处理的传统处理方式,以及之后流批混合处理方向进行学习和记录。

评论

发布
暂无评论
消息队列:Kafka Consumer源码解读