写点什么

Tars-Java 网络编程源码分析

  • 2023-03-20
    广东
  • 本文字数:8723 字

    阅读完需:约 29 分钟

作者:vivo 互联网服务器团队- Jin Kai


本文从 Java NIO 网络编程的基础知识讲到了 Tars 框架使用 NIO 进行网络编程的源码分析。

一、Tars 框架基本介绍

Tars 是腾讯开源的支持多语言的高性能 RPC 框架,起源于腾讯内部 2008 年至今一直使用的统一应用框架 TAF(Total Application Framework),目前支持 C++、Java、PHP、Nodejs、Go 语言。


该框架为用户提供了涉及到开发、运维、以及测试的一整套解决方案,帮助一个产品或者服务快速开发、部署、测试、上线。它集可扩展协议编解码、高性能 RPC 通信框架、名字路由与发现、发布监控、日志统计、配置管理等于一体,通过它可以快速用微服务的方式构建自己的稳定可靠的分布式应用,并实现完整有效的服务治理。


官方仓库地址:

https://github.com/TarsCloud/Tars


vivo 推送平台也深度使用了该框架,部署服务节点超过一千个,经过线上每日一百多亿消息推送量的考验。


此前已在 vivo 互联网技术公众号发布过《Tars Java 客户端源码分析》此篇文章为续集。


Tars-java 最新稳定版 1.7.2 以及之前的版本都使用 Java NIO 进行网络编程;本文将分别详细介绍 java NIO 的原理和 Tars 使用 NIO 进行网络编程的细节。

二、Java NIO 原理介绍

从 1.4 版本开始,Java 提供了一种新的 IO 处理方式:NIO (New IO 或 Non-blocking IO) 是一个可以替代标准 Java IO 的 API,它是面向缓冲区而不是字节流,它是非阻塞的,支持 IO 多路复用。

2.1 Channels (通道) and Buffers (缓冲区)

标准的 IO 基于字节流进行操作的,而 NIO 是基于通道(Channel)和缓冲区(Buffer)进行操作。数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中,下图是一个完整流程。

Channel 类型:

  1. 支持文件读写数据的 FileChannel

  2. 能通过 UDP 读写网络中的数据的 DatagramChannel 

  3. 能通过 TCP 读写网络数据的 SocketChannel

  4. 可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel 的 ServerSocketChannel 。


SocketChannel:

  • 打开 SocketChannel:SocketChannel socketChannel = SocketChannel.open();

  • 关闭 SocketChannel:socketChannel.close();

  • 从 Channel 中读取的数据放到 Buffer: int bytesRead = inChannel.read(buf);

  • 将 Buffer 中的数据写到 Channel: int bytesWritten = inChannel.write(buf);


ServerSocketChannel:


通过 ServerSocketChannel.accept() 方法监听新进来的连接,当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel,因此 accept()方法会一直阻塞到有新连接到达。


通常不会仅仅只监听一个连接,在 while 循环中调用 accept()方法. 如下面的例子:


代码 1:

while(true){    SocketChannel socketChannel = serverSocketChannel.accept();     //do something with socketChannel...}
复制代码


ServerSocketChannel 可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是 null。因此,需要检查返回的 SocketChannel 是否是 null。


代码 2:

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.socket().bind(new InetSocketAddress(8888));serverSocketChannel.configureBlocking(false);while(true){    SocketChannel socketChannel = serverSocketChannel.accept();    if(socketChannel != null){        //do something with socketChannel...    }}
复制代码


Buffer 类型:

  • ByteBuffer

  • CharBuffer

  • DoubleBuffer

  • FloatBuffer

  • IntBuffer

  • LongBuffer

  • ShortBuffer


Buffer 的分配:

ByteBuffer buf = ByteBuffer.allocate(2048);


Buffer 的读写:

一般是以下四个步骤:

  1. 写入数据到 Buffer,最大写入量是 capacity,写模式下 limit 值即为 capacity 值,position 即为写到的位置。

  2. 调用 flip()方法将 Buffer 从写模式切换到读模式,此时 position 移动到开始位置 0,limit 移动到 position 的位置。

  3. 从 Buffer 中读取数据,在读模式下可以读取之前写入到 buffer 的所有数据,即为 limit 位置。

  4. 调用 clear()方法或者 compact()方法。clear()方法将 position 设为 0,limit 被设置成 capacity 的值。compact()方法将所有未读的数据拷贝到 Buffer 起始处,然后将 position 设到最后一个未读元素后面。



mark() 与 reset()方法

通过调用 Buffer.mark()方法,可以标记 Buffer 中的一个特定 position,之后可以通过调用 Buffer.reset()方法恢复到这个 position。


duplicate()

此方法返回承载先前字节缓冲区内容的新字节缓冲区。


remaining()

limit 减去 position 的值

2.2 Selector(选择器)


Java NIO 引入了选择器的概念,选择器用于监听多个通道的事件。单个的线程可以监听多个数据通道。要使用 Selector,得向 Selector 注册 Channel,然后调用它的 select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件。


代码 3:

channel.configureBlocking(false);SelectionKey key = channel.register(selector,Selectionkey.OP_READ);
复制代码


注意 register()方法的第二个参数,这是一个监听的集合,即在通过 Selector 监听 Channel 时关注什么事件集合。


SelectionKey 包含:


1) interest 集合:selectionKey.interestOps()  可以监听四种不同类型的事件:OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ

2) ready 集合:selectionKey.readyOps();  ready 集合是通道已经准备就绪的操作的集合,提供 4 个方便的方法:

  • selectionKey.isAcceptable();

  • selectionKey.isConnectable();

  • selectionKey.isReadable();

  • selectionKey.isWritable();

3) Channel:selectionKey.channel();

4) Selector:selectionKey.selector();

5) 可选的附加对象:

selectionKey.attachment();  可以将一个对象或者更多信息附着到 SelectionKey 上,这样就能方便的识别特定的通道。


提示:

OP_ACCEPT 和 OP_CONNECT 的区别:简单来说,客户端建立连接是 connect,服务器准备接收连接是 accept。一个典型的客户端服务器网络交互流程如下图


selectedKeys() 

一旦调用了 select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用 selector 的 selectedKeys()方法,访问已选择键集(selected key set)中的就绪通道。


wakeUp()

某个线程调用 select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从 select()方法返回。只要让其它线程在阻塞线程调用 select()方法的对象上调用 Selector.wakeup()方法即可。阻塞在 select()方法上的线程会立马返回。如果有其它线程调用了 wakeup()方法,但当前没有线程阻塞在 select()方法上,下个调用 select()方法的线程会立即 wake up。


close()

用完 Selector 后调用其 close()方法会关闭该 Selector,且使注册到该 Selector 上的所有 SelectionKey 实例无效。通道本身并不会关闭。


通过 Selector 选择通道:

  • int select() 阻塞直到至少有一个通道在你注册的事件上就绪了

  • int select(long timeout) 增加最长阻塞毫秒数

  • int selectNow() 不会阻塞,不管什么通道就绪都立刻返回

三、 Tars NIO 网络编程

了解完 Java NIO 的原理,我们来看看 Tars 是如何使用 NIO 进行网络编程的。

Tars 的网络模型是多 reactor 多线程模型。有一点特殊的是 tars 的 reactor 线程组里随机选一个线程处理网络事件,并且该线程同时也能处理读写。


核心类之间的关系如下:

3.1 一个典型的 Java NIO 服务端开发流程

  1. 创建 ServerSocketChannel,设置为非阻塞,并绑定端口

  2. 创建 Selector 对象

  3. 给 ServerSocketChannel 注册 SelectionKey.OP_ACCEPT 事件

  4. 启动一个线程循环,调用 Selector 的 select 方法来检查 IO 就绪事件,一旦有 IO 就绪事件,就通知用户线程去处理 IO 事件

  5. 如果有 Accept 事件,就创建一个 SocketChannel,并注册 SelectionKey.OP_READ

  6. 如果有读事件,判断一下是否全包,如果全包,就交给后端线程处理

  7. 写事件比较特殊。isWriteable 表示的是本机的写缓冲区是否可写。这个在绝大多少情况下都是为真的。在 Netty 中只有写半包的时候才需要注册写事件,如果一次写就完全把数据写入了缓冲区就不需要注册写事件。

3.2 Tars 客户端发起请求到服务器的流程

  1. Communicator.stringToProxy()  根据 servantName 等配置信息创建通信器。

  2. ServantProxyFactory.getServantProxy() 调用工厂方法创建 servant 代理。

  3.  ObjectProxyFactory.getObjectProxy()  调用工厂方法创建 obj 代理。

  4.  TarsProtocolInvoker.create() 创建协议调用者。

  5. ServantProtocolInvoker.initClient(Url url)  根据 servantProxyConfig 中的配置信息找到 servant 的 ip 端口等进行初始化 ServantClient。

  6. ClientPoolManager.getSelectorManager() 如果第一次调用 selectorManager 是空的就会去初始化 selectorManager。

  7.  reactorSet = new Reactor[selectorPoolSize];     SelectorManager 初始化构造类中的会根据 selectorPoolSize(默认是 2)的配置创建 Reactor 线程数组。线程名称的前缀是 servant-proxy-加上 CommunicatorId,CommunicatorId 生成规则是由 locator 的地址生成的 UUID。

  8. 启动 reactor 线程。

3.3 Tars 服务端启动步骤

  1. tars 支持 TCP 和 UDP 两种协议,RPC 场景下是使用 TCP 协议。

  2. new SelectorManager() 根据配置信息初始化 selectorManager,线程池大小 processors > 8 ? 4 + (processors * 5 / 8) : processors + 1;线程名称前缀是 server-tcp-reactor,然后启动 reactor 线程数组中的所有线程。

  3. 开启服务端监听的 ServerSocketChannel,绑定服务端本地 ip 和监听的端口号,设置 TCP 连接请求队列的最大容量为 1024;设置非阻塞模式。

  4.  选取 reactor 线程数组中第 0 个线程作为服务端监听连接 OP_ACCEPT 就绪事件的线程。


代码 4:

public void bind(AppService appService) throws IOException {     // 此处略去非关键代码     if (endpoint.type().equals("tcp")) {  // 1        this.selectorManager = new SelectorManager(Utils.getSelectorPoolSize(), new ServantProtocolFactory(codec), threadPool, processor, keepAlive, "server-tcp-reactor", false);     // 2        this.selectorManager.setTcpNoDelay(serverCfg.isTcpNoDelay());        this.selectorManager.start();        ServerSocketChannel serverChannel = ServerSocketChannel.open();        serverChannel.socket().bind(new InetSocketAddress(endpoint.host(), endpoint.port()), 1024);   // 3        serverChannel.configureBlocking(false);              selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_ACCEPT);  // 4    } else if (endpoint.type().equals("udp")) {        this.selectorManager = new SelectorManager(1, new ServantProtocolFactory(codec), threadPool, processor, false, "server-udp-reactor", true);        this.selectorManager.start();        // UDP开启的是DatagramChannel        DatagramChannel serverChannel = DatagramChannel.open();        DatagramSocket socket = serverChannel.socket();        socket.bind(new InetSocketAddress(endpoint.host(), endpoint.port()));        serverChannel.configureBlocking(false);        // UDP协议不需要建连,监听的是OP_READ就绪事件        this.selectorManager.getReactor(0).registerChannel(serverChannel, SelectionKey.OP_READ);    }}
复制代码

3.4 Reactor 线程启动流程

  1. 多路复用器开始轮询检查 是否有就绪的事件。

  2. 处理 register 队列中剩余的 channel 注册到当前 reactor 线程的多路复用器 selector 中。

  3. 获取已选键集中所有就绪的 channel。

  4.  更新 Session 中最近操作时间,Tars 服务端启动时会调用 startSessionManager() , 单线程每 30s 扫描一次 session 会话列表,会检查每个 session 的 lastUpdateOperationTime 与当前时间的时间差,如果超过 60 秒会将过期 session 对应的 channel 踢除。

  5.  分发 IO 事件进行处理。

  6.  处理 unregister 队列中剩余的 channel,从当前 reactor 线程的多路复用器 selector 中解除注册。


代码 5:

public void run() {        while (!Thread.interrupted()) {            selector.select();  // 1            processRegister();  // 2            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();   //  3            while (iter.hasNext()) {                SelectionKey key = iter.next();                iter.remove();                if (!key.isValid()) continue;                try {                    if (key.attachment() != null && key.attachment() instanceof Session) {                      ((Session) key.attachment()).updateLastOperationTime(); //4                    }                 dispatchEvent(key);    // 5                } catch (Throwable ex) {                 disConnectWithException(key, ex);                }            }            processUnRegister();  // 6        }}
复制代码

3.5 IO 事件分发处理

每个 reactor 线程都有一个专门的 Accepter 类去处理各种 IO 事件。TCPAccepter 可以处理全部的四种事件(OP_ACCEPT、OP_CONNECT、OP_WRITE、OP_READ)、UDPAccepter 由于不需要建立连接所以只需要处理读和写两种事件。

1. 处理 OP_ACCEPT

  1. 获取 channel,处理 TCP 请求。

  2. 为这个 TCP 请求创建 TCPSession,会话的状态是服务器已连接

  3. 会话注册到 sessionManager 中,Tars 服务可配置最大连接数 maxconns,如果超过就会关闭当前会话。

  4. 寻找下一个 reactor 线程进行多路复用器与 channel 的绑定。


代码 6:

public void handleAcceptEvent(SelectionKey key) throws IOException {    ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 1    SocketChannel channel = server.accept();       channel.socket().setTcpNoDelay(selectorManager.isTcpNoDelay());    channel.configureBlocking(false);    Utils.setQosFlag(channel.socket());    TCPSession session = new TCPSession(selectorManager);    // 2    session.setChannel(channel);    session.setStatus(SessionStatus.SERVER_CONNECTED);    session.setKeepAlive(selectorManager.isKeepAlive());    session.setTcpNoDelay(selectorManager.isTcpNoDelay());    SessionManager.getSessionManager().registerSession(session);   // 3      selectorManager.nextReactor().registerChannel(channel, SelectionKey.OP_READ, session); // 4}
复制代码


2. 处理 OP_CONNECT

  1. 获取客户端连接过来的 channel 通道

  2. 获取 Session

  3.  与服务器建立连接,将关注的兴趣 OPS 设置为 ready 就绪事件,session 中的状态修改为客户端已连接

  4. 处理 OP_CONNECT


代码 7:

public void handleConnectEvent(SelectionKey key) throws IOException {    SocketChannel client = (SocketChannel) key.channel();  // 1    TCPSession session = (TCPSession) key.attachment();   //2    if (session == null) throw new RuntimeException("The session is null when connecting to ...");    try {  // 3        client.finishConnect();        key.interestOps(SelectionKey.OP_READ);        session.setStatus(SessionStatus.CLIENT_CONNECTED);    } finally {        session.finishConnect();    }}
复制代码


3.处理 OP_WRITE、 处理 OP_READ

代码 8:

public void handleReadEvent(SelectionKey key) throws IOException {    TCPSession session = (TCPSession) key.attachment();    if (session == null) throw new RuntimeException("The session is null when reading data...");    session.read();}public void handleWriteEvent(SelectionKey key) throws IOException {    TCPSession session = (TCPSession) key.attachment();    if (session == null) throw new RuntimeException("The session is null when writing data...");    session.doWrite();}
复制代码

3.6 seesion 中网络读写的事件详细处理过程

1. 读事件处理

申请 2k 的 ByteBuffer 空间,读取 channel 中的数据到 readBuffer 中。根据 sessionStatus 判断是客户端读响应还是服务器读请求,分别进行处理。


代码 9:

protected void read() throws IOException {    int ret = readChannel();    if (this.status == SessionStatus.CLIENT_CONNECTED) {        readResponse();    } else if (this.status == SessionStatus.SERVER_CONNECTED) {        readRequest();    } else {        throw new IllegalStateException("The current session status is invalid. [status:" + this.status + "]");    }    if (ret < 0) {        close();        return;    }}private int readChannel() throws IOException {    int readBytes = 0, ret = 0;    ByteBuffer data = ByteBuffer.allocate(1024 * 2);  // 1    if (readBuffer == null) {        readBuffer = IoBuffer.allocate(bufferSize);    }       // 2    while ((ret = ((SocketChannel) channel).read(data)) > 0) {        data.flip();  // 3        readBytes += data.remaining();        readBuffer.put(data.array(), data.position(), data.remaining());        data.clear();    }    return ret < 0 ? ret : readBytes;}
复制代码


① 客户端读响应

从当前 readBuffer 中的内容复制到一个新的临时 buffer 中,并且切换到读模式,使用 TarsCodec 类解析出 buffer 内的协议字段到 response,WorkThread 线程通知 Ticket 处理 response。如果 response 为空,则重置 tempBuffer 到 mark 的位置,重新解析协议。 


代码 10:

public void readResponse() {    Response response = null;    IoBuffer tempBuffer = null;        tempBuffer = readBuffer.duplicate().flip();        while (true) {            tempBuffer.mark();            if (tempBuffer.remaining() > 0) {                response = selectorManager.getProtocolFactory().getDecoder().decodeResponse(tempBuffer, this);            } else {                response = null;            }            if (response != null) {                if (response.getTicketNumber() == Ticket.DEFAULT_TICKET_NUMBER) response.setTicketNumber(response.getSession().hashCode());                selectorManager.getThreadPool().execute(new WorkThread(response, selectorManager));            } else {                tempBuffer.reset();                readBuffer = resetIoBuffer(tempBuffer);                break;            }        }}
复制代码


② 服务器读请求

任务放入线程池交给 WorkThread 线程,最终交给 Processor 类出构建请求的响应体,包括分布式上下文,然后经过 FilterChain 的处理,最终通过 jdk 提供的反射方法 invoke 服务端本地的方法然后返回 response。如果线程池抛出拒绝异常,则返回 SERVEROVERLOAD = -9,服务端过载保护。如果 request 为空,则重置 tempBuffer 到 mark 的位置,重新解析协议。


代码 11:

public void readRequest() {    Request request = null;    IoBuffer tempBuffer = readBuffer.duplicate().flip();        while (true) {            tempBuffer.mark();            if (tempBuffer.remaining() > 0) {                request = selectorManager.getProtocolFactory().getDecoder().decodeRequest(tempBuffer, this);            } else {                request = null;            }            if (request != null) {                try {                    request.resetBornTime();                    selectorManager.getThreadPool().execute(new WorkThread(request, selectorManager));                } catch (RejectedExecutionException e) {                  selectorManager.getProcessor().overload(request, request.getIoSession());                } catch (Exception ex) {                  ex.printStackTrace();                }            } else {                    tempBuffer.reset();                readBuffer = resetIoBuffer(tempBuffer);                break;            }        }}
复制代码


2. 写事件处理

同样也包括客户端写请求和服务端写响应两种,其实这两种都是往 TCPSession 中的 LinkedBlockingQueue(有界队列最大 8K)中插入 ByteBuffer。LinkedBlockingQueue 中的 ByteBuffer 最终会由 TCPAcceptor 中的 handleWriteEvent 监听写就绪事件并消费。


代码 12:

protected void write(IoBuffer buffer) throws IOException {    if (buffer == null) return;    if (channel == null || key == null) throw new IOException("Connection is closed");    if (!this.queue.offer(buffer.buf())) {        throw new IOException("The session queue is full. [ queue size:" + queue.size() + " ]");    }    if (key != null) {        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);        key.selector().wakeup();    }}
复制代码


四、总结

本文主要介绍了 Java NIO 编程的基础知识 和 Tars-Java 1.7.2 版本的网络编程模块的源码实现。


在最新的 Tars-Java 的 master 分支中我们可以发现网络编程已经由 NIO 改成了 Netty,虽然 Netty 更加成熟稳定,但是作为学习者了解 NIO 的原理也是掌握网络编程的必经之路。


更多关于 Tars 框架的介绍可以访问:

https://tarscloud.org/

本文分析源码地址(v1.7.x 分支):

https://github.com/TarsCloud/TarsJava


发布于: 刚刚阅读数: 4
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020-07-10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
Tars-Java网络编程源码分析_网络编程_vivo互联网技术_InfoQ写作社区