写点什么

Netty 实战 -- 使用 Netty 实现分布式框架 Dubbo RPC

作者:Bug终结者
  • 2022 年 8 月 12 日
    河北
  • 本文字数:5520 字

    阅读完需:约 18 分钟

Netty实战 -- 使用Netty实现分布式框架Dubbo RPC

一、什么是 RPC?

RPC【Remote Procedure Call】是指远程过程调用,是一种进程间通信方式,他是一种技术的思想,而不是规范。它允许程序调用另一个地址空间(通常是共享网络的另一台机器上)的过程或函数,而不用程序员显式编码这个远程调用的细节。即程序员无论是调用本地的还是远程的函数,本质上编写的调用代码基本相同。


常见的 RPC 框架有: 比较知名的阿里 Dubbo、Google 的 GRPC、Go 的 RPCX、Apache 的 Thrift,SpringCloud


1.1 RPC 基本原理

两个或多个应用程序都分布在不同的服务器上,它们之间的调用都像是本地方法调用一样


示意图



RPC 两个核心模块:序列化和通讯

1.2 RPC 执行流程

在 RPC 中,Client 叫做服务消费者,Server 叫做服务提供者


RPC 调用流程说明


  1. 服务消费方(client),以本地调用方式调用服务

  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体

  3. client stub 将消息进行编码并发送到服务器

  4. server stub 收到消息后进行解码

  5. server stub 根据解码结果调用本地 API

  6. 本地服务 执行并返回结果给 server stub

  7. server stub 将返回结果导入进行编码并发送至消费方

  8. client stub 接收到数据进行解码

  9. 服务消费方得到结果


RPC 的目标就是将 2~8 的步骤封装起来,用户无需关注这些细节,可以像调用本地方法一样即可完成远程服务调用


图解


二、什么是代理模式?

代理模式的定义:代理模式给某一个对象提供一个代理对象,并由代理对象控制对原对象的引用。通俗的来讲代理模式就是我们生活中常见的中介。


Java 中代理模式分为静态代理动态代理模式

2.1 案例实现

中午到了,小明很饿,于是在美团外卖上点了一份烤鸭,过了半个小时,外卖到了,小明下去拿外卖,顺利的吃上了烤鸭~

2.2 静态代理方式

由程序员手动创建代理类或工具对象,从而实现调用服务


Subject 类


package com.wanshi.netty.dubborpc.netty;
public interface Subject {
String buy(String msg);}
复制代码


SubjectImpl 类


package com.wanshi.netty.dubborpc.netty;
public class SubjectImpl implements Subject{ @Override public String buy(String msg) { return "买了" + msg; }}
复制代码


ProxySubject 类


package com.wanshi.netty.dubborpc.netty;

public class ProxySubject {
private Subject subject;
{ subject = new SubjectImpl(); }
public void buy(String msg) { System.out.println("美团外卖,使命必达,跑腿代买!"); String buy = subject.buy(msg); System.out.println(buy); }}
复制代码


测试类


public static void main(String[] args) {    ProxySubject subject = new ProxySubject();    subject.buy("北京烤鸭");}
复制代码


效果


⛽静态代理的优缺点

缺点: 不利于扩展,调用一次就要创建一次对象,从而造成不必要的内存空间浪费


优点: 可读性好,逻辑简单,清晰明了

2.3 动态代理方式

动态代理又分为:JDK 动态代理CGLIB 动态代理


在程序运行时,运用反射机制动态创建而成,达到调用服务


使用以上的 Subject 类和实现类


SubjectInvocationHandler 处理器类


package com.wanshi.netty.dubborpc.netty;
import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;
public class SubjectInvocationHandler implements InvocationHandler {
private Object obj;
public SubjectInvocationHandler(Object obj) { this.obj = obj; }
@Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Object res = method.invoke(obj, args); return res; }}
复制代码


ProxyFactry 工厂类


package com.wanshi.netty.dubborpc.netty;
import java.lang.reflect.Proxy;
public class ProxyFactry {
public static Subject getInstance() { SubjectImpl subject = new SubjectImpl(); System.out.println("美团外卖,使命必达,跑腿代买!"); SubjectInvocationHandler subjectInvocationHandler = new SubjectInvocationHandler(subject); Subject proxy = (Subject) Proxy.newProxyInstance(subject.getClass().getClassLoader(), subject.getClass().getInterfaces(), subjectInvocationHandler); return proxy; }}
复制代码


测试类


public static void main(String[] args) {    Subject subject = ProxyFactry.getInstance();    String buy = subject.buy("饮料");    System.out.println(buy);}
复制代码


效果


⛽动态代理的优缺点

两种动态代理对照表


三、Netty 实现 DubboRPC

3.1 需求说明

  1. dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架

  2. 模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据,底层使用 Netty 框架

3.2 剖析需求

  1. 创建接口,定义抽象方法,用于服务消费者与服务提供者之间的约定

  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据

  3. 创建一个消费者,该类需要 透明的调用自己不存在的方法,内部使用 Netty 请求提供者返回数据


3.3 效果图

3.4 核心源码

♻️共用接口 API

HelloService


package com.wanshi.netty.dubborpc.publicinterface;
/** * 公共接口,提供服务 */public interface HelloService {
String hello(String msg);}
复制代码

♻️服务提供者

ServerBootstrap 启动类


package com.wanshi.netty.dubborpc.provider;
import com.wanshi.netty.dubborpc.netty.NettyServer;
/** * 服务提供者启动类,监听消费者,并绑定端口8888 */public class ServerBootstrap {
public static void main(String[] args) { NettyServer.startServer("127.0.0.1", 8888); }}
复制代码


NettyServer


package com.wanshi.netty.dubborpc.netty;
import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;
/** * Netty服务器类,启动服务 */public class NettyServer {

/** * 开启服务方法,调用内部私有启动服务方法,此类写法很常用,进一层的封装了API * @param hostName * @param port */ public static void startServer(String hostName, int port) { startServer0(hostName, port); }
/** * 真正启动服务的方法 * @param hostName * @param port */ private static void startServer0(String hostName, int port) { //创建2个线程,一个为主线程仅创建1个,另外创建工作线程CPU核数*2个 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
try { //服务器启动类 ServerBootstrap serverBootstrap = new ServerBootstrap();
//初始化参数 serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); } });
System.out.println("服务端提供服务准备就绪...");
//绑定端口,启动服务,异步执行方法 ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { //优雅的关闭线程 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
复制代码

♻️服务消费者

ClientBootstrap


package com.wanshi.netty.dubborpc.consumer;
import com.wanshi.netty.dubborpc.netty.NettyClient;import com.wanshi.netty.dubborpc.publicinterface.HelloService;
public class ClientBootstrap {
public static final String providerName = "hello#";
public static void main(String[] args) throws InterruptedException { NettyClient client = new NettyClient();
HelloService service = (HelloService) client.getBean(HelloService.class, providerName);
for (;;) { Thread.sleep(2000); String hello = service.hello("你好鸭 dubbo~"); System.out.println("服务端返回的结果:" + hello + "\n\n"); } }}
复制代码


NettyClient


package com.wanshi.netty.dubborpc.netty;
import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelOption;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;
public class NettyClient {
//创建线程池,大小为CPU核数*2 private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
//将客户端处理器提升至全局变量 private NettyClientHandler clientHandler;
//记录调用服务的次数 private int count;
/** * 代理对象,执行方法,这里用到了代理模式 * @param serviceClass * @param providerName * @return */ public Object getBean(final Class<?> serviceClass, String providerName) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass}, (proxy, method, args) -> {
System.out.println("(proxy, method, args) 进入,第" + (++count) + "次调用远程服务"); if (clientHandler == null) { initClient(); } clientHandler.setParam(providerName + args[0]); return executorService.submit(clientHandler).get(); }); }
/** * 初始化客户端 */ public void initClient() { clientHandler = new NettyClientHandler(); EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(clientHandler); } });
try { bootstrap.connect("127.0.0.1", 8888).sync(); } catch (InterruptedException e) { e.printStackTrace(); } }}
复制代码

四、源码下载

本教程已上传至 GitHub 代码托管平台,希望点个 Star 鸭~


NettyDubboRPC


使用Git爬取GitHub文件

⛵小结

以上就是【Bug 终结者】对基于 Netty 实现 Dubbo RPC 简单的概述,手写 Dubbo RPC,代码有点难度,但坚持的啃下来,多敲上几遍,熟能生巧,加油,相信你会对 RPC 有一个新的理解,加油,编程路上,你我都是追梦人~


如果这篇【文章】有帮助到你,希望可以给【Bug 终结者】点个赞👍,创作不易,如果有对【后端技术】、【前端领域】感兴趣的小可爱,也欢迎关注❤️❤️❤️ 【Bug 终结者】❤️❤️❤️,我将会给你带来巨大的【收获与惊喜】💝💝💝!

发布于: 刚刚阅读数: 7
用户头像

Bug终结者

关注

励志成为一个优秀的开发者~ 2021.12.09 加入

星星之火,可以燎原

评论

发布
暂无评论
Netty实战 -- 使用Netty实现分布式框架Dubbo RPC_Netty_Bug终结者_InfoQ写作社区