图解 Kafka 的服务端的网络通信模型
作者:石臻臻,CSDN 博客之星 Top5、Kafka Contributor、nacos Contributor、华为云 MVP,腾讯云 TVP,滴滴 Kafka 技术专家、 KnowStreaming。
KnowStreaming 是滴滴开源的Kafka运维管控平台, 有兴趣一起参与参与开发的同学,但是怕自己能力不够的同学,可以联系我,当你导师带你参与开源! 。
Kafka 网络模型使用的是什么线程模型?
什么是 ControllerPlane(控制器面板),什么是 DataPlane(数据面板)?
Kafka 整个请求流程是什么样子的
与 Kafka 网络通信相关的配置。
为更好的阅读体验,和及时的勘误请访问原文链接:图解Kafka服务端网络通信模型
1Kafka 的网络模型
Kafka 中的网络模型就是基于 主从 Reactor 多线程进行设计的, 在整体讲述 Kafka 网络模型之前,我们现在按照源码中的相关类来讲解一下他们分别都是用来做什么的.
关键类解析
SocketServer
这个类是网络通信的核心类,它持有这 Acceptor 和 Processor 对象。
ConnectionQuotas
这个是控制连接数配额的类,
涉及到的 Broker 配置有:
AbstractServerThread
AbstractServerThread 类:这是 Acceptor 线程和 Processor 线程的抽象基类,它定义了一个抽象方法wakeup()
,主要是用来唤醒 Acceptor 线程和 Processor 对应的Selector
的, 当然还有一些共用方法
Acceptor 和 Processor
Acceptor 线程类:继承自 AbstractServerThread, 这是接收和创建外部 TCP 连接的线程。每个 SocketServer 实例一般会创建一个 Acceptor 线程(如果listeners
配置了多个就会创建多个 Acceptor)。它的唯一目的就是创建连接,并将接收到的 SocketChannel(SocketChannel 通道用于传输数据) 传递给下游的 Processor 线程处理,Processor 主要是处理连接之后的事情,例如读写 I/O。
涉及到的 Broker 配置有:
Processor 线程类:这是处理单个 TCP 连接上所有请求的处理线程。每个 Acceptor 实例创建若干个(num.network.threads
)Processor 线程。Processor 线程负责将接收到的 SocketChannel(SocketChannel 通道用于传输数据。), 注册读写事件,当数据传送过来的时候,会立即读取 Request 数据,通过解析之后, 然后将其添加到 RequestChannel 的 requestQueue
队列上,同时还负责将 Response 返还给 Request 发送方。
涉及到的 Broker 配置有:
简单画了一张两个类之间的关系图
在这里插入图片描述
这两个类都是 AbstractServerThead 的实现类,超类是
Runnable
可运行的。每个 Acceptor 持有
num.network.threads
个 Processor 线程, 假如配置了多个listeners
,那么总共 Processor 线程数是listeners
*num.network.threads
.Acceptor 创建的是 ServerSocketChannel 通道,这个通道是用来监听新进来的 TCP 链接的通道,通过
serverSocketChannel.accept()
方法可以拿到 SocketChannel 通道用于传输数据。每个 Processor 线程都有一个唯一的 id,并且通过 Acceptor 拿到的 SocketChannel 会被暂时放入到
newConnections
队列中每个 Processor 都创建了自己的 Selector
Processor 会不断的从自身的
newConnections
队列里面获取新 SocketChannel,并注册读写事件,如果有数据传输过来,则会读取数据,并解析成 Request 请求。
既然两个都是可执行线程,那我们看看两个线程的run
方法都做了哪些事情
Acceptor.run
将 ServerSocketChannel 通道注册到 nioSelector 上,并关注事件 SelectionKey.OP_ACCEPT
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
while 循环,持续阻塞监听事件,超时时间 500ms
// 阻塞查询Selector是否有监听到新的事件 val ready = nioSelector.select(500) // 如果有事件,则查询具体的事件和通道 if(ready>0>{ //获取所有就绪事件准备处理 val keys = nioSelector.selectedKeys() }
遍历刚刚监听到的事件, 如果该 SelectionKey 不包含
OP_ACCEPT
(建立连接)事件,则抛出异常,通常不会出现这个异常。Unrecognized key state for acceptor thread
如果 SelectionKey 包含
OP_ACCEPT
(建立连接)事件,则可以通过这个 SelectionKey 拿到 serverSocketChannel,通过 serverSocketChannel 拿到 socketChannel,并且将 SocketChannel 设置为非阻塞模式。val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] // 调用accept方法就可以拿到ScoketChannel了。 val socketChannel = serverSocketChannel.accept() //设置为非阻塞模式 就可以在异步模式下调用connect(), read() 和write()了。 socketChannel.configureBlocking(false)
接下来,把上面拿到的 SocketChannel 以遍历的形式给 Acceptor 下面的 Procesor, 让 Processor 来执行后面的处理。分配的体现形式是, 将拿到的 SocketChannel 保存在 Processor 中的
newConnections
阻塞队列中,这个newConnections
上限是 20,在代码里面写死了的,也就是说一个 Processor 同时最多只能处理 20 个连接, 那么所有的 Processor 能处理的最大连接就是 Processor 数量 * 20;如果你的连接请求并发度很高,可以尝试调大num.network.threads
最后,如果
newConnections
队列放入了一个新的 SocketChannel,则会调用一下对应 Processor 实例的wakeup()
方法。
Procesor.run
configureNewConnections()
: 之前 Acceptor 监听到的 SocketChannel 保存在 Procesor 中的newConnections
阻塞队列中, 现在开始将newConnections
阻塞队列一个个取出来,向 Procesor 的 Selector 注册 SocketChannel 通道,并且感兴趣的事件为SelectionKey.OP_READ
读事件。processNewResponses()
: 去 Processor 里面的无边界阻塞队列responseQueue
里面获取 RequestChannel.Response 数据, 如果有数据并且需要返回 Response 的话, 则通过 channel 返回数据. 具体的 Channel 是根据 connectionId 获取之前构建的 KafkaChannel, KafkaChannel 则会通过监听 SelectionKey.OP_WRITE。然后调用writeTo
方法。至于responseQueue
这个队列是什么时候入队的,我们后面再分析poll()
: 这个方法里面执行的就很多了, 这个方法底层调用的是selector.poll()
; 将监听到的事件批量处理,它才是执行 I/O 请求的最终地方, 它正对每个连接执行任何的 I/O 操作,这包括了 完成连接、完成断开连接、启动新发送等等。像校验身份信息,还有 handshake 等等这些也都是在这里执行的。processCompletedReceives()
: 处理所有 completedReceives(已完成接收的请求)进行接下来的处理, 处理的方式是解析一下收到的请求,最终调用了requestChannel.sendRequest(req)
. 也就是说所有的请求最终通过解析放入到了 RequestChannel 中的requestQueue
阻塞队列中, 这个阻塞队列的大小为queued.max.requests
默认 500;表示的是在阻塞网络线程之前,数据平面允许的排队请求数 PS: 这个completedReceives
是在poll()
方法中添加的元素。processCompletedSends():
它负责处理 Response 的回调逻辑,通过遍历completedSends
(已完成发送)集合 可以从inflightResponses
中移除并拿到 response 对象,然后再调用回调逻辑。PS: 这个completedSends
是在poll()
方法中添加的元素。processDisconnected():
处理断开链接的情况, connectionQuotas 连接限流减掉这个链接,inflightResponses 也移除对应连接。closeExcessConnections():
关闭超限连接 ,当总连接数 >max.connections
&& (inter.broker.listener.name!=listener|| listeners 数量==1) 则需要关闭一些连接.简单来说就是:就算 Broker 已经达到了最大连接数的限制了, 也应该允许 broker 之间监听器上的连接, 这种情况下,将会关闭另外一个监听器上最近最少使用的连接。broker 之间的监听器是配置inter.broker.listener.name
决定的所谓优先关闭,是指在诸多 TCP 连接中找出最近未被使用的那个。这里“未被使用”就是说,在最近一段时间内,没有任何 Request 经由这个连接被发送到 Processor 线程。
RequestChannel
这个类保存这所有的 Processor,还有一个阻塞队列保存这待处理请求。这个队列最大长度由queued.max.requests
控制,当待处理请求超过这个数值的时候网络就会阻塞
涉及到的 Broker 配置有:
KafkaApis
具体 Request 的处理类, 所有的请求方法处理逻辑都放在这个里面。
KafkaRequestHandlerPool
KafkaRequestHandler 的线程池,KafkaRequestHandler 线程的数量由配置num.io.threads
决定。
在这里插入图片描述
涉及到的 Broker 配置有:
KafkaRequestHandler
请求处理类, 每个 Handler 都会去 requestChannel 的 requestQueue 队列里面 poll 请求, 然后去处理,最终调用的处理方法是 KafkaApis.handle()
这几个类之间的关系如下
在这里插入图片描述
通信流程总结
在这里插入图片描述
KafkaServer 启动的时候,会根据
listeners
的配置来初始化对应的实例。一个
listeners
对应一个 Acceptor,一个 Acceptor 持有若干个(num.network.threads
)Processor 实例。Acceptor 中的 nioSelector 注册的是 ServerSocketChannel 通道,并监听 OP_ACCEPT 事件,它只负责 TCP 创建和连接,不包含读写数据。
当 Acceptor 监听到新的连接之后,就会通过调用
socketChannel = serverSocketChannel.accept()
拿到 SocketChannel,然后把 SocketChannel 保存在 Processor 里面的newConnection
队列中。 那么具体保存在哪个 Processor 中呢?当然是轮询分配了,确保负载均衡嘛。当然每个 Processor 的newConnection
队列最大只有 20,并且是代码写死的。如果一个 Processor 满了,则会寻找下一个存放,如果所有的都满了,那么就会阻塞。一个 Acceptor 的所有 Processor 最大能够并发处理的请求是20 * num.network.threads
。Processor 会持续的从自己的
newConnection
中 poll 数据,拿到 SocketChannel 之后,就把它注册到自己的 Selector 中,并且监听事件 OP_READ。 如果newConnection
是空的话,poll 的超时时间是 300ms。监听到有新的事件,比较 READ,则会读取数据,并且解析成 Request, 把 Request 放入到 RequestChannel 中的
requestQueue
阻塞队列中。所有的待处理请求都是临时放在这里面。这个队列也有最大值queued.max.requests(默认500)
,超过这个大小就会阻塞。KafkaRequestHandlerPool 中创建了很多(
num.io.threads(默认8)
)的 KafkaRequestHandler,用来处理 Request, 他们都会一直从 RequestChannel 中的requestQueue
队列中 poll 新的 Request,来进行处理。处理 Request 的具体逻辑是 KafkaApis 里面。当 Request 处理完毕之后,会调用 requestChannel.sendResponse()返回 Response。
当然,请求 Request 和返回 Response 必须是一一对应的, 你这个请求是哪个 Processor 监听到的,则需要哪个 Processor 返回, 他们通过 id 来标识。
Response 也不是里面返回的,而是先放到 Processor 中的 ResponseQueue 队列中,然后慢慢返回给客户端。
数据面板(DataPlane)
数据面板是用来处理 Broker 与 Broker/Client 之间的网络模型模块, 与之相对的是控制器面板
控制器面板 是专门用于 Controller 与 Broker 之间的网络通信模块。
其实本质上他们都是一模一样的, 但是为了将 Controller 的通信和普通通信隔离,才有这么两个概念。
上面的网络通信模型就是以数据面板来分析的,因为本质是一样的, 只是有一些配置不一样。
那么.数据面板 就不详细讲了,我们主要讲下控制器面板的不一样的地方
控制器面板(ControllerPlane)
控制器面板是用来专门处理 Controller 相关请求的独立通信模块。
大家都知道,Controller 是一个很重要的角色,基本上大部分协调整个集群的相关请求都跟它有关系, 比如创建 Topic、删除 Topic、分区副本重分配、等等。他们都很重要
但是一般情况下数据面板的请求很多,如果因为请求过多而导致 Controller 相关请求被阻塞不能执行,那么可能会造成一些影响, 所以我们可以让 Controller 类的请求有一个单独的通信模块。
首先,要启用控制器面板,必须配置control.plane.listener.name
. 并且这个监听器名称必须在listeners
里面有配置
否则的话,是不会专用的控制器链接的 EndPoint 的。
例如:Broker 配置
在启动时,代理将开始使用安全协议“SSL”监听“192.1.1.8:9094”。在控制器端,当它通过 zookeeper 发现代理发布的端点时,它将使用 control.plane.listener.name 找到端点,它将用于建立与代理的连接。
必须配置
control.plane.listener.name
才能使用独立的控制器面板控制器面板的 RequestChannel 中的
requestQueue
不是由queued.max.requests
控制的,而是写死的 20. 因为控制类请求不会有那么大的并发跟 DataPlane 相关隔离,互不影响。但是连接限流 ConnectionQuotas 是共享的,限流的时候,两个是算在一起的
控制类面板只有一个 Acceptor 和一个 Processor,这个跟数据面板的区别是 DataPlane 的 Processor 可以有多个。
涉及到的 Broker 配置有:
上面我们主要分析了一下, Kafka 中的网络通信模型, 那么聪明的你应该肯定能够看的出来,它是使用线程模型中的 Reactor 模式来实现的。
2 线程模型: Reactor 模式
该模块详细请参考Reactor 模型
Reactor 模式,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫 Dispatcher 模式。即 I/O 多路复用统一监听事件,收到事件后分发(Dispatch 给某进程),是编写高性能网络服务器的必备技术之一。
根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现:
单 Reactor 单线程;
单 Reactor 多线程;
主从 Reactor 多线程。
我们主要了解一下 主从 Reactor 多线程
针对单 Reactor 多线程模型中,Reactor 在单线程中运行,高并发场景下容易成为性能瓶颈,可以让 Reactor 在多线程中运行。
方案说明:
Reactor 主线程 MainReactor 对象通过 Select 监控建立连接事件,收到事件后通过 Acceptor 接收,处理建立连接事件;
Acceptor 处理建立连接事件后,MainReactor 将连接分配 Reactor 子线程给 SubReactor 进行处理;
SubReactor 将连接加入连接队列进行监听,并创建一个 Handler 用于处理各种连接事件;
当有新的事件发生时,SubReactor 会调用连接对应的 Handler 进行响应;
Handler 通过 Read 读取数据后,会分发给后面的 Worker 线程池进行业务处理;
Worker 线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给 Handler 进行处理;
Handler 收到响应结果后通过 Send 将响应结果返回给 Client。
更详细的介绍可以看 Reactor 模型
3 问答
Kafka 的网络模型使用了 Reactor 模式的哪种实现方式?
单 Reactor 单线程;
单 Reactor 多线程;
主从 Reactor 多线程。
答案: 3 。 使用了主从 Reactor 多线程的实现方式.
在这里插入图片描述
MainReactor(Acceptor)只负责监听 OP_ACCEPT 事件, 监听到之后把 SocketChannel 传递给 SubReactor(Processor), 每个 Processor 都有自己的 Selector。SubReactor 会监听并处理其他的事件,并最终把具体的请求传递给 KafkaRequestHandlerPool。
很典型的主从 Reactor 多线程模式。
什么是 ControllerPlane(控制器面板),什么是 DataPlane(数据面板)?
控制器面板: 主要处理控制器类的的请求数据面板: 主要处理数据类的请求。
让他们隔离,互不影响,比如说普通的请求太多,导致了阻塞, 那么 Controller 相关的请求也可能被阻塞了,所以让他们隔离,不会互相影响。
但是默认情况下, ControllerPlane 是没有设置的,也就是 Controller 相关的请求还是走的 DataPlane。 想要隔离的话必须设置control.plane.listener.name
.
必须配置
control.plane.listener.name
控制器面板的 RequestChannel 中的
requestQueue
不是由queued.max.requests
控制的,而是写死的 20. 因为控制类请求不会有那么大的并发跟 DataPlane 相关隔离,互不影响。但是连接限流 ConnectionQuotas 是共享的,限流的时候,两个是算在一起的
控制类面板只有一个 Acceptor 和一个 Processor,这个跟数据面板的区别是 DataPlane 的 Processor 可以有多个。
Kafka 整个请求流程是什么样子的
请看上面网络通信总结部分。
版权声明: 本文为 InfoQ 作者【石臻臻的杂货铺】的原创文章。
原文链接:【http://xie.infoq.cn/article/384ec76a594d1c4a5659d930f】。未经作者许可,禁止转载。
评论