写点什么

图解 Kafka 的服务端的网络通信模型

  • 2022 年 9 月 10 日
    江西
  • 本文字数:9245 字

    阅读完需:约 30 分钟

图解Kafka的服务端的网络通信模型

作者石臻臻,CSDN 博客之星 Top5Kafka Contributornacos Contributor华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家 KnowStreaming


KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源!

  1. Kafka 网络模型使用的是什么线程模型?

  2. 什么是 ControllerPlane(控制器面板),什么是 DataPlane(数据面板)?

  3. Kafka 整个请求流程是什么样子的

  4. 与 Kafka 网络通信相关的配置。

为更好的阅读体验,和及时的勘误请访问原文链接:图解Kafka服务端网络通信模型

1Kafka 的网络模型

Kafka 中的网络模型就是基于 主从 Reactor 多线程进行设计的, 在整体讲述 Kafka 网络模型之前,我们现在按照源码中的相关类来讲解一下他们分别都是用来做什么的.

关键类解析

SocketServer

这个类是网络通信的核心类,它持有这 Acceptor 和 Processor 对象。

ConnectionQuotas

这个是控制连接数配额的类,

涉及到的 Broker 配置有:

AbstractServerThread

AbstractServerThread 类:这是 Acceptor 线程和 Processor 线程的抽象基类,它定义了一个抽象方法wakeup() ,主要是用来唤醒 Acceptor 线程和 Processor 对应的Selector的, 当然还有一些共用方法

Acceptor 和 Processor

Acceptor 线程类:继承自 AbstractServerThread, 这是接收和创建外部 TCP 连接的线程。每个 SocketServer 实例一般会创建一个 Acceptor 线程(如果listeners配置了多个就会创建多个 Acceptor)。它的唯一目的就是创建连接,并将接收到的 SocketChannel(SocketChannel 通道用于传输数据) 传递给下游的 Processor 线程处理,Processor 主要是处理连接之后的事情,例如读写 I/O。

涉及到的 Broker 配置有:

Processor 线程类:这是处理单个 TCP 连接上所有请求的处理线程。每个 Acceptor 实例创建若干个(num.network.threads)Processor 线程。Processor 线程负责将接收到的 SocketChannel(SocketChannel 通道用于传输数据。), 注册读写事件,当数据传送过来的时候,会立即读取 Request 数据,通过解析之后, 然后将其添加到 RequestChannel 的 requestQueue 队列上,同时还负责将 Response 返还给 Request 发送方。

涉及到的 Broker 配置有:

简单画了一张两个类之间的关系图

在这里插入图片描述

  1. 这两个类都是 AbstractServerThead 的实现类,超类是Runnable 可运行的。

  2. 每个 Acceptor 持有num.network.threads个 Processor 线程, 假如配置了多个listeners,那么总共 Processor 线程数是 listeners*num.network.threads.

  3. Acceptor 创建的是 ServerSocketChannel 通道,这个通道是用来监听新进来的 TCP 链接的通道,通过serverSocketChannel.accept()方法可以拿到 SocketChannel 通道用于传输数据。

  4. 每个 Processor 线程都有一个唯一的 id,并且通过 Acceptor 拿到的 SocketChannel 会被暂时放入到newConnections队列中

  5. 每个 Processor 都创建了自己的 Selector

  6. Processor 会不断的从自身的newConnections队列里面获取新 SocketChannel,并注册读写事件,如果有数据传输过来,则会读取数据,并解析成 Request 请求。

既然两个都是可执行线程,那我们看看两个线程的run方法都做了哪些事情

Acceptor.run

def run(): Unit = {    //将serverChannel 注册到nioSelector上,并且对 Accept事件感兴趣:表示服务器监听到了客户连接,那么服务器可以接收这个连接了    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)    try {      var currentProcessorIndex = 0      while (isRunning) {        try {          //返回感兴趣的事件数量  这里是感兴趣的是SelectionKey.OP_ACCEPT,监听到新的链接          val ready = nioSelector.select(500)          if (ready > 0) {            //获取所有就绪通道            val keys = nioSelector.selectedKeys()            val iter = keys.iterator()            //遍历所有就绪通道            while (iter.hasNext && isRunning) {              try {                val key = iter.next                iter.remove()                //只处理   Accept事件,其他的事件则抛出异常,ServerSocketChannel是 监听Tcp的链接通道                if (key.isAcceptable) {                  //根据Key 拿到SocketChannle = serverSocketChannel.accept(),然后再遍历                  accept(key).foreach { socketChannel =>                                        //将socketChannel分配给我们的 processor来处理,如果有多个socketChannel 则按照轮训分配的原则                    //如果一个processor 中能够处理的newconnection 队列满了放不下了,则找下一个                    // 如果所有的都放不下,则会一直循环直到有processor能够处理。
                    var retriesLeft = synchronized(processors.length)                    var processor: Processor = null                    do {                      retriesLeft -= 1                      //轮训每个processors来处理                      processor = synchronized {                        // adjust the index (if necessary) and retrieve the processor atomically for                        // correct behaviour in case the number of processors is reduced dynamically                        currentProcessorIndex = currentProcessorIndex % processors.length                        processors(currentProcessorIndex)                      }                      currentProcessorIndex += 1                    } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))                  }                } else                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")              } catch {                case e: Throwable => error("Error while accepting connection", e)              }            }          }        }        catch {          省略        }      }    } finally {     省略    }  }
复制代码
  1. 将 ServerSocketChannel 通道注册到 nioSelector 上,并关注事件 SelectionKey.OP_ACCEPT

    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)

  2. while 循环,持续阻塞监听事件,超时时间 500ms

     // 阻塞查询Selector是否有监听到新的事件  val ready = nioSelector.select(500)  // 如果有事件,则查询具体的事件和通道  if(ready>0>{    //获取所有就绪事件准备处理         val keys = nioSelector.selectedKeys()  }

  3. 遍历刚刚监听到的事件, 如果该 SelectionKey 不包含OP_ACCEPT(建立连接)事件,则抛出异常,通常不会出现这个异常。

    Unrecognized key state for acceptor thread

  4. 如果 SelectionKey 包含OP_ACCEPT(建立连接)事件,则可以通过这个 SelectionKey 拿到 serverSocketChannel,通过 serverSocketChannel 拿到 socketChannel,并且将 SocketChannel 设置为非阻塞模式。

      val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]  // 调用accept方法就可以拿到ScoketChannel了。   val socketChannel = serverSocketChannel.accept()    //设置为非阻塞模式 就可以在异步模式下调用connect(), read() 和write()了。   socketChannel.configureBlocking(false)

  5. 接下来,把上面拿到的 SocketChannel 以遍历的形式给 Acceptor 下面的 Procesor, 让 Processor 来执行后面的处理。分配的体现形式是, 将拿到的 SocketChannel 保存在 Processor 中的newConnections阻塞队列中,这个newConnections上限是 20,在代码里面写死了的,也就是说一个 Processor 同时最多只能处理 20 个连接, 那么所有的 Processor 能处理的最大连接就是 Processor 数量 * 20;如果你的连接请求并发度很高,可以尝试调大num.network.threads

  6. 最后,如果newConnections队列放入了一个新的 SocketChannel,则会调用一下对应 Processor 实例的wakeup()方法。

Procesor.run


  override def run(): Unit = {    startupComplete()    try {      while (isRunning) {        try {          // setup any new connections that have been queued up          // 将之前监听到的TCP链接(暂时保存在newConnections中) 开始注册监听OP_READ事件到每个Processor的 KSelector选择器中。          configureNewConnections()          // register any new responses for writing          processNewResponses()          //在不阻塞的情况下对每个连接执行任何 I/O 操作。这包括完成连接、完成断开连接、启动新发送或在进行中的发送或接收上取得进展。          // 当此调用完成时,用户可以使用completedSends() 、 completedReceives() 、 connected() 、 disconnected()检查已完成的发送、接收、连接或断开连接。          poll()          // 把请求解析后放到 requestChannels 队列中,异步处理          processCompletedReceives()          //处理已经发送完成的请求          processCompletedSends()          processDisconnected()          closeExcessConnections()        } catch {          // We catch all the throwables here to prevent the processor thread from exiting. We do this because          // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be          // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would          // be either associated with a specific socket channel or a bad request. These exceptions are caught and          // processed by the individual methods above which close the failing channel and continue processing other          // channels. So this catch block should only ever see ControlThrowables.          case e: Throwable => processException("Processor got uncaught exception.", e)        }      }    } finally {      debug(s"Closing selector - processor $id")      CoreUtils.swallow(closeAll(), this, Level.ERROR)      shutdownComplete()    }  }

复制代码
  1. configureNewConnections(): 之前 Acceptor 监听到的 SocketChannel 保存在 Procesor 中的newConnections阻塞队列中, 现在开始将newConnections阻塞队列一个个取出来,向 Procesor 的 Selector 注册 SocketChannel 通道,并且感兴趣的事件为SelectionKey.OP_READ读事件。

  2. processNewResponses() : 去 Processor 里面的无边界阻塞队列responseQueue里面获取 RequestChannel.Response 数据, 如果有数据并且需要返回 Response 的话, 则通过 channel 返回数据. 具体的 Channel 是根据 connectionId 获取之前构建的 KafkaChannel, KafkaChannel 则会通过监听 SelectionKey.OP_WRITE。然后调用writeTo方法。至于responseQueue这个队列是什么时候入队的,我们后面再分析

  3. poll(): 这个方法里面执行的就很多了, 这个方法底层调用的是selector.poll(); 将监听到的事件批量处理,它才是执行 I/O 请求的最终地方, 它正对每个连接执行任何的 I/O 操作,这包括了 完成连接、完成断开连接、启动新发送等等。像校验身份信息,还有 handshake 等等这些也都是在这里执行的。

  4. processCompletedReceives(): 处理所有 completedReceives(已完成接收的请求)进行接下来的处理, 处理的方式是解析一下收到的请求,最终调用了requestChannel.sendRequest(req). 也就是说所有的请求最终通过解析放入到了 RequestChannel 中的requestQueue阻塞队列中, 这个阻塞队列的大小为queued.max.requests默认 500;表示的是在阻塞网络线程之前,数据平面允许的排队请求数 PS: 这个completedReceives 是在 poll()方法中添加的元素。

  5. processCompletedSends(): 它负责处理 Response 的回调逻辑,通过遍历completedSends(已完成发送)集合 可以从inflightResponses中移除并拿到 response 对象,然后再调用回调逻辑。PS: 这个completedSends 是在 poll()方法中添加的元素。

  6. processDisconnected(): 处理断开链接的情况, connectionQuotas 连接限流减掉这个链接,inflightResponses 也移除对应连接。

  7. closeExcessConnections(): 关闭超限连接 ,当总连接数 >max.connections && (inter.broker.listener.name!=listener|| listeners 数量==1) 则需要关闭一些连接.简单来说就是:就算 Broker 已经达到了最大连接数的限制了, 也应该允许 broker 之间监听器上的连接, 这种情况下,将会关闭另外一个监听器上最近最少使用的连接。broker 之间的监听器是配置inter.broker.listener.name 决定的所谓优先关闭,是指在诸多 TCP 连接中找出最近未被使用的那个。这里“未被使用”就是说,在最近一段时间内,没有任何 Request 经由这个连接被发送到 Processor 线程。

RequestChannel

这个类保存这所有的 Processor,还有一个阻塞队列保存这待处理请求。这个队列最大长度由queued.max.requests控制,当待处理请求超过这个数值的时候网络就会阻塞


涉及到的 Broker 配置有:

KafkaApis

具体 Request 的处理类, 所有的请求方法处理逻辑都放在这个里面。

KafkaRequestHandlerPool

KafkaRequestHandler 的线程池,KafkaRequestHandler 线程的数量由配置num.io.threads决定。

在这里插入图片描述

涉及到的 Broker 配置有:

KafkaRequestHandler

请求处理类, 每个 Handler 都会去 requestChannel 的 requestQueue 队列里面 poll 请求, 然后去处理,最终调用的处理方法是 KafkaApis.handle()

这几个类之间的关系如下

在这里插入图片描述

通信流程总结

在这里插入图片描述

  1. KafkaServer 启动的时候,会根据listeners的配置来初始化对应的实例。

  2. 一个listeners对应一个 Acceptor,一个 Acceptor 持有若干个(num.network.threads)Processor 实例。

  3. Acceptor 中的 nioSelector 注册的是 ServerSocketChannel 通道,并监听 OP_ACCEPT 事件,它只负责 TCP 创建和连接,不包含读写数据。

  4. 当 Acceptor 监听到新的连接之后,就会通过调用socketChannel = serverSocketChannel.accept()拿到 SocketChannel,然后把 SocketChannel 保存在 Processor 里面的newConnection队列中。 那么具体保存在哪个 Processor 中呢?当然是轮询分配了,确保负载均衡嘛。当然每个 Processor 的newConnection队列最大只有 20,并且是代码写死的。如果一个 Processor 满了,则会寻找下一个存放,如果所有的都满了,那么就会阻塞。一个 Acceptor 的所有 Processor 最大能够并发处理的请求是 20 * num.network.threads

  5. Processor 会持续的从自己的newConnection中 poll 数据,拿到 SocketChannel 之后,就把它注册到自己的 Selector 中,并且监听事件 OP_READ。 如果newConnection是空的话,poll 的超时时间是 300ms。

  6. 监听到有新的事件,比较 READ,则会读取数据,并且解析成 Request, 把 Request 放入到 RequestChannel 中的requestQueue阻塞队列中。所有的待处理请求都是临时放在这里面。这个队列也有最大值queued.max.requests(默认500),超过这个大小就会阻塞。

  7. KafkaRequestHandlerPool 中创建了很多(num.io.threads(默认8))的 KafkaRequestHandler,用来处理 Request, 他们都会一直从 RequestChannel 中的requestQueue队列中 poll 新的 Request,来进行处理。

  8. 处理 Request 的具体逻辑是 KafkaApis 里面。当 Request 处理完毕之后,会调用 requestChannel.sendResponse()返回 Response。

  9. 当然,请求 Request 和返回 Response 必须是一一对应的, 你这个请求是哪个 Processor 监听到的,则需要哪个 Processor 返回, 他们通过 id 来标识。

  10. Response 也不是里面返回的,而是先放到 Processor 中的 ResponseQueue 队列中,然后慢慢返回给客户端。

数据面板(DataPlane)

数据面板是用来处理 Broker 与 Broker/Client 之间的网络模型模块, 与之相对的是控制器面板

控制器面板 是专门用于 Controller 与 Broker 之间的网络通信模块。

其实本质上他们都是一模一样的, 但是为了将 Controller 的通信和普通通信隔离,才有这么两个概念。

上面的网络通信模型就是以数据面板来分析的,因为本质是一样的, 只是有一些配置不一样。

那么.数据面板 就不详细讲了,我们主要讲下控制器面板的不一样的地方

控制器面板(ControllerPlane)

控制器面板是用来专门处理 Controller 相关请求的独立通信模块。

大家都知道,Controller 是一个很重要的角色,基本上大部分协调整个集群的相关请求都跟它有关系, 比如创建 Topic、删除 Topic、分区副本重分配、等等。他们都很重要

但是一般情况下数据面板的请求很多,如果因为请求过多而导致 Controller 相关请求被阻塞不能执行,那么可能会造成一些影响, 所以我们可以让 Controller 类的请求有一个单独的通信模块。

首先,要启用控制器面板,必须配置control.plane.listener.name. 并且这个监听器名称必须在listeners里面有配置

否则的话,是不会专用的控制器链接的 EndPoint 的。

例如:Broker 配置

## 所有的监听器isteners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094
## 监听器对应的安全协议listener.security.protocol.map = INTERNAL: PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL
## 控制器control.plane.listener.name = CONTROLLER
复制代码

在启动时,代理将开始使用安全协议“SSL”监听“192.1.1.8:9094”。在控制器端,当它通过 zookeeper 发现代理发布的端点时,它将使用 control.plane.listener.name 找到端点,它将用于建立与代理的连接。


  1. 必须配置control.plane.listener.name 才能使用独立的控制器面板

  2. 控制器面板的 RequestChannel 中的requestQueue不是由queued.max.requests控制的,而是写死的 20. 因为控制类请求不会有那么大的并发

  3. 跟 DataPlane 相关隔离,互不影响。但是连接限流 ConnectionQuotas 是共享的,限流的时候,两个是算在一起的

  4. 控制类面板只有一个 Acceptor 和一个 Processor,这个跟数据面板的区别是 DataPlane 的 Processor 可以有多个。

涉及到的 Broker 配置有:



上面我们主要分析了一下, Kafka 中的网络通信模型, 那么聪明的你应该肯定能够看的出来,它是使用线程模型中的 Reactor 模式来实现的。

2 线程模型: Reactor 模式

该模块详细请参考Reactor 模型

Reactor 模式,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。即 I/O 多路复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术之一。

根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现:

  1. 单 Reactor 单线程;

  2. 单 Reactor 多线程;

  3. 主从 Reactor 多线程。

我们主要了解一下 主从 Reactor 多线程


针对单 Reactor 多线程模型中,Reactor 在单线程中运行,高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行。

方案说明:

  • Reactor 主线程 MainReactor 对象通过 Select 监控建立连接事件,收到事件后通过 Acceptor 接收,处理建立连接事件;

  • Acceptor 处理建立连接事件后,MainReactor 将连接分配 Reactor 子线程给 SubReactor 进行处理;

  • SubReactor 将连接加入连接队列进行监听,并创建一个 Handler 用于处理各种连接事件;

  • 当有新的事件发生时,SubReactor 会调用连接对应的 Handler 进行响应;

  • Handler 通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理;

  • Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处理;

  • Handler 收到响应结果后通过 Send 将响应结果返回给 Client。

更详细的介绍可以看 Reactor 模型

3 问答

  1. Kafka 的网络模型使用了 Reactor 模式的哪种实现方式?

  1. 单 Reactor 单线程;

  2. 单 Reactor 多线程;

  3. 主从 Reactor 多线程。

答案: 3 。 使用了主从 Reactor 多线程的实现方式.

在这里插入图片描述

MainReactor(Acceptor)只负责监听 OP_ACCEPT 事件, 监听到之后把 SocketChannel 传递给 SubReactor(Processor), 每个 Processor 都有自己的 Selector。SubReactor 会监听并处理其他的事件,并最终把具体的请求传递给 KafkaRequestHandlerPool。

很典型的主从 Reactor 多线程模式。

  1. 什么是 ControllerPlane(控制器面板),什么是 DataPlane(数据面板)?

控制器面板: 主要处理控制器类的的请求数据面板: 主要处理数据类的请求。

让他们隔离,互不影响,比如说普通的请求太多,导致了阻塞, 那么 Controller 相关的请求也可能被阻塞了,所以让他们隔离,不会互相影响。

但是默认情况下, ControllerPlane 是没有设置的,也就是 Controller 相关的请求还是走的 DataPlane。 想要隔离的话必须设置control.plane.listener.name .

  1. 必须配置control.plane.listener.name

  2. 控制器面板的 RequestChannel 中的requestQueue不是由queued.max.requests控制的,而是写死的 20. 因为控制类请求不会有那么大的并发

  3. 跟 DataPlane 相关隔离,互不影响。但是连接限流 ConnectionQuotas 是共享的,限流的时候,两个是算在一起的

  4. 控制类面板只有一个 Acceptor 和一个 Processor,这个跟数据面板的区别是 DataPlane 的 Processor 可以有多个。

  1. Kafka 整个请求流程是什么样子的

请看上面网络通信总结部分。

发布于: 刚刚阅读数: 4
用户头像

关注公众号: 石臻臻的杂货铺 获取最新文章 2019.09.06 加入

进高质量滴滴技术交流群,只交流技术不闲聊 加 szzdzhp001 进群 20w字《Kafka运维与实战宝典》PDF下载请关注公众号:石臻臻的杂货铺

评论

发布
暂无评论
图解Kafka的服务端的网络通信模型_kafka_石臻臻的杂货铺_InfoQ写作社区