写点什么

手绘模型图带你认识 Kafka 服务端网络模型

  • 2022 年 4 月 02 日
  • 本文字数:8377 字

    阅读完需:约 27 分钟

本文分享自华为云社区《图解Kafka服务端网络模型》,作者:石臻臻的杂货铺 。


Kafka 中的网络模型就是基于主从 Reactor 多线程进行设计的,在整体讲述 Kafka 网络模型之前,我们现在按照源码中的相关类来讲解一下他们分别都是用来做什么的。

关键类解析

SocketServer

这个类是网络通信的核心类,它持有这 Acceptor 和 Processor 对象。

ConnectionQuotas

这个是控制连接数配额的类,涉及到的 Broker 配置有:

AbstractServerThread

AbstractServerThread 类:这是 Acceptor 线程和 Processor 线程的抽象基类,它定义了一个抽象方法 wakeup() ,主要是用来唤醒 Acceptor 线程和 Processor 对应的 Selector 的,当然还有一些共用方法

Acceptor 和 Processor

Acceptor 线程类:继承自 AbstractServerThread, 这是接收和创建外部 TCP 连接的线程。每个 SocketServer 实例一般会创建一个 Acceptor 线程(如果 listeners 配置了多个就会创建多个 Acceptor)。它的唯一目的就是创建连接,并将接收到的 SocketChannel(SocketChannel 通道用于传输数据) 传递给下游的 Processor 线程处理,Processor 主要是处理连接之后的事情,例如读写 I/O。

涉及到的 Broker 配置有:

Processor 线程类:这是处理单个 TCP 连接上所有请求的处理线程。每个 Acceptor 实例创建若干个(num.network.threads)Processor 线程。Processor 线程负责将接收到的 SocketChannel(SocketChannel 通道用于传输数据。), 注册读写事件,当数据传送过来的时候,会立即读取 Request 数据,通过解析之后, 然后将其添加到 RequestChannel 的 requestQueue 队列上,同时还负责将 Response 返还给 Request 发送方。


涉及到的 Broker 配置有:

简单画了一张两个类之间的关系图

在这里插入图片描述

  1. 这两个类都是 AbstractServerThead 的实现类,超类是 Runnable 可运行的。

  2. 每个 Acceptor 持有 num.network.threads 个 Processor 线程, 假如配置了多个 listeners,那么总共 Processor 线程数是 listeners*num.network.threads。

  3. Acceptor 创建的是 ServerSocketChannel 通道,这个通道是用来监听新进来的 TCP 链接的通道,

    通过 serverSocketChannel.accept()方法可以拿到 SocketChannel 通道用于传输数据。

  4. 每个 Processor 线程都有一个唯一的 id,并且通过 Acceptor 拿到的 SocketChannel 会被暂时放入到 newConnections 队列中

  5. 每个 Processor 都创建了自己的 Selector。

  6. Processor 会不断的从自身的 newConnections 队列里面获取新 SocketChannel,并注册读写事件,如果有数据传输过来,则会读取数据,并解析成 Request 请求。

  7. 既然两个都是可执行线程,那我们看看两个线程的 run 方法都做了哪些事情?

Acceptor.run

def run(): Unit = {    //将serverChannel 注册到nioSelector上,并且对 Accept事件感兴趣:表示服务器监听到了客户连接,那么服务器可以接收这个连接了    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)    try {      var currentProcessorIndex = 0      while (isRunning) {        try {          //返回感兴趣的事件数量  这里是感兴趣的是SelectionKey.OP_ACCEPT,监听到新的链接          val ready = nioSelector.select(500)          if (ready > 0) {            //获取所有就绪通道            val keys = nioSelector.selectedKeys()            val iter = keys.iterator()            //遍历所有就绪通道            while (iter.hasNext && isRunning) {              try {                val key = iter.next                iter.remove()                //只处理   Accept事件,其他的事件则抛出异常,ServerSocketChannel是 监听Tcp的链接通道                if (key.isAcceptable) {                  //根据Key 拿到SocketChannle = serverSocketChannel.accept(),然后再遍历                  accept(key).foreach { socketChannel =>                     //将socketChannel分配给我们的 processor来处理,如果有多个socketChannel 则按照轮训分配的原则                    //如果一个processor 中能够处理的newconnection 队列满了放不下了,则找下一个                    // 如果所有的都放不下,则会一直循环直到有processor能够处理。
var retriesLeft = synchronized(processors.length) var processor: Processor = null do { retriesLeft -= 1 //轮训每个processors来处理 processor = synchronized { // adjust the index (if necessary) and retrieve the processor atomically for // correct behaviour in case the number of processors is reduced dynamically currentProcessorIndex = currentProcessorIndex % processors.length processors(currentProcessorIndex) } currentProcessorIndex += 1 } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0)) } } else throw new IllegalStateException("Unrecognized key state for acceptor thread.") } catch { case e: Throwable => error("Error while accepting connection", e) } } } } catch { 省略 } } } finally { 省略 } }
复制代码

1、将 ServerSocketChannel 通道注册到 nioSelector 上,并关注事件 SelectionKey.OP_ACCEPT

serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
复制代码

2、while 循环,持续阻塞监听事件,超时时间 500ms

	// 阻塞查询Selector是否有监听到新的事件	val ready = nioSelector.select(500)	// 如果有事件,则查询具体的事件和通道	if(ready>0>{	 	//获取所有就绪事件准备处理        val keys = nioSelector.selectedKeys()	}
复制代码

3、遍历刚刚监听到的事件, 如果该 SelectionKey 不包含 OP_ACCEPT(建立连接)事件,则抛出异常,通常不会出现这个异常。

Unrecognized key state for acceptor thread
复制代码

4、如果 SelectionKey 包含 OP_ACCEPT(建立连接)事件,则可以通过这个 SelectionKey 拿到 serverSocketChannel,通过 serverSocketChannel 拿到 socketChannel,并且将 SocketChannel 设置为非阻塞模式。

val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] // 调用accept方法就可以拿到ScoketChannel了。  val socketChannel = serverSocketChannel.accept()   //设置为非阻塞模式 就可以在异步模式下调用connect(), read() 和write()了。  socketChannel.configureBlocking(false)
复制代码

5、接下来,把上面拿到的 SocketChannel 遍历的形式给 Acceptor 下面的 Procesor, 让 Processor 来执行后面的处理。分配的体现形式是, 将拿到的 SocketChannel 保存在 Processor 中的 newConnections 阻塞队列中,这个 newConnections 上限是 20,在代码里面写死了的,也就是说一个 Processor 同时最多只能处理 20 个连接,那么所有的 Processor 能处理的最大连接就是 Processor 数量 * 20;如果你的连接请求并发度很高,可以尝试调大 num.network.threads


6、最后,如果 newConnections 队列放入了一个新的 SocketChannel,则会调用一下对应 Processor 实例的 wakeup()方法。

Procesor.run

 override def run(): Unit = {    startupComplete()    try {      while (isRunning) {        try {          // setup any new connections that have been queued up          // 将之前监听到的TCP链接(暂时保存在newConnections中) 开始注册监听OP_READ事件到每个Processor的 KSelector选择器中。          configureNewConnections()          // register any new responses for writing          processNewResponses()          //在不阻塞的情况下对每个连接执行任何 I/O 操作。这包括完成连接、完成断开连接、启动新发送或在进行中的发送或接收上取得进展。          // 当此调用完成时,用户可以使用completedSends() 、 completedReceives() 、 connected() 、 disconnected()检查已完成的发送、接收、连接或断开连接。          poll()          // 把请求解析后放到 requestChannels 队列中,异步处理          processCompletedReceives()          //处理已经发送完成的请求          processCompletedSends()          processDisconnected()          closeExcessConnections()        } catch {          // We catch all the throwables here to prevent the processor thread from exiting. We do this because          // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be          // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would          // be either associated with a specific socket channel or a bad request. These exceptions are caught and          // processed by the individual methods above which close the failing channel and continue processing other          // channels. So this catch block should only ever see ControlThrowables.          case e: Throwable => processException("Processor got uncaught exception.", e)        }      }    } finally {      debug(s"Closing selector - processor $id")      CoreUtils.swallow(closeAll(), this, Level.ERROR)      shutdownComplete()    }  }
复制代码
  1. configureNewConnections():之前 Acceptor 监听到的 SocketChannel 保存在 Procesor 中的 newConnections 阻塞队列中, 现在开始将 newConnections 阻塞队列一个个取出来,向 Procesor 的 Selector 注册 SocketChannel 通道,并且感兴趣的事件为 SelectionKey.OP_READ 读事件。

  2. processNewResponses() : 去 Processor 里面的无边界阻塞队列 responseQueue 里面获取 RequestChannel.Response 数据, 如果有数据并且需要返回 Response 的话,则通过 channel 返回数据. 具体的 Channel 是根据 connectionId 获取之前构建的 KafkaChannel, KafkaChannel 则会通过监听 SelectionKey.OP_WRITE。然后调用 write To 方法。

至于 responseQueue 这个队列是什么时候入队的,我们后面再分析

  1. poll(): 这个方法里面执行的就很多了, 这个方法底层调用的是 selector.poll(); 将监听到的事件批量处理,它才是执行 I/O 请求的最终地方, 它正对每个连接执行任何的 I/O 操作,这包括了 完成连接、完成断开连接、启动新发送等等。

像校验身份信息,还有 handshake 等等这些也都是在这里执行的。

  1. processCompletedReceives(): 处理所有 completedReceives(已完成接收的请求)进行接下来的处理, 处理的方式是解析一下收到的请求,最终调用了 requestChannel.sendRequest(req)。也就是说所有的请求最终通过解析放入到了 RequestChannel 中的 requestQueue 阻塞队列中, 这个阻塞队列的大小为 queued.max.requests 默认 500;表示的是在阻塞网络线程之前,数据平面允许的排队请求数

PS: 这个 completedReceives 是在 poll()方法中添加的元素。

  1. processCompletedSends(): 它负责处理 Response 的回调逻辑,通过遍历 completedSends(已完成发送)集合 可以从 inflightResponses 中移除并拿到 response 对象,然后再调用回调逻辑。

PS: 这个 completedSends 是在 poll()方法中添加的元素。

  1. processDisconnected(): 处理断开链接的情况, connectionQuotas 连接限流减掉这个链接,inflightResponses 也移除对应连接。

  2. closeExcessConnections(): 关闭超限连接 ,当总连接数 >max.connections && (inter.broker.listener.name!=listener|| listeners 数量==1) 则需要关闭一些连接。

简单来说就是:就算 Broker 已经达到了最大连接数的限制了,也应该允许 broker 之间监听器上的连接, 这种情况下,将会关闭另外一个监听器上最近最少使用的连接。broker 之间的监听器是配置 inter.broker.listener.name 决定的。

所谓优先关闭,是指在诸多 TCP 连接中找出最近未被使用的那个。这里“未被使用”就是说,在最近一段时间内,没有任何 Request 经由这个连接被发送到 Processor 线程。

RequestChannel

这个类保存这所有的 Processor,还有一个阻塞队列保存这待处理请求。这个队列最大长度由 queued.max.requests 控制,当待处理请求超过这个数值的时候网络就会阻塞

在这里插入图片描述

涉及到的 Broker 配置有:

KafkaApis

具体 Request 的处理类,所有的请求方法处理逻辑都放在这个里面。

KafkaRequestHandlerPool

KafkaRequestHandler 的线程池,KafkaRequestHandler 线程的数量由配置 num.io.threads 决定。

在这里插入图片描述

涉及到的 Broker 配置有:

KafkaRequestHandler

请求处理类, 每个 Handler 都会去 requestChannel 的 requestQueue 队列里面 poll 请求,然后去处理,最终调用的处理方法是 KafkaApis.handle()。

这几个类之间的关系如下

在这里插入图片描述

通信流程总结

在这里插入图片描述

  1. KafkaServer 启动的时候,会根据 listeners 的配置来初始化对应的实例。

  2. 一个 listeners 对应一个 Acceptor,一个 Acceptor 持有若干个(num.network.threads)Processor 实例。

  3. Acceptor 中的 nioSelector 注册的是 ServerSocketChannel 通道,并监听 OP_ACCEPT 事件,它只负责 TCP 创建和连接,不包含读写数据。

  4. 当 Acceptor 监听到新的连接之后,就会通过调用 socketChannel = serverSocketChannel.accept()拿到 SocketChannel,然后把 SocketChannel 保存在 Processor 里面的 newConnection 队列中。 那么具体保存在哪个 Processor 中呢?当然是轮询分配了,确保负载均衡嘛。当然每个 Processor 的 newConnection 队列最大只有 20,并且是代码写死的。如果一个 Processor 满了,,则会寻找下一个存放,如果所有的都满了,那么就会阻塞。一个 Acceptor 的所有 Processor 最大能够并发处理的请求是 20 * num.network.threads。

  5. Processor 会持续的从自己的 newConnection 中 poll 数据,拿到 SocketChannel 之后,就把它注册到自己的 Selector 中,并且监听事件 OP_READ。 如果 newConnection 是空的话,poll 的超时时间是 300ms。

  6. 监听到有新的事件,比较 READ,则会读取数据,并且解析成 Request,把 Request 放入到 RequestChannel 中的 requestQueue 阻塞队列中。所有的待处理请求都是临时放在这里面。这个队列也有最大值 queued.max.requests(默认 500),超过这个大小就会阻塞。

  7. KafkaRequestHandlerPool 中创建了很多(num.io.threads(默认 8))的 KafkaRequestHandler,用来处理 Request, 他们都会一直从 RequestChannel 中的 requestQueue 队列中 poll 新的 Request,来进行处理。

  8. 处理 Request 的具体逻辑是 KafkaApis 里面。当 Request 处理完毕之后,会调用 requestChannel.sendResponse()返回 Response。

  9. 当然,请求 Request 和返回 Response 必须是一一对应的,你这个请求是哪个 Processor 监听到的,则需要哪个 Processor 返回,他们通过 id 来标识。

  10. Response 也不是里面返回的,而是先放到 Processor 中的 ResponseQueue 队列中,然后慢慢返回给客户端。

数据面板(DataPlane)

数据面板是用来处理 Broker 与 Broker/Client 之间的网络模型模块, 与之相对的是控制器面板。

控制器面板 是专门用于 Controller 与 Broker 之间的网络通信模块。

其实本质上他们都是一模一样的,但是为了将 Controller 的通信和普通通信隔离,才有这么两个概念。

上面的网络通信模型就是以数据面板来分析的,因为本质是一样的,只是有一些配置不一样。

那么,数据面板就不详细讲了,我们主要讲下控制器面板的不一样的地方。

控制器面板(ControllerPlane)

控制器面板是用来专门处理 Controller 相关请求的独立通信模块。

大家都知道,Controller 是一个很重要的角色,基本上大部分协调整个集群的相关请求都跟它有关系, 比如创建 Topic、删除 Topic、分区副本重分配、等等。他们都很重要。

但是一般情况下数据面板的请求很多,如果因为请求过多而导致 Controller 相关请求被阻塞不能执行,那么可能会造成一些影响, 所以我们可以让 Controller 类的请求有一个单独的通信模块。

首先,要启用控制器面板,必须配置 control.plane.listener.name. 并且这个监听器名称必须在 listeners 里面有配置

否则的话,是不会专用的控制器链接的 EndPoint 的。

例如:

Broker 配置

## 所有的监听器isteners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094
## 监听器对应的安全协议listener.security.protocol.map = INTERNAL: PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL
## 控制器control.plane.listener.name = CONTROLLER
复制代码

在启动时,代理将开始使用安全协议“SSL”监听“192.1.1.8:9094”。

在控制器端,当它通过 zookeeper 发现代理发布的端点时,它将使用 control.plane.listener.name 找到端点,它将用于建立与代理的连接。

  1. 必须配置 control.plane.listener.name 才能使用独立的控制器面板

  2. 控制器面板的 RequestChannel 中的 requestQueue 不是由 queued.max.requests 控制的,而是写死的 20. 因为控制类请求不会有那么大的并发

  3. 跟 DataPlane 相关隔离,互不影响。但是连接限流 ConnectionQuotas 是共享的,限流的时候,两个是算在一起的

  4. 控制类面板只有一个 Acceptor 和一个 Processor,这个跟数据面板的区别是 DataPlane 的 Processor 可以有多个。

涉及到的 Broker 配置有:

上面我们主要分析了一下, Kafka 中的网络通信模型, 那么聪明的你应该肯定能够看的出来,它是使用线程模型中的 Reactor 模式来实现的。

线程模型: Reactor 模式

该模块详细请参考Reactor 模型

Reactor 模式,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。

服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。即 I/O 多路复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术之一。

根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现:

  1. 单 Reactor 单线程;

  2. 单 Reactor 多线程;

  3. 主从 Reactor 多线程。

我们主要了解一下 主从 Reactor 多线程

该图来源于网络

针对单 Reactor 多线程模型中,Reactor 在单线程中运行,高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行。

方案说明:

  • Reactor 主线程 MainReactor 对象通过 Select 监控建立连接事件,收到事件后通过 Acceptor 接收,处理建立连接事件;

  • Acceptor 处理建立连接事件后,MainReactor 将连接分配 Reactor 子线程给 SubReactor 进行处理;

  • SubReactor 将连接加入连接队列进行监听,并创建一个 Handler 用于处理各种连接事件;

  • 当有新的事件发生时,SubReactor 会调用连接对应的 Handler 进行响应;

  • Handler 通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理;

  • Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处理;

  • Handler 收到响应结果后通过 Send 将响应结果返回给 Client。

更详细的介绍可以看 Reactor 模型

问答

(1)Kafka 的网络模型使用了 Reactor 模式的哪种实现方式?

  1. 单 Reactor 单线程;

  2. 单 Reactor 多线程;

  3. 主从 Reactor 多线程。

答案: 3 。 使用了主从 Reactor 多线程的实现方式.

在这里插入图片描述

MainReactor(Acceptor)只负责监听 OP_ACCEPT 事件, 监听到之后把 SocketChannel 传递给 SubReactor(Processor), 每个 Processor 都有自己的 Selector。SubReactor 会监听并处理其他的事件,并最终把具体的请求传递给 KafkaRequestHandlerPool。

很典型的主从 Reactor 多线程模式。

(2)什么是 ControllerPlane(控制器面板),什么是 DataPlane(数据面板)?

控制器面板: 主要处理控制器类的的请求数据面板: 主要处理数据类的请求。

让他们隔离,互不影响,比如说普通的请求太多,导致了阻塞, 那么 Controller 相关的请求也可能被阻塞了,所以让他们隔离,不会互相影响。

但是默认情况下, ControllerPlane 是没有设置的,也就是 Controller 相关的请求还是走的 DataPlane。 想要隔离的话必须设置 control.plane.listener.name .

  1. 必须配置 control.plane.listener.name

  2. 控制器面板的 RequestChannel 中的 requestQueue 不是由 queued.max.requests 控制的,而是写死的 20. 因为控制类请求不会有那么大的并发

  3. 跟 DataPlane 相关隔离,互不影响。但是连接限流 ConnectionQuotas 是共享的,限流的时候,两个是算在一起的

  4. 控制类面板只有一个 Acceptor 和一个 Processor,这个跟数据面板的区别是 DataPlane 的 Processor 可以有多个。


点击关注,第一时间了解华为云新鲜技术~​

发布于: 刚刚阅读数: 2
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
手绘模型图带你认识Kafka服务端网络模型_kafka_华为云开发者社区_InfoQ写作平台