写点什么

Kafka 的客户端 NetworkClient 如何发起的请求

  • 2022-10-15
    江西
  • 本文字数:5232 字

    阅读完需:约 1 分钟

Kafka的客户端NetworkClient如何发起的请求

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!

前面我们有讲解Kafka的网络通信模型 , 但是那篇文章主要讲的是 作为服务端是如何处理的。

那么,今天我们再来讲一讲 客户端是如何发起请求的。

带着几个问题思考一下

  1. 如何发起 Request 请求

  2. 如果配置了多个 listeners,如何正确的选择 listener 发起请求?

  3. Controller2Broker、Broker2Broker、Client2Broker 的区别是什么?

1 构建 Request 并发起请求

关键类

客户端发起请求的几个关键类

NetworkSend

该类继承自ByteBufferSend, 超类是 Send,有以下几个接口

    String destination();    boolean completed();    long writeTo(GatheringByteChannel channel) throws IOException;    long size();
复制代码

它的作用主要是用来缓存待发送的数据的, writeTo 方法会把缓存的数据写入到入参的通道里面。例如ByteBufferSend,的写入方法如下。

    @Override    public long writeTo(GatheringByteChannel channel) throws IOException {        long written = channel.write(buffers);        if (written < 0)            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");        remaining -= written;        pending = TransportLayers.hasPendingWrites(channel);        return written;    }
复制代码

Send 接口,还有很多其他的实现类。


NetworkClientUtils

客户端的工具类, 只要构建好了 NetworkClient,就可以用这个工具类发送请求。

NetworkClient

用于异步请求/响应网络 i/o 的网络客户端。这是一个内部类,用于实现面向用户的生产者和消费者客户端。这个类不是线程安全的!

NetworkClient 的一些关键属性


    /* 用于执行网络 io 的选择器 */    private final Selectable selector;
    /* Metadata元信息的更新器, 他可以尝试更新元信息 */    private final MetadataUpdater metadataUpdater;
    /* 每个节点的连接状态 */    private final ClusterConnectionStates connectionStates;
    /* 当前正在发送或等待响应的一组请求 */    private final InFlightRequests inFlightRequests;
    /* 套接字发送缓冲区大小(以字节为单位) */    private final int socketSendBuffer;
    /* 套接字接收大小缓冲区(以字节为单位) */    private final int socketReceiveBuffer;
    /* 用于在对服务器的请求中识别此客户端的客户端 ID */    private final String clientId;
    /* 向服务器发送请求时使用的当前关联 ID*/    private int correlation;
    /* 单个请求等待服务器确认的默认超时*/    private final int defaultRequestTimeoutMs;  //.... 省略

复制代码

这里构建 NetworkClient 涉及到的 Broker 配置有:

元信息更新

请看

发送请求

   // 根据拿到的BrokerNode,和RequestBuilder构建 Request请求           val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,             time.milliseconds(), true)      // 发起请求并接受Response           clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
复制代码

主要的发送请求逻辑就是上面的关键代码, 先构建 clientRequest 请求,然后用 NetworkClientUtils 发送请求。

具体代码就不贴出来了, 简要概述一下整个流程吧

  1. 创建 networkclient 还有 clientRequest, 注意 brokerNode 是具体 Broker 的 EndPoint,一个 Broker 可能有多个 EndPoint,具体选择哪个是由调用层决定的。

  2. 开始执行发送流程

  3. 校验是否能够发送 Request,判断逻辑为:连接状态 Ready&&通道 Ready&&当然正在发送中的请求数量<maxInFlightRequestsPerConnection(最大未完成请求数,这个是上层参数决定的) 。当然,如果这个请求的类型是内部请求,是不需要这个判断的。

  4. 如果能够发送 Request, 则开始构建 NetworkSend, 然后调用Selector.send(send) 开始发送,这个过程其实是注册 SelectionKey.OP_WRITE 事件。当然在这之前会将请求保存起来放到 inFlightRequests 中,用于后面判断请求数是否超过阈值等等。

  5. 循环遍历 networkClient.poll 获取 Response, 直到结束。

2Request 的几个场景

客户端发起请求,总共分为以下几个场景。

Controller2Broker

关键类 ControllerChannelManager

Controller 会向 Broker 发起一些请求,比如 UpdateMetadataRequest 更新元信息请求。

  1. 那么 Controller 是如何构建 networkClient 的呢?

  2. 如果 Broker 配置了多个 listeners, 怎么选择 listeners 去发送请求呢?

在 Controller 重新选举初始化的时候,或者有新的 Broker 启动上线之后, Controller 节点会执行添加 Broker 的操作。

ControllerChannelManager#addBroker


private def addNewBroker(broker: Broker): Unit = {    val messageQueue = new LinkedBlockingQueue[QueueItem]    // 获取内部Broker之间通信的监听器名称    val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)    // 读取内部Broker之间通信的安全协议    val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)    // 根据监听器名称选择合适的节点和监听器名称    val brokerNode = broker.node(controllerToBrokerListenerName)    // 省略部分    ....... val networkClient = new NetworkClient(        selector,        new ManualMetadataUpdater(Seq(brokerNode).asJava),        config.brokerId.toString,        // 一次只能发一个请求,保证顺序性        1,        0,        0,        Selectable.USE_DEFAULT_BUFFER_SIZE,        Selectable.USE_DEFAULT_BUFFER_SIZE,        config.requestTimeoutMs,        config.connectionSetupTimeoutMs,        config.connectionSetupTimeoutMaxMs,        ClientDnsLookup.USE_ALL_DNS_IPS,        time,        false,        new ApiVersions,        logContext      )    // 省略部分    .......    val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,      brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)}


复制代码
  1. 解析配置control.plane.listener.name 获取 ControllerPlane(控制面板)使用的 listeners, 并解析出 listener_name 和 安全协议。如果没有配置control.plane.listener.name, 则使用inter.broker.listener.name的监听器和安全协议

  2. 根据上面得到的监听器名称, 就可以指定使用哪个 BrokerNode 了。例如某个新增的 Broker 配置如下:

    listeners = OUTSIDE1://172.23.164.160:9093,OUTSIDE2://172.23.164.160:9094 listener.security.protocol.map=OUTSIDE1:PLAINTEXT,OUTSIDE2:PLAINTEXT inter.broker.listener.name=OUTSIDE2

    那么新增 Broker 的时候,Controller 得到的 Broker 就有 2 个 Endpoint。

  1. 那么 Controller 应该选哪个 EndPoint 去跟这个 Broker 建立连接呢?这得看 Controller 这台 Broker 的配置是什么了。假设 Controller 的配置如下:

    listeners = OUTSIDE1://xxx.xxx:xxx,OUTSIDE2://xxx.xxx:xxx listener.security.protocol.map=OUTSIDE1:PLAINTEXT,OUTSIDE2:PLAINTEXT inter.broker.listener.name=OUTSIDE2

    那么通过上面 1 中的判断逻辑, 会找到监听器为 OUTSIDE2 的 EndPoint 进行连接。注意:是 Controller 拿本地配置,去匹配 Broker 的 EndPoint 配置。一般情况下,所有的 Broker 配置都应该一致!如果 Controller 本地配置的监听器,不存在于其他 Broker 中会造成什么情况?如果找不到正确的 BrokerNode,从日志里面看,好像并没有打印出明显的异常。但是实际上它是会抛出一个异常 BrokerEndPointNotAvailableException.

    s"End point with listener name ${listenerName.value} not found " +     s"for broker $id"

  2. 构建 NetworkClient, 将用于发起网络请求。

  3. 构建完 NetworkClient,创建 RequestSendThread 线程对象,该对象包含 networClient、BrokerNode 等等实例。ThreadName 为: "Controller-当前 BrokerID-to-broker-目标 BrokerID-send-thread"

  4. 启动 RequestSendThread, 这个线程做的事情就是跟 BrokerNode 建立起连接、发起 UpdateMetadataRequest 请求, 接受请求 Response。就是上面的 NetworkClientUtils.sendAndReceive 流程

PS: 这里传入的maxInFlightRequestsPerConnection 是 1,也就说 Controller 给某个 Broker 发送请求同一时间只有一个请求。确保请求的顺序性。

Broker2Controller

在 Kafka 启动过程中,会构建一个 brokerToControllerChannelManager 的实例。这个是专门管理 Broker 向 Controller 发起请求的类,里面有一个 BrokerToControllerRequestThread 线程负责真正的想 Controller 发起请求。

  brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)  brokerToControllerChannelManager.start()
复制代码

线程名格式:自定义前缀:broker-${config.brokerId}-to-controller-send-thread

可以看看他里面的类, 也是先构建 networkClient, 然后发起请求。具体构建就不再分析了,跟上面的 Controller2Broker 一样。但是列出几个重点需要注意的地方:

  1. Controller2Broker 通过配置可以找到具体的 BrokerNode, 也就是说在发起请求之前就知道向 Broker 的哪个 EndPoint 发起请求, 这个时候的 Broker2Controller 在发起之前是不知道的,只知道监听器名称。当然这个监听器名称的寻找逻辑 跟 Controller2Broker 一样,也是先找配置control.plane.listener.name ,找不到就用inter.broker.listener.name配置。

  2. Broker 刚启动的时候,还没有设置activeController,不知道谁是 Controller,所以等元信息更新之后,才拿到 Controller Broker 节点,但是 Controller 可能有多个 EndPoint,那么获取哪个呢? 当然是根据上面的 1 中获取到的监听器名 listenerName,过滤出 BrokerNode。并赋值给activeController(是具体的 Node,一个 Broker 可以有多个 Node 的)

  3. 元信息更新器是ManualMetadataUpdater. 这个更新器是手动更新,直接调用metadataUpdater.setNodes 来更新节点。

       // 获取正确的ControllerNode     activeController = Option(controllerOpt.get.node(listenerName))     // 手动更新一下Nodes信息。所有其他的Broker都只接收listenerName的Node     metadataUpdater.setNodes(metadataCache.getAliveBrokers.map(_.node(listenerName)).asJava)

  4. 有了 activeController 之后,就可以正常的走网络请求了。但是这个时候还没有请求进来, 它会去循环的 poll requestQueue里面的请求, 有请求的话就走请求流程。请求成功后,会调用回调接口request.callback.onComplete(response)

  5. 所以,你想要发送一个请求,只需要把请求参数放到队列requestQueue里面就行了。例如:Broker 定时向 Controller 发送 AlterIsr 请求。

    AlterIsrManager.star()

  1. 在这里插入图片描述

PS: 这里传入的maxInFlightRequestsPerConnection 也是 1,也就说 Broker 给 Controller 发送请求同一时间只有一个请求。确保请求的顺序性。

Broker2Broker

Broker 之间的请求, 例如 AbstractFetcherThread 副本同步线程。Follower 去 Leader Fetch 数据,FetchRequest 请求, 那么他们的通信又是什么样子呢?

基本上都是差不多的, 需要注意几个问题

  1. Broker2Broker 之间的请求用什么 EndPoint 呢?答: 用本地的inter.broker.listener.name 配置去匹配对应的 EndPoint。Broker2Broker 是属于内部 Broker 之间的请求。具体的代码在 ReplicaManager#makeFollowers


  1. ReplicaFetcher 的线程名: "配置的前缀 ReplicaFetcherThread-{sourceBroker.id}"

  2. 这里传入的maxInFlightRequestsPerConnection 也是 1,也就说 Broker 给 Controller 发送请求同一时间只有一个请求。确保请求的顺序性。

Client2Broker

这个就是 例如 Producer 和 Consumer 等等向 Broker 发起请求模块。

方式都是一样的,构建自己的 networkClient,配置不同属性。

发布于: 2022-10-15阅读数: 18
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019-09-06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
Kafka的客户端NetworkClient如何发起的请求_Kafk_石臻臻的杂货铺_InfoQ写作社区