Tars-Java 网络编程源码分析
作者:vivo 互联网服务器团队- Jin Kai
本文从 Java NIO 网络编程的基础知识讲到了 Tars 框架使用 NIO 进行网络编程的源码分析。
一、Tars 框架基本介绍
Tars 是腾讯开源的支持多语言的高性能 RPC 框架,起源于腾讯内部 2008 年至今一直使用的统一应用框架 TAF(Total Application Framework),目前支持 C++、Java、PHP、Nodejs、Go 语言。
该框架为用户提供了涉及到开发、运维、以及测试的一整套解决方案,帮助一个产品或者服务快速开发、部署、测试、上线。它集可扩展协议编解码、高性能 RPC 通信框架、名字路由与发现、发布监控、日志统计、配置管理等于一体,通过它可以快速用微服务的方式构建自己的稳定可靠的分布式应用,并实现完整有效的服务治理。
官方仓库地址:
https://github.com/TarsCloud/Tars
vivo 推送平台也深度使用了该框架,部署服务节点超过一千个,经过线上每日一百多亿消息推送量的考验。
此前已在 vivo 互联网技术公众号发布过《Tars Java 客户端源码分析》此篇文章为续集。
Tars-java 最新稳定版 1.7.2 以及之前的版本都使用 Java NIO 进行网络编程;本文将分别详细介绍 java NIO 的原理和 Tars 使用 NIO 进行网络编程的细节。
二、Java NIO 原理介绍
从 1.4 版本开始,Java 提供了一种新的 IO 处理方式:NIO (New IO 或 Non-blocking IO) 是一个可以替代标准 Java IO 的 API,它是面向缓冲区而不是字节流,它是非阻塞的,支持 IO 多路复用。
2.1 Channels (通道) and Buffers (缓冲区)
标准的 IO 基于字节流进行操作的,而 NIO 是基于通道(Channel)和缓冲区(Buffer)进行操作。数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中,下图是一个完整流程。
Channel 类型:
支持文件读写数据的 FileChannel
能通过 UDP 读写网络中的数据的 DatagramChannel
能通过 TCP 读写网络数据的 SocketChannel
可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel 的 ServerSocketChannel 。
SocketChannel:
打开 SocketChannel:SocketChannel socketChannel = SocketChannel.open();
关闭 SocketChannel:socketChannel.close();
从 Channel 中读取的数据放到 Buffer: int bytesRead = inChannel.read(buf);
将 Buffer 中的数据写到 Channel: int bytesWritten = inChannel.write(buf);
ServerSocketChannel:
通过 ServerSocketChannel.accept() 方法监听新进来的连接,当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel,因此 accept()方法会一直阻塞到有新连接到达。
通常不会仅仅只监听一个连接,在 while 循环中调用 accept()方法. 如下面的例子:
代码 1:
ServerSocketChannel 可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是 null。因此,需要检查返回的 SocketChannel 是否是 null。
代码 2:
Buffer 类型:
ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
Buffer 的分配:
ByteBuffer buf = ByteBuffer.allocate(2048);
Buffer 的读写:
一般是以下四个步骤:
写入数据到 Buffer,最大写入量是 capacity,写模式下 limit 值即为 capacity 值,position 即为写到的位置。
调用 flip()方法将 Buffer 从写模式切换到读模式,此时 position 移动到开始位置 0,limit 移动到 position 的位置。
从 Buffer 中读取数据,在读模式下可以读取之前写入到 buffer 的所有数据,即为 limit 位置。
调用 clear()方法或者 compact()方法。clear()方法将 position 设为 0,limit 被设置成 capacity 的值。compact()方法将所有未读的数据拷贝到 Buffer 起始处,然后将 position 设到最后一个未读元素后面。
mark() 与 reset()方法
通过调用 Buffer.mark()方法,可以标记 Buffer 中的一个特定 position,之后可以通过调用 Buffer.reset()方法恢复到这个 position。
duplicate()
此方法返回承载先前字节缓冲区内容的新字节缓冲区。
remaining()
limit 减去 position 的值
2.2 Selector(选择器)
Java NIO 引入了选择器的概念,选择器用于监听多个通道的事件。单个的线程可以监听多个数据通道。要使用 Selector,得向 Selector 注册 Channel,然后调用它的 select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件。
代码 3:
注意 register()方法的第二个参数,这是一个监听的集合,即在通过 Selector 监听 Channel 时关注什么事件集合。
SelectionKey 包含:
1) interest 集合:selectionKey.interestOps() 可以监听四种不同类型的事件:OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ
2) ready 集合:selectionKey.readyOps(); ready 集合是通道已经准备就绪的操作的集合,提供 4 个方便的方法:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
3) Channel:selectionKey.channel();
4) Selector:selectionKey.selector();
5) 可选的附加对象:
selectionKey.attachment(); 可以将一个对象或者更多信息附着到 SelectionKey 上,这样就能方便的识别特定的通道。
提示:
OP_ACCEPT 和 OP_CONNECT 的区别:简单来说,客户端建立连接是 connect,服务器准备接收连接是 accept。一个典型的客户端服务器网络交互流程如下图
selectedKeys()
一旦调用了 select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用 selector 的 selectedKeys()方法,访问已选择键集(selected key set)中的就绪通道。
wakeUp()
某个线程调用 select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从 select()方法返回。只要让其它线程在阻塞线程调用 select()方法的对象上调用 Selector.wakeup()方法即可。阻塞在 select()方法上的线程会立马返回。如果有其它线程调用了 wakeup()方法,但当前没有线程阻塞在 select()方法上,下个调用 select()方法的线程会立即 wake up。
close()
用完 Selector 后调用其 close()方法会关闭该 Selector,且使注册到该 Selector 上的所有 SelectionKey 实例无效。通道本身并不会关闭。
通过 Selector 选择通道:
int select() 阻塞直到至少有一个通道在你注册的事件上就绪了
int select(long timeout) 增加最长阻塞毫秒数
int selectNow() 不会阻塞,不管什么通道就绪都立刻返回
三、 Tars NIO 网络编程
了解完 Java NIO 的原理,我们来看看 Tars 是如何使用 NIO 进行网络编程的。
Tars 的网络模型是多 reactor 多线程模型。有一点特殊的是 tars 的 reactor 线程组里随机选一个线程处理网络事件,并且该线程同时也能处理读写。
核心类之间的关系如下:
3.1 一个典型的 Java NIO 服务端开发流程
创建 ServerSocketChannel,设置为非阻塞,并绑定端口
创建 Selector 对象
给 ServerSocketChannel 注册 SelectionKey.OP_ACCEPT 事件
启动一个线程循环,调用 Selector 的 select 方法来检查 IO 就绪事件,一旦有 IO 就绪事件,就通知用户线程去处理 IO 事件
如果有 Accept 事件,就创建一个 SocketChannel,并注册 SelectionKey.OP_READ
如果有读事件,判断一下是否全包,如果全包,就交给后端线程处理
写事件比较特殊。isWriteable 表示的是本机的写缓冲区是否可写。这个在绝大多少情况下都是为真的。在 Netty 中只有写半包的时候才需要注册写事件,如果一次写就完全把数据写入了缓冲区就不需要注册写事件。
3.2 Tars 客户端发起请求到服务器的流程
Communicator.stringToProxy() 根据 servantName 等配置信息创建通信器。
ServantProxyFactory.getServantProxy() 调用工厂方法创建 servant 代理。
ObjectProxyFactory.getObjectProxy() 调用工厂方法创建 obj 代理。
TarsProtocolInvoker.create() 创建协议调用者。
ServantProtocolInvoker.initClient(Url url) 根据 servantProxyConfig 中的配置信息找到 servant 的 ip 端口等进行初始化 ServantClient。
ClientPoolManager.getSelectorManager() 如果第一次调用 selectorManager 是空的就会去初始化 selectorManager。
reactorSet = new Reactor[selectorPoolSize]; SelectorManager 初始化构造类中的会根据 selectorPoolSize(默认是 2)的配置创建 Reactor 线程数组。线程名称的前缀是 servant-proxy-加上 CommunicatorId,CommunicatorId 生成规则是由 locator 的地址生成的 UUID。
启动 reactor 线程。
3.3 Tars 服务端启动步骤
tars 支持 TCP 和 UDP 两种协议,RPC 场景下是使用 TCP 协议。
new SelectorManager() 根据配置信息初始化 selectorManager,线程池大小 processors > 8 ? 4 + (processors * 5 / 8) : processors + 1;线程名称前缀是 server-tcp-reactor,然后启动 reactor 线程数组中的所有线程。
开启服务端监听的 ServerSocketChannel,绑定服务端本地 ip 和监听的端口号,设置 TCP 连接请求队列的最大容量为 1024;设置非阻塞模式。
选取 reactor 线程数组中第 0 个线程作为服务端监听连接 OP_ACCEPT 就绪事件的线程。
代码 4:
3.4 Reactor 线程启动流程
多路复用器开始轮询检查 是否有就绪的事件。
处理 register 队列中剩余的 channel 注册到当前 reactor 线程的多路复用器 selector 中。
获取已选键集中所有就绪的 channel。
更新 Session 中最近操作时间,Tars 服务端启动时会调用 startSessionManager() , 单线程每 30s 扫描一次 session 会话列表,会检查每个 session 的 lastUpdateOperationTime 与当前时间的时间差,如果超过 60 秒会将过期 session 对应的 channel 踢除。
分发 IO 事件进行处理。
处理 unregister 队列中剩余的 channel,从当前 reactor 线程的多路复用器 selector 中解除注册。
代码 5:
3.5 IO 事件分发处理
每个 reactor 线程都有一个专门的 Accepter 类去处理各种 IO 事件。TCPAccepter 可以处理全部的四种事件(OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ)、UDPAccepter 由于不需要建立连接所以只需要处理读和写两种事件。
1. 处理 OP_ACCEPT
获取 channel,处理 TCP 请求。
为这个 TCP 请求创建 TCPSession,会话的状态是服务器已连接
会话注册到 sessionManager 中,Tars 服务可配置最大连接数 maxconns,如果超过就会关闭当前会话。
寻找下一个 reactor 线程进行多路复用器与 channel 的绑定。
代码 6:
2. 处理 OP_CONNECT
获取客户端连接过来的 channel 通道
获取 Session
与服务器建立连接,将关注的兴趣 OPS 设置为 ready 就绪事件,session 中的状态修改为客户端已连接
处理 OP_CONNECT
代码 7:
3.处理 OP_WRITE、 处理 OP_READ
代码 8:
3.6 seesion 中网络读写的事件详细处理过程
1. 读事件处理
申请 2k 的 ByteBuffer 空间,读取 channel 中的数据到 readBuffer 中。根据 sessionStatus 判断是客户端读响应还是服务器读请求,分别进行处理。
代码 9:
① 客户端读响应
从当前 readBuffer 中的内容复制到一个新的临时 buffer 中,并且切换到读模式,使用 TarsCodec 类解析出 buffer 内的协议字段到 response,WorkThread 线程通知 Ticket 处理 response。如果 response 为空,则重置 tempBuffer 到 mark 的位置,重新解析协议。
代码 10:
② 服务器读请求
任务放入线程池交给 WorkThread 线程,最终交给 Processor 类出构建请求的响应体,包括分布式上下文,然后经过 FilterChain 的处理,最终通过 jdk 提供的反射方法 invoke 服务端本地的方法然后返回 response。如果线程池抛出拒绝异常,则返回 SERVEROVERLOAD = -9,服务端过载保护。如果 request 为空,则重置 tempBuffer 到 mark 的位置,重新解析协议。
代码 11:
2. 写事件处理
同样也包括客户端写请求和服务端写响应两种,其实这两种都是往 TCPSession 中的 LinkedBlockingQueue(有界队列最大 8K)中插入 ByteBuffer。LinkedBlockingQueue 中的 ByteBuffer 最终会由 TCPAcceptor 中的 handleWriteEvent 监听写就绪事件并消费。
代码 12:
四、总结
本文主要介绍了 Java NIO 编程的基础知识 和 Tars-Java 1.7.2 版本的网络编程模块的源码实现。
在最新的 Tars-Java 的 master 分支中我们可以发现网络编程已经由 NIO 改成了 Netty,虽然 Netty 更加成熟稳定,但是作为学习者了解 NIO 的原理也是掌握网络编程的必经之路。
更多关于 Tars 框架的介绍可以访问:
本文分析源码地址(v1.7.x 分支):
https://github.com/TarsCloud/TarsJava
版权声明: 本文为 InfoQ 作者【vivo互联网技术】的原创文章。
原文链接:【http://xie.infoq.cn/article/3718db815b0e64c418ba95e27】。文章转载请联系作者。
评论