提示:本文可能已过期,请点击原文查看:寻找协调器FindCoordinatorRequest请求流程
@
1 客户端发起请求
我们在分析消费者的时候, 有看到调用 FindCoordinatorRequest 的请求
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) { // initiate the group metadata request log.debug("Sending FindCoordinator request to broker {}", node); FindCoordinatorRequest.Builder requestBuilder = new FindCoordinatorRequest.Builder( new FindCoordinatorRequestData() .setKeyType(CoordinatorType.GROUP.id()) .setKey(this.rebalanceConfig.groupId)); return client.send(node, requestBuilder) .compose(new FindCoordinatorResponseHandler()); }
复制代码
2Broker 处理请求
def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = { val findCoordinatorRequest = request.body[FindCoordinatorRequest] // 根据协调器类型判断是否授权过 if (findCoordinatorRequest.data.keyType == CoordinatorType.GROUP.id && !authorize(request.context, DESCRIBE, GROUP, findCoordinatorRequest.data.key)) sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) else if (findCoordinatorRequest.data.keyType == CoordinatorType.TRANSACTION.id && !authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key)) sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) else { // get metadata (and create the topic if necessary) val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match { case CoordinatorType.GROUP => val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key) val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName) (partition, metadata)
case CoordinatorType.TRANSACTION => val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key) val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName) (partition, metadata)
case _ => throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request") }
def createResponse(requestThrottleMs: Int): AbstractResponse = { def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = { new FindCoordinatorResponse( new FindCoordinatorResponseData() .setErrorCode(error.code) .setErrorMessage(error.message) .setNodeId(node.id) .setHost(node.host) .setPort(node.port) .setThrottleTimeMs(requestThrottleMs)) } val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) { createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) } else { val coordinatorEndpoint = topicMetadata.partitions.asScala .find(_.partitionIndex == partition) .filter(_.leaderId != MetadataResponse.NO_LEADER_ID) .flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId)) .flatMap(_.getNode(request.context.listenerName)) .filterNot(_.isEmpty)
coordinatorEndpoint match { case Some(endpoint) => createFindCoordinatorResponse(Errors.NONE, endpoint) case _ => createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode) } } trace("Sending FindCoordinator response %s for correlation id %d to client %s." .format(responseBody, request.header.correlationId, request.header.clientId)) responseBody } sendResponseMaybeThrottle(request, createResponse) } }
复制代码
简单校验
根据协调器类型判断是否有被授权。协调器类型有GROUP((byte) 0), TRANSACTION((byte) 1)两种
获取分区号和元信息
这里的接口分两种情况,一个是协调列席为 GROUP 一个是 TRANSACTION 他们的处理逻辑都是一样的,只是处理的 Topic 不一样
GROUP 对应的 Topic 是 __consumer_offsets
TRANSACTION 对应的 Topic 是__transaction_state
这里我们主要分析一下 GROUP 的情况
去 zk 获取/brokers/topic/__consumer_offsets数据 找到消费者 Topic 的分区总数。默认是 50. (由offsets.topic.num.partitions 控制)找到分区数之和后, 则计算Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount(groupID 按分区数取模运算)获取到了分区号partition;
然后接着获取该 Topic 的元信息, 这里需要注意的是 去获取元信息应该走的是什么 监听协议(listenerName) 呢?这个主要是看当前处理请求的 Broker 是通过哪个入口来的。比如说该 Broker 有两个监听口,listeners = INTER://xxx.xx.xx.100:9091, OUTSIDE://xxx.xx.xx.101:9092 .如果客户端发起请求的时候是对xxx.xx.xx.101:9092发起的请求,那么这个对应的监听器就是OUTSIDE . 那么 Broker 去获取__consumer_offsets元信息发起请求的时候也是会用的OUTSIDE 协议。
如果发现没有这个 Topic 的元信息,则需要去创建__consumer_offsetsTopic 。注意:创建这个 Topic 的的几个特殊属性:
构建返回数据 createResponse
这里才是真正的找到协调器的主要逻辑, 这里的判断逻辑是
上面我们获取到的分区号是partition, 我们同样获取到了__consumer_offsets的元信息 Metadata。
那我们就可以获取到这个分区号, 并且就能够找到该分区的 LeaderId 所属在哪个 Broker 上。
知道了哪个 Broker, 那我们就能够获取到对应的 EndPoint, 一个 Broker 可能同时有多个 EndPoint(配置了多个监听器),那么我们应该使用哪个 EndPoint 呢?
这个的判断逻辑与上面说过的一样,客户端发起请求时候的监听器是哪个,那么这里就应该用哪个监听器。
在这里插入图片描述
注意:如果找到的分区 Leader 不存在 那么这个协调器就不存在
然后会返回异常:
The coordinator is not available
复制代码
3 问题
如果客户端走的外网监听器访问的集群,那么在客户端发起请求之后到集群内部,触发内部调用链的请求,那么内部这个调用链是用什么监听器访问的呢?
从客户端 -> Broker -> 其他 Broker. 这是一个调用链路,从最开始用的是什么监听器那么这条链路上都是用的这个监听器!具体请看:多网络情况下,Kafka客户端如何选择合适的网络发起请求
评论