Kafka 的客户端 NetworkClient 如何发起的请求
作者:石臻臻,CSDN 博客之星 Top5、Kafka Contributor、nacos Contributor、华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家、 KnowStreaming。
KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源! 。
前面我们有讲解Kafka的网络通信模型 , 但是那篇文章主要讲的是 作为服务端是如何处理的。
那么,今天我们再来讲一讲 客户端是如何发起请求的。
带着几个问题思考一下
如何发起 Request 请求
如果配置了多个 listeners,如何正确的选择 listener 发起请求?
Controller2Broker、Broker2Broker、Client2Broker 的区别是什么?
1 构建 Request 并发起请求
关键类
客户端发起请求的几个关键类
NetworkSend
该类继承自ByteBufferSend
, 超类是 Send,有以下几个接口
它的作用主要是用来缓存待发送的数据的, writeTo
方法会把缓存的数据写入到入参的通道里面。例如ByteBufferSend
,的写入方法如下。
Send 接口,还有很多其他的实现类。
NetworkClientUtils
客户端的工具类, 只要构建好了 NetworkClient,就可以用这个工具类发送请求。
NetworkClient
用于异步请求/响应网络 i/o 的网络客户端。这是一个内部类,用于实现面向用户的生产者和消费者客户端。这个类不是线程安全的!
NetworkClient 的一些关键属性
这里构建 NetworkClient 涉及到的 Broker 配置有:
元信息更新
请看
发送请求
主要的发送请求逻辑就是上面的关键代码, 先构建 clientRequest 请求,然后用 NetworkClientUtils 发送请求。
具体代码就不贴出来了, 简要概述一下整个流程吧
创建 networkclient 还有 clientRequest, 注意 brokerNode 是具体 Broker 的 EndPoint,一个 Broker 可能有多个 EndPoint,具体选择哪个是由调用层决定的。
开始执行发送流程
校验是否能够发送 Request,判断逻辑为:连接状态 Ready&&通道 Ready&&当然正在发送中的请求数量<
maxInFlightRequestsPerConnection
(最大未完成请求数,这个是上层参数决定的) 。当然,如果这个请求的类型是内部请求,是不需要这个判断的。如果能够发送 Request, 则开始构建 NetworkSend, 然后调用
Selector.send(send)
开始发送,这个过程其实是注册 SelectionKey.OP_WRITE 事件。当然在这之前会将请求保存起来放到 inFlightRequests 中,用于后面判断请求数是否超过阈值等等。循环遍历
networkClient.poll
获取 Response, 直到结束。
2Request 的几个场景
客户端发起请求,总共分为以下几个场景。
Controller2Broker
关键类 ControllerChannelManager
Controller 会向 Broker 发起一些请求,比如 UpdateMetadataRequest 更新元信息请求。
那么 Controller 是如何构建 networkClient 的呢?
如果 Broker 配置了多个 listeners, 怎么选择 listeners 去发送请求呢?
在 Controller 重新选举初始化的时候,或者有新的 Broker 启动上线之后, Controller 节点会执行添加 Broker 的操作。
ControllerChannelManager#addBroker
解析配置
control.plane.listener.name
获取 ControllerPlane(控制面板)使用的 listeners, 并解析出 listener_name 和 安全协议。如果没有配置control.plane.listener.name
, 则使用inter.broker.listener.name
的监听器和安全协议根据上面得到的监听器名称, 就可以指定使用哪个 BrokerNode 了。例如某个新增的 Broker 配置如下:
listeners = OUTSIDE1://172.23.164.160:9093,OUTSIDE2://172.23.164.160:9094 listener.security.protocol.map=OUTSIDE1:PLAINTEXT,OUTSIDE2:PLAINTEXT inter.broker.listener.name=OUTSIDE2
那么新增 Broker 的时候,Controller 得到的 Broker 就有 2 个 Endpoint。
那么 Controller 应该选哪个 EndPoint 去跟这个 Broker 建立连接呢?这得看 Controller 这台 Broker 的配置是什么了。假设 Controller 的配置如下:
listeners = OUTSIDE1://xxx.xxx:xxx,OUTSIDE2://xxx.xxx:xxx listener.security.protocol.map=OUTSIDE1:PLAINTEXT,OUTSIDE2:PLAINTEXT inter.broker.listener.name=OUTSIDE2
那么通过上面 1 中的判断逻辑, 会找到监听器为 OUTSIDE2 的 EndPoint 进行连接。注意:是 Controller 拿本地配置,去匹配 Broker 的 EndPoint 配置。一般情况下,所有的 Broker 配置都应该一致!如果 Controller 本地配置的监听器,不存在于其他 Broker 中会造成什么情况?如果找不到正确的 BrokerNode,从日志里面看,好像并没有打印出明显的异常。但是实际上它是会抛出一个异常 BrokerEndPointNotAvailableException.
s"End point with listener name ${listenerName.value} not found " + s"for broker $id"
构建 NetworkClient, 将用于发起网络请求。
构建完 NetworkClient,创建 RequestSendThread 线程对象,该对象包含 networClient、BrokerNode 等等实例。ThreadName 为: "Controller-当前 BrokerID-to-broker-目标 BrokerID-send-thread"
启动 RequestSendThread, 这个线程做的事情就是跟 BrokerNode 建立起连接、发起 UpdateMetadataRequest 请求, 接受请求 Response。就是上面的 NetworkClientUtils.sendAndReceive 流程
PS: 这里传入的maxInFlightRequestsPerConnection
是 1,也就说 Controller 给某个 Broker 发送请求同一时间只有一个请求。确保请求的顺序性。
Broker2Controller
在 Kafka 启动过程中,会构建一个 brokerToControllerChannelManager 的实例。这个是专门管理 Broker 向 Controller 发起请求的类,里面有一个 BrokerToControllerRequestThread 线程负责真正的想 Controller 发起请求。
线程名格式:自定义前缀:broker-${config.brokerId}-to-controller-send-thread
可以看看他里面的类, 也是先构建 networkClient, 然后发起请求。具体构建就不再分析了,跟上面的 Controller2Broker 一样。但是列出几个重点需要注意的地方:
Controller2Broker 通过配置可以找到具体的 BrokerNode, 也就是说在发起请求之前就知道向 Broker 的哪个 EndPoint 发起请求, 这个时候的 Broker2Controller 在发起之前是不知道的,只知道监听器名称。当然这个监听器名称的寻找逻辑 跟 Controller2Broker 一样,也是先找配置
control.plane.listener.name
,找不到就用inter.broker.listener.name
配置。Broker 刚启动的时候,还没有设置
activeController
,不知道谁是 Controller,所以等元信息更新之后,才拿到 Controller Broker 节点,但是 Controller 可能有多个 EndPoint,那么获取哪个呢? 当然是根据上面的 1 中获取到的监听器名 listenerName,过滤出 BrokerNode。并赋值给activeController
(是具体的 Node,一个 Broker 可以有多个 Node 的)元信息更新器是
ManualMetadataUpdater
. 这个更新器是手动更新,直接调用metadataUpdater.setNodes
来更新节点。// 获取正确的ControllerNode activeController = Option(controllerOpt.get.node(listenerName)) // 手动更新一下Nodes信息。所有其他的Broker都只接收listenerName的Node metadataUpdater.setNodes(metadataCache.getAliveBrokers.map(_.node(listenerName)).asJava)
有了 activeController 之后,就可以正常的走网络请求了。但是这个时候还没有请求进来, 它会去循环的 poll
requestQueue
里面的请求, 有请求的话就走请求流程。请求成功后,会调用回调接口request.callback.onComplete(response)
所以,你想要发送一个请求,只需要把请求参数放到队列
requestQueue
里面就行了。例如:Broker 定时向 Controller 发送 AlterIsr 请求。AlterIsrManager.star()
在这里插入图片描述
PS: 这里传入的maxInFlightRequestsPerConnection
也是 1,也就说 Broker 给 Controller 发送请求同一时间只有一个请求。确保请求的顺序性。
Broker2Broker
Broker 之间的请求, 例如 AbstractFetcherThread 副本同步线程。Follower 去 Leader Fetch 数据,FetchRequest 请求, 那么他们的通信又是什么样子呢?
基本上都是差不多的, 需要注意几个问题
Broker2Broker 之间的请求用什么 EndPoint 呢?答: 用本地的
inter.broker.listener.name
配置去匹配对应的 EndPoint。Broker2Broker 是属于内部 Broker 之间的请求。具体的代码在ReplicaManager#makeFollowers
ReplicaFetcher 的线程名: "配置的前缀 ReplicaFetcherThread-{sourceBroker.id}"
这里传入的
maxInFlightRequestsPerConnection
也是 1,也就说 Broker 给 Controller 发送请求同一时间只有一个请求。确保请求的顺序性。
Client2Broker
这个就是 例如 Producer 和 Consumer 等等向 Broker 发起请求模块。
方式都是一样的,构建自己的 networkClient,配置不同属性。
版权声明: 本文为 InfoQ 作者【石臻臻的杂货铺】的原创文章。
原文链接:【http://xie.infoq.cn/article/6c0c55c311199ddb92074cae1】。未经作者许可,禁止转载。
评论