写点什么

Kafka 消费者之 findCoordinator 源码解析

  • 2022-11-30
    江西
  • 本文字数:6720 字

    阅读完需:约 22 分钟

Kafka 消费者之 findCoordinator源码解析

1 在这里我们先来梳理一下 consumeGroup 的相关知识

1、首先,我们会给每个 consume 设置 groupId,对于相同 groupId 且订阅相同 topic 的 consume,会组成 consumeGroup,如图一所示


2、对于 Server 端的 topic 来说,会有 partition 这个概念,如图二所示

图二

3、现在我们有多个 consume 及多个 partition,到底由哪个 consume 来消费哪个 partition 呢?就由 consume 启动时的分区分配策略来决定。

  • 如果 consume 数量小于 partition 的数量,则一个 consume 有可能消费多个分区,如图三所示


  • 如果 consume 数量大于 partition 的数量,则会有 consume 线程空跑,如图四所示


  • 4、kafka 的内置 topic:consumer_offsets 专门记录消费位点信息,既然是内置 topic,那自然也会有 partition 及 partition leader 的概念,对于同一个 groupId 的消费位点都会记录在同一个 partition 中,在这篇文章中 findCoordinator 即是找到该 groupId 对应的 partition 的 leader 节点,我们知道这个节点才能将位移信息提交到这里保存,如果该 partition 还有其他副本,则该节点还会与其他副本同步位移信息。与该节点交互都是由 GroupCoordinator 完成的。

在这里插入图片描述

2findCoordinator 流程展示

在这里插入图片描述

3 客户端源码分析

这里还是放一下 findCoordinator 的代码,看其他 consume 的代码就发现客户端跟 kafkaServer 通信的格式大多是这样的,如果通信一次发现该 GroupCoordinator 的信息还未获取到则继续重试,直到超时,这里的超时时间即为 poll 时传入的超时时间,这个时间设置贯穿了整个 consume 的运行代码。

    protected synchronized boolean ensureCoordinatorReady(final Timer timer) {        //如果还未加入group则与group通信        if (!coordinatorUnknown())            return true;
        do {            if (findCoordinatorException != null && !(findCoordinatorException instanceof RetriableException)) {                final RuntimeException fatalException = findCoordinatorException;                findCoordinatorException = null;                throw fatalException;            }            final RequestFuture<Void> future = lookupCoordinator();            client.poll(future, timer);          //如果还没回调完成则说明是超时的            if (!future.isDone()) {                // ran out of time                break;            }
            if (future.failed()) {                if (future.isRetriable()) {                    log.debug("Coordinator discovery failed, refreshing metadata");                    client.awaitMetadataUpdate(timer);                } else                    throw future.exception();                //获取group的信息之后client会与group对应的节点建立连接,如果不可用则还会重试            } else if (coordinator != null && client.isUnavailable(coordinator)) {                // we found the coordinator, but the connection has failed, so mark                // it dead and backoff before retrying discovery                markCoordinatorUnknown();                timer.sleep(rebalanceConfig.retryBackoffMs);            }            //如果与group通信成功则会跳出循环        } while (coordinatorUnknown() && timer.notExpired());
        return !coordinatorUnknown();    }
复制代码

这里还有一点,跟踪代码可以看到以下代码在每次 check 以及与 Server 端通信完成之后都会有一样的逻辑,可以仔细思考一下,coordinator 即获取到的 group 节点对象,client.isUnavailable(coordinator)是在与 group 建立连接,每次判断 coordinator 不为空且 client 与 group 连接失败,则将 coordinator 置空,为什么会这样呢?很有可能是请求到 group 的信息之后发现该节点已下线或者不可用,此时服务端很有可能也在进行选举,所以我们需要将 coordinator 清空,待服务端选举完成后再次通信。

  protected synchronized Node checkAndGetCoordinator() {        if (coordinator != null && client.isUnavailable(coordinator)) {            markCoordinatorUnknown(true);            return null;        }        return this.coordinator;    }
复制代码

org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator

这段代码有个亮点就是先寻找的负载最小节点,然后与该节点通信获取 group 节点的信息。

   protected synchronized RequestFuture<Void> lookupCoordinator() {        if (findCoordinatorFuture == null) {            // find a node to ask about the coordinator            //与最小负载的node通信            Node node = this.client.leastLoadedNode();            if (node == null) {                log.debug("No broker available to send FindCoordinator request");                return RequestFuture.noBrokersAvailable();            } else {                findCoordinatorFuture = sendFindCoordinatorRequest(node);                // remember the exception even after the future is cleared so that                // it can still be thrown by the ensureCoordinatorReady caller                findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {                    @Override                    public void onSuccess(Void value) {} // do nothing
                    @Override                    public void onFailure(RuntimeException e) {                        findCoordinatorException = e;                    }                });            }        }        return findCoordinatorFuture;    }
复制代码

org.apache.kafka.clients.NetworkClient#leastLoadedNode

我们先来看看是如何寻找负载最小节点的,这里代码还是挺讲究的,首先就是取随机数,防止每次都从第一个节点连接,如果判断没有在途的 request 则直接返回该节点,否则取在途 request 最小的节点,如果该节点不存在,则依次取连接的节点、需要重试的节点,如果找到不为 null 的节点则返回该节点,否则返回 null。

public Node leastLoadedNode(long now) {        List<Node> nodes = this.metadataUpdater.fetchNodes();        if (nodes.isEmpty())            throw new IllegalStateException("There are no nodes in the Kafka cluster");        int inflight = Integer.MAX_VALUE;
        Node foundConnecting = null;        Node foundCanConnect = null;        Node foundReady = null;        //随机取一个节点        int offset = this.randOffset.nextInt(nodes.size());        for (int i = 0; i < nodes.size(); i++) {            int idx = (offset + i) % nodes.size();            Node node = nodes.get(idx);            //如果该节点是可连接的,且selector空闲,且发送队列空闲则可以发送请求            if (canSendRequest(node.idString(), now)) {                //inFlightRequests记录了已发送请求但还未收到response的request,这里判定如果该节点没有这种数据则直接作为最小负载节点返回                int currInflight = this.inFlightRequests.count(node.idString());                if (currInflight == 0) {                    // if we find an established connection with no in-flight requests we can stop right away                    log.trace("Found least loaded node {} connected with no in-flight requests", node);                    return node;                    //否则取inFlightRequests中最小count的节点作为最小负载节点                } else if (currInflight < inflight) {                    // otherwise if this is the best we have found so far, record that                    inflight = currInflight;                    foundReady = node;                }            } else if (connectionStates.isPreparingConnection(node.idString())) {                foundConnecting = node;            } else if (canConnect(node, now)) {                //如果该节点未被记录或者断连之后超过重试时间,则允许设置该节点                foundCanConnect = node;            } else {                log.trace("Removing node {} from least loaded node selection since it is neither ready " +                        "for sending or connecting", node);            }        }
        // We prefer established connections if possible. Otherwise, we will wait for connections        // which are being established before connecting to new nodes.        //优先取状态良好的节点        if (foundReady != null) {            log.trace("Found least loaded node {} with {} inflight requests", foundReady, inflight);            return foundReady;        } else if (foundConnecting != null) {            log.trace("Found least loaded connecting node {}", foundConnecting);            return foundConnecting;        } else if (foundCanConnect != null) {            log.trace("Found least loaded node {} with no active connection", foundCanConnect);            return foundCanConnect;        } else {            log.trace("Least loaded node selection failed to find an available node");            return null;        }    }
复制代码

拆解 FindCoordinatorRequest

通过下图我们来看看发送了哪些数据,key_type 有两种枚举,一种是 GROUP,另一种是 TRANSACTION,如果 type 为 GROUP 的话那 key 就是 groupId

在这里插入图片描述

4 服务端源码分析

kafka.server.KafkaApis#handleFindCoordinatorRequest

服务端还是通过 KafkaApi 来处理请求,代码也比较简单。

 def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {    val findCoordinatorRequest = request.body[FindCoordinatorRequest]    //校验数据    //……省略部分代码      // get metadata (and create the topic if necessary)      val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {        case CoordinatorType.GROUP =>            //4.1 找到对应发分区          val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)            //4.2 获取对应的元数据          val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)          (partition, metadata)
        case CoordinatorType.TRANSACTION =>          val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)          val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)          (partition, metadata)
        case _ =>          throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")      }      //组装返回参数     //……省略部分代码    }  }
复制代码

kafka.coordinator.group.GroupMetadataManager#partitionFor

我们知道 consume 消费后对应的位点是保存在 kafka 的内部名为"__consumer_offsets"的内置 topic 中,内置 topic 初始化时由offsets.topic.num.partitions 参数来决定分区数,默认值是 50,相同 consumeGroup 的 offset 最终会保存在其中一个分区中,而保存在哪个分区就由下面这段代码来决定,可以看到逻辑很简单,就是取 groupId 的 hashCode,然后对总的分区数取模。比如 groupId 为"consume_group",最终就会在 34 号分区保存位点。

  def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
复制代码

kafka.server.KafkaApis#getOrCreateInternalTopic

这里是先从当前 node 的元数据缓存中拿到对应 topic 的数据,如果没有,则创建。从这段代码也可以猜想 kafka 内置 topic 的创建原理,是一种懒加载的思想,当第一个 consume 接入之后才会创建对应 topicPartition 文件。

  private def getOrCreateInternalTopic(topic: String, listenerName: ListenerName): MetadataResponse.TopicMetadata = {    val topicMetadata = metadataCache.getTopicMetadata(Set(topic), listenerName)    topicMetadata.headOption.getOrElse(createInternalTopic(topic))  }
复制代码

这里的 topicMetadata 就是对应入参 topic 返回类似列表的对象,因为入参只有一个 topic,所以直接取第一个数据,数据结构见下图,可以更直观的理解返回参数。

在这里插入图片描述

在这里插入图片描述

kafka.server.KafkaApis#createTopic

Topic 创建的流程如下图所示,详情请看 Topic创建流程源码分析


拆解 FindCoordinatorResponse

通过下图我们来看看返回了哪些数据,可以看到前面取了很多数据,最终拼到返回参数里面的只有 leader 所在的节点信息

在这里插入图片描述

5 总结

这块代码本身不是很复杂,主要是有一些细节需要考虑,通过仔细思量这些细节对我们今后分析 consume 异常会大有好处。流程总结如下 1、寻找最小负载节点信息 2、向最小负载节点发送 FindCoordinatorRequest3、最小负载节点处理该请求。

  • 首先找到该 groupId 对应的分区

  • 通过内存中缓存的 metaData 获取该分区的信息,如果不存在则创建 topic

  • 返回查找到的分区 leader 信息

发布于: 刚刚阅读数: 2
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019-09-06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
Kafka 消费者之 findCoordinator源码解析_kafka_石臻臻的杂货铺_InfoQ写作社区