写点什么

dubbo 源码 v2.7 分析:通信过程及序列化协议

发布于: 2021 年 03 月 04 日
dubbo 源码 v2.7 分析:通信过程及序列化协议

系列文章

dubbo 源码 v2.7 分析:结构、container 入口及线程模型

dubbo 源码 v2.7 分析:SPI 机制

dubbo 源码 v2.7 分析:核心机制(一)

dubbo 源码 v2.7 分析:核心机制(二)


关注公众号:程序员架构进阶,每天实时获取更新,上百份面试资料和其他福利等你拿~


一 摘要

前面我们介绍了 dubbo 的核心机制,今天将开始分析远程调用流程。毕竟,作为一个 rpc 框架,远程调用是理论的核心内容。通过对 dubbo 相关实现的探究,深入了解 rpc 原理及可能的问题。

二 rpc 通信核心问题

2.1 rpc 核心问题

1、远程调用,使用什么传输协议

2、远程传输数据,不能直接原格式传输,只能通过流数据传输,序列化和反序列化协议选择?

3、发起请求方式?

4、请求处理方式?同步/异步,长连接/短链接...

除此之外,还有超时时间、异常处理、缓存、负载均衡策略、注册中心、网关等等相关问题,都需要解决。我们先关注前面四个。

2.2 rpc 框架构成

1)服务提供者,运行在服务器端,提供服务接口定义与服务实现类。

2)服务中心,运行在服务器端,负责将本地服务发布成远程服务,管理远程服务,提供给服务消费者使用。

3)服务消费者,运行在客户端,通过远程代理对象调用远程服务。

三 序列化和反序列化

3.1 相关概念

3.1.1 定义

先说一个老生常谈的问题,什么是序列化和反序列化?简单来说,定义如下:

序列化(Serialization):将数据结构或对象转换成二进制串的过程;

反序列化(Deserialization):将在序列化过程中所生成的二进制串转换成数据结构或者对象的过程。 

3.1.2 数据结构、对象与二进制串

不同的开发语言中,数据结构、对象和二进制串的表示方式略有不同。(内容来自美团技术团队的文章,序列化和反序列化

1)数据结构和对象:

对于类似 Java 这种完全面向对象的语言,开发者操作的都是对象(Object),来自于类的实例化。在 Java 语言中最接近数据结构的概念,就是 POJO(Plain Old Java Object)或者 Java Bean--那些只有 setter/getter 方法的类。而在 C++这种半面向对象的语言中,数据结构和 struct 对应,对象和 class 对应。


2)二进制串:

序列化所生成的二进制串指的是存储在内存中的一块数据。C++语言具有内存操作符,所以二进制串的概念容易理解,例如,C++语言的字符串可以直接被传输层使用,因为其本质上就是以’\0’结尾的存储在内存中的二进制串。在 Java 语言里面,二进制串的概念容易和 String 混淆。实际上 String 是 Java 的一等公民,是一种特殊对象(Object)。对于跨语言间的通讯,序列化后的数据当然不能是某种语言的特殊数据类型。二进制串在 Java 里面所指的是 byte[],byte 是 Java 的 8 中原生数据类型之一(Primitive data types)。

3.2 为什么要序列化?

这是与远程通信的过程相关的,网络传输的数据必须是二进制数据,但通常调用方请求的入参有可能是对象(各种面向对象语言),而对象是不能直接在网络中传输的,所以需要把它转成可传输的二进制串,并且要求转换算法是可逆的,这个过程也就是“序列化”过程。这样,服务提供方就可以正确地从输入的二进制数据中分割出不同的请求参数,同时根据请求类型和序列化类型,把二进制的消息体逆向还原成请求对象,这个过程也就是“反序列化”。

回顾一下 rpc 请求过程:

另外,作为一个框架,通用性是必须考虑的要素之一;而且,作为跨机器的请求,服务提供方和调用方也很可能使用的不是一种编程语言,那么保证两端的数据结构能够相互理解就是必须解决的一个问题,这也是序列化和反序列化的意义所在。

四 dubbo 的通信过程

以 dubbo 官方提供的服务方和客户端 demo 作为生产和消费两端,对应的请求过程如下图所示:

可见,dubbo 采用的是 Netty(TCP)协议作为默认的通信协议;通过 Netty 的 NIO 来进行数据传输,并使用了它的响应机制。

五 Dubbo 通信分析

先回顾一下 OSI 模型,下图左侧是 7 层结构,右侧是对该层的解释:

其中,传输层是因特网协议套件和 OSI 模型中的网络堆栈中的协议的分层体系结构中的方法的概念划分。该层的协议为应用程序提供主机到主机的通信服务。如 UDP、TCP。并且提供面向连接的通信,可靠性,流量控制和多路复用等服务。

在 dubbo 中,Transporter 就是对传输层的实现。它对于提供了 dubbo 服务间通讯的支持。有了它,各个服务就可以进行网络通信了,不再是信息孤岛了。

5.1 相关代码结构

dubbo 远程通信相关代码在 org.apache.dubbo.remoting 包下。

  • transport 为网路传送层,抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel、Transporter、Client、Server、Codec。

  • exchange 是信息交换层,封装请求响应模式,以 Request 为中心,Response 为中心, 扩展接口为 Exchanger、ExchangeChannel、ExchangeClient、ExchangeServer。


5.2 通信协议及框架

从上图可见,dubbo 支持了 mina, netty(3 & 4 版本), grizzly 三种通信框架,默认配置常量在 org.apache.dubbo.remoting.Constants 中,这是一个常量接口,并未定义任何抽象方法。里面的常量包括一些关键的 key(DISPATCHER_KEY,BIND_IP_KEY,BIND_PORT_KEY 等等),默认的 transporter、默认的远程客户端、默认的二进制协议等等。

5.3 Transport 分析

5.3.1 Transporter 接口

@SPI("netty")public interface Transporter {
/** * Bind a server. * * @param url server url * @param handler * @return server * @throws RemotingException * @see org.apache.dubbo.remoting.Transporters#bind(URL, ChannelHandler...) */ @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind(URL url, ChannelHandler handler) throws RemotingException;
/** * Connect to a server. * * @param url server url * @param handler * @return client * @throws RemotingException * @see org.apache.dubbo.remoting.Transporters#connect(URL, ChannelHandler...) */ @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
复制代码

通过这个接口定义可以知道:

1)dubbo 是 C/S 架构

2)Transport 提供了创建服务端和客户端的功能;

3)默认的 Transporter 是 NettyTransporter,@SPI("netty")注解中标记;

4)可以通过 server/client、transport 来配置 server/client 的类型,目前支持的类型有 netty、mina 等

注:3)中的 netty,指的是 netty4(2.5.6 版本以后,之前的版本是 netty3),也就是对应 transport.netty4 包下的内容,这是因为 netty4 包下的 NettyTransporter 中,name 是 netty,SPI 是通过 name 来获取对应的 Transporter 的:

而 netty 包下标记的 NAME 为 netty3:

5.3.2 NettyServer

1、属性和构造方法

继承自 AbstractServer,且实现了 Server 接口:

2、构造方法和 AbstractServer

构造方法的主要动作都是在 AbstractServer 中完成的(继承关系从子类到父类:AbstractServer->AbstractEndpoint->AbstractPeer),在这里设置了 ip、端口、空闲时间等参数,并调用了 doOpen()方法,这是定义在 AbstractServer 的抽象方法,所以需要子类实现:

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {        super(url, handler);        localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = ANYHOST_VALUE; } bindAddress = new InetSocketAddress(bindIp, bindPort); this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT); try { doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }
复制代码

AbstractPeer.java,除提供构造方法外,还提供了消息发送方法,以及一些 handler:

    public AbstractPeer(URL url, ChannelHandler handler) {        if (url == null) {            throw new IllegalArgumentException("url == null");        }        if (handler == null) {            throw new IllegalArgumentException("handler == null");        }        this.url = url;        this.handler = handler;    }
@Override public void send(Object message) throws RemotingException { send(message, url.getParameter(Constants.SENT_KEY, false)); }
@Override public ChannelHandler getChannelHandler() { if (handler instanceof ChannelHandlerDelegate) { return ((ChannelHandlerDelegate) handler).getHandler(); } else { return handler; } }
/** * @return ChannelHandler */ @Deprecated public ChannelHandler getHandler() { return getDelegateHandler(); }
/** * Return the final handler (which may have been wrapped). This method should be distinguished with getChannelHandler() method * * @return ChannelHandler */ public ChannelHandler getDelegateHandler() { return handler; }
复制代码

Transport 初始化是通过 Transporters 这个工具类进行初始化的,查看源码,有以下两个 bind 方法,支持传参 url 和 ChannelHandler,并且支持传入多个 Handler:

    public static Server bind(String url, ChannelHandler... handler) throws RemotingException {        return bind(URL.valueOf(url), handler);    }
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); }
复制代码

3、doOpen

大部分方法是在 AbstractServer 中实现,NettyServer 中最重要的方法就是这个 doOpen 方法,从代码中可以看出,这就是一个标准流程:

    @Override    protected void doOpen() throws Throwable {        bootstrap = new ServerBootstrap();
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel();
}
复制代码


六 调用序列

通过一张时序图来回顾调用链路:


有一点需要注意(From dubbo 官网):

Remoting 实现是 Dubbo 协议的实现,如果你选择 RMI 协议,整个 Remoting 都不会用上,Remoting 内部再划为 Transport 传输层和 Exchange 信息交换层,Transport 层只负责单向消息传输,是对 Mina, Netty, Grizzly 的抽象,它也可以扩展 UDP 传输,而 Exchange 层是在传输层之上封装了 Request-Response 语义。


发布于: 2021 年 03 月 04 日阅读数: 16
用户头像

磨炼中成长,痛苦中前行 2017.10.22 加入

微信公众号【程序员架构进阶】。多年项目实践,架构设计经验。曲折中向前,分享经验和教训

评论

发布
暂无评论
dubbo 源码 v2.7 分析:通信过程及序列化协议