Netty RPC Demo 实现
简介
一个 RPC 框架 Demo 的简单实现(业务场景中还没用过RPC,但Dome应该还是有那个意思的)
完整的项目工程地址:RPC-Demo
工程运行说明
- 服务端启动:rpcfx-demo-server:ServerApplication 
- 客户端启动:rpcfx-demo-client:ClientApplication 
工程结构说明
RPC-DEMO├─rpcfx-core: 框架核心部分,client、server的代理|├─rpcfx-demo-api: 接口定义部分,类似于web后端给前端写的接口文档|├─rpcfx-demo-client: 客户端,调用接口发送请求|└─rpcfx-demo-server: 服务端,接收请求,进行CURD之类的
关于 RPC 的一些思考
秦老师说过一句话,可以道破其本质:RPC 就是为了 OOP。下面以HTTP的相似场景举例:

上面是一个比较简略的使用postman或者浏览器访问的数据流程,因为HTTP协议本质传输的也是一个问题。在自己接触的写前端请求后端接口,传输的基本是字符串
下面是 RPC 的:

RPC表示不想这么玩,咱后端就要有后端的样子,要像平时使用类,调用方法那样才用的爽。
类比平时Web的开发,客户端就像Vue前端,服务端就是使用Spring boot web写的那些业务CURD服务,RPC框架就是Spring Boot Web框架
RPC框架想达到的目的有如下几个:
- 1.客户端:封装屏蔽从客户端发送请求到服务端,并将结果序列号成对象的过程,让客户端感觉像在本地调用类方法一样 
- 2.服务端:封装屏蔽服务端根据请求路由到基本的处理类和函数的查找,让服务端只专心写好CRUD即可 
代码实现
根据上面的描述,客户端只需要调用接口获取结果进行处理,服务端只需要实现好接口CRUD就行了,其它的全由 RPC 框架帮你搞定
代码编写的大致流程如下:
- 1.定义接口:类似于后端给前端编写的 API 文档 
- 2.客户端调用:客户端调用接口,获取结果,并反序列化成对象 
- 3.服务端接收处理:服务端接收到请求,匹配相应的实现类,调用相应的方法,得到结果返回返回 
1.定义接口:类似于后端给前端编写的 API 文档
类似于接口文档,描述了能提供哪些功能、能返回什么、需要传什么参数。即本工程的 API 模块
public interface UserService {    /**     * find by id     * @param id id     * @return user     */    User findById(Integer id);}
2.客户端调用:客户端调用接口,获取结果,并反序列成对象
客户端只需要简单的使用RPC生成的代理,进行调用即可
2.1 客户端业务代码
public class ClientApplication {    public static void main(String[] args) {        RpcClient jdk = new RpcClientJdk();        UserService userService = jdk.create(UserService.class, "http://localhost:8080/");        User user = userService.findById(1);        if (user == null) {            log.info("Clint service invoke Error");            return;        }        System.out.println("find user id=1 from server: " + user.getName());    }}
2.2 RPC 框架客户端:代理类生成、netty请求发送和响应处理
下面的发送请求和读取结果序列号返回都是 RPC 框架的事情了,大致代码如下:
生成代理类:
public class RpcClientJdk extends RpcProxy implements RpcClient {    @Override    public <T> T create(Class<T> serviceClass, String url) {        // 查询是否之前生成过,存储的直接返回        if (!isExit(serviceClass.getName())) {            add(serviceClass.getName(), newProxy(serviceClass, url));        }        return (T) getProxy(serviceClass.getName());    }    private <T> T newProxy(Class<T> serviceClass, String url) {        ClassLoader loader = RpcClientJdk.class.getClassLoader();        Class[] classes = new Class[]{serviceClass};        return (T) Proxy.newProxyInstance(loader, classes, new RpcInvocationHandler(serviceClass, url));    }}
请求发送与返回处理具体实现
/** * 用于jdk、cglib、buddy * * @author lw1243925457 */@Slf4jpublic class RpcInvocationHandler implements InvocationHandler, MethodInterceptor {    private final Class<?> serviceClass;    private final String url;    <T> RpcInvocationHandler(Class<T> serviceClass, String url) {        this.serviceClass = serviceClass;        this.url = url;        ParserConfig.getGlobalInstance().setAutoTypeSupport(true);    }    @Override    public Object invoke(Object proxy, Method method, Object[] args) {        return process(serviceClass, method, args, url);    }    @Override    public Object intercept(Object o, Method method, Object[] args, MethodProxy methodProxy) {        return process(serviceClass, method, args, url);    }    /**     * 发送请求到服务端     * 获取结果后序列号成对象,返回     * @param service service name     * @param method service method     * @param params method params     * @param url server host     * @return object     */    private Object process(Class<?> service, Method method, Object[] params, String url) {        log.info("Client proxy instance method invoke");        // 自定义了Rpc请求的结构 RpcRequest,放入接口名称、方法名、参数        log.info("Build Rpc request");        RpcRequest rpcRequest = new RpcRequest();        rpcRequest.setServiceClass(service.getName());        rpcRequest.setMethod(method.getName());        rpcRequest.setArgv(params);        // 客户端使用的 netty,发送请求到服务端,拿到结果(自定义结构:rpcfxResponse)        log.info("Client send request to Server");        RpcResponse rpcResponse;        try {            rpcResponse = RpcNettyClientSync.getInstance().getResponse(rpcRequest, url);        } catch (InterruptedException | URISyntaxException e) {            e.printStackTrace();            return null;        }        log.info("Client receive response Object");        assert rpcResponse != null;        if (!rpcResponse.getStatus()) {            log.info("Client receive exception");            rpcResponse.getException().printStackTrace();            return null;        }        // 序列化成对象返回        log.info("Response:: " + rpcResponse.getResult());        return JSON.parse(rpcResponse.getResult().toString());    }}
下面是 Netty 客户端部分,这部分的编写需要注意数据的流向和相应的处理:
下面是一些自定义的约定的数据库结构
/** * Rpc 自定义请求结构 *  * @author lw */@Datapublic class RpcRequest {    /**     * 接口类名称     */    private String serviceClass;        /**     * 方法名     */    private String method;    /**     * 参数     */    private Object[] argv;}
/** * Rpc 自定义响应结果 * @author lw */@Datapublic class RpcResponse {    /**     * 响应结果     */    private Object result;    /**     * 函数是否执行成功     */    private Boolean status;    /**     * 执行失败的异常信息     */    private Exception exception;}
/** * Netty 通信的数据格式 *  * @author lw1243925457 */@Datapublic class RpcProtocol {    /**     * 数据大小     */    private int len;    /**     * 数据内容     */    private byte[] content;}
netty 通信处理需要注意的是数据形式的变化和流向的处理
客户端发送和接收的数据流向大致如下:
发送请求: RpcRequest --> bytes -> RpcProtocol -> bytes接收请求: bytes -> RpcProtocol -> bytes -> RpcRequest
相关的处理函数如下:
RpcRequest --> bytes -> RpcProtocol
public class RpcNettyClientSync {    ......    /**     * 调用channel发送请求,从handler中获取响应结果     * @return 响应     * @throws InterruptedException exception     */    public RpcResponse getResponse(RpcRequest rpcRequest, String url) throws InterruptedException,            URISyntaxException {        RpcProtocol request = convertNettyRequest(rpcRequest);        // 查看缓存池中是否有可重用的channel        ......        // 没有或者不可用则新建        // 并将最终的handler添加到pipeline中,拿到结果后返回        ......        channel.writeAndFlush(request).sync();        return handler.getResponse();    }    ......    /**     * 将 {@RpcRequest} 转成 netty 自定义的通信格式 {@RpcProtocol}     * @param rpcRequest RpcRequest     * @return RpcProtocol     */    private RpcProtocol convertNettyRequest(RpcRequest rpcRequest) {        RpcProtocol request = new RpcProtocol();        String requestJson = JSON.toJSONString(rpcRequest);        request.setLen(requestJson.getBytes(CharsetUtil.UTF_8).length);        request.setContent(requestJson.getBytes(CharsetUtil.UTF_8));        return request;    }}
RpcProtocol -> bytes,到这请求就发送出去了
/** * Rpc 自定义编码器 * RpcProtocol -> bytes * * @author lw1243925457 */@Slf4jpublic class RpcEncoder extends MessageToByteEncoder<RpcProtocol> {    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, RpcProtocol msg, ByteBuf out) throws Exception {        log.info("Netty rpc encode run");        out.writeInt(msg.getLen());        out.writeBytes(msg.getContent());    }}
响应处理就是一个简单的逆向处理,在 netty中处理大致如下:
/** * Rpc framework 自定义解码器 * bytes -> rpcProtocol * * @author lw1243925457 */@Slf4jpublic class RpcDecoder extends ByteToMessageDecoder {    private int length = 0;    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> out) throws Exception {        log.info("Netty decode run");        if (in.readableBytes() >= 4) {            if (length == 0) {                length = in.readInt();            }            if (in.readableBytes() < length) {                log.info("Readable data is less, wait");                return;            }            byte[] content = new byte[length];            if (in.readableBytes() >= length) {                in.readBytes(content);                RpcProtocol rpcProtocol = new RpcProtocol();                rpcProtocol.setLen(length);                rpcProtocol.setContent(content);                out.add(rpcProtocol);            }            length = 0;        }    }}/** * 这里使用并发的等待-通知机制来拿到结果 * @author lw */@Slf4jpublic class RpcClientSyncHandler extends SimpleChannelInboundHandler<RpcProtocol> {    private CountDownLatch latch;    private RpcResponse response;    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcProtocol msg) {        log.info("Netty client receive message:");        log.info("Message length: " + msg.getLen());        log.info("Message content: " + new String(msg.getContent(), CharsetUtil.UTF_8));        // 将 RpcResponse字符串 反序列化成 RpcResponse对象        RpcResponse rpcfxResponse = JSON.parseObject(new String(msg.getContent(), CharsetUtil.UTF_8),                RpcResponse.class);        log.info("Netty client serializer : " + rpcfxResponse.toString());        response = rpcfxResponse;        latch.countDown();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        cause.printStackTrace();        ctx.close();    }    /**     * 锁的初始化     * @param latch CountDownLatch     */    void setLatch(CountDownLatch latch) {        this.latch = latch;    }    /**     * 阻塞等待结果后返回     * @return 后台服务器响应     * @throws InterruptedException exception     */    RpcResponse getResponse() throws InterruptedException {        latch.await();        return response;    }}
这样,客户端的部分就写好了
3.服务端接收处理:服务端接收到请求,匹配相应的实现类,调用相应的方法,得到结果返回返回
服务端把 netty server 启动以后,专心写业务 CURD就行了;其他的部分都交给框架去处理
3.1 服务端业务代码的编写
利用Spring,需要将接口相应的实现放入容器中,用于后面反射代理时查找获取,并启动 netty server。大致代码如下:
public class OrderServiceImpl implements OrderService {    @Override    public Order findById(Integer id) {        return new Order(1, "RPC", 1);    }    @Override    public Order findError() {        throw new CustomException("Custom exception");    }}/** * 配置接口的实现类 *  * @author lw */@Configurationpublic class BeanConfig {    @Bean("com.example.demo.service.UserService")    public UserService userService() {        return new UserServiceImpl();    }    @Bean("com.example.demo.service.OrderService")    public OrderService orderService() {        return new OrderServiceImpl();    }}/** * 不使用 spring boot web,启动 netty server 进行监听 *  * @author lw */@SpringBootApplication@Slf4jpublic class ServerApplication implements ApplicationRunner {    private final RpcNettyServer rpcNettyServer;    public ServerApplication(RpcNettyServer rpcNettyServer) {        this.rpcNettyServer = rpcNettyServer;    }    public static void main(String[] args) {        SpringApplication.run(ServerApplication.class, args);    }    @Override    public void run(ApplicationArguments args) {        try {            rpcNettyServer.run();        } catch (Exception e) {            e.printStackTrace();        } finally {            rpcNettyServer.destroy();        }    }}
3.2 RPC 框架服务端:接收请求、生成代理类、调用方法、返回结果
服务端数据流向和客户相反,大致如下:
接收请求: bytes -> RpcProtocol -> bytes -> RpcRequest发送请求: RpcRequest --> bytes -> RpcProtocol -> bytes
数据流向处理和客户端差不多,这里就不赘述了,大致代码如下:
/** * Netty Server 启动类 *  * @author lw1243925457 */@Slf4j@Componentpublic class RpcNettyServer {    private final ApplicationContext context;    private EventLoopGroup boss;    private EventLoopGroup worker;    public RpcNettyServer(ApplicationContext context) {        this.context = context;    }    public void destroy() {        worker.shutdownGracefully();        boss.shutdownGracefully();    }    public void run() throws Exception {        boss = new NioEventLoopGroup(1);        worker = new NioEventLoopGroup();        ServerBootstrap serverBootstrap = new ServerBootstrap();        serverBootstrap.group(boss, worker)                .channel(NioServerSocketChannel.class)                .childHandler(new ChannelInitializer() {                    @Override                    protected void initChannel(Channel channel) throws Exception {                        ChannelPipeline pipeline = channel.pipeline();                        pipeline.addLast("Message Encoder", new RpcEncoder());                        pipeline.addLast("Message Decoder", new RpcDecoder()); // 将 byte 转为 RpcProtocol,放入下一个handler                        pipeline.addLast("Message Handler", new RpcServerHandler(context));                    }                });        int port = 8080;        Channel channel = serverBootstrap.bind(port).sync().channel();        log.info("Netty server listen in port: " + port);        channel.closeFuture().sync();    }}public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol> {    private ApplicationContext applicationContext;    RpcServerHandler(ApplicationContext applicationContext){        this.applicationContext = applicationContext;    }    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcProtocol msg) throws Exception {        log.info("Netty server receive message:");        log.info("Message length: " + msg.getLen());        log.info("Message content: " + new String(msg.getContent(), CharsetUtil.UTF_8));        // 获取 RpcProtocol中的 RpcRequest内容,反序列化成 RpcRequest 对象        RpcRequest rpcRequest = JSON.parseObject(new String(msg.getContent(), CharsetUtil.UTF_8),                RpcRequest.class);        log.info("Netty server serializer : " + rpcRequest.toString());        // 获取相应的bean,反射调用方法,获取结果        RpcResponse response = invoke(rpcRequest);        // 返回结果给netty 客户端        RpcProtocol message = new RpcProtocol();        String requestJson = JSON.toJSONString(response);        message.setLen(requestJson.getBytes(CharsetUtil.UTF_8).length);        message.setContent(requestJson.getBytes(CharsetUtil.UTF_8));        channelHandlerContext.writeAndFlush(message).sync();        log.info("return response to client end");    }    /**     * 获取接口实现对应的bean,反射调用方法,返回结果     * @param request rpc request     * @return result     */    private RpcResponse invoke(RpcRequest request) {        RpcResponse response = new RpcResponse();        String serviceClass = request.getServiceClass();        Object service = applicationContext.getBean(serviceClass);        try {            Method method = resolveMethodFromClass(service.getClass(), request.getMethod());            Object result = method.invoke(service, request.getArgv());            log.info("Server method invoke result: " + result.toString());            response.setResult(JSON.toJSONString(result, SerializerFeature.WriteClassName));            response.setStatus(true);            log.info("Server Response serialize to string return");            return response;        } catch ( IllegalAccessException | InvocationTargetException | CustomException e) {            e.printStackTrace();            response.setException(e);            response.setStatus(false);            return response;        }    }    private Method resolveMethodFromClass(Class<?> klass, String methodName) {        return Arrays.stream(klass.getMethods()).filter(m -> methodName.equals(m.getName())).findFirst().get();    }}
版权声明: 本文为 InfoQ 作者【萧】的原创文章。
原文链接:【http://xie.infoq.cn/article/e6944fda87dff68e8e66d3100】。
本文遵守【CC-BY 4.0】协议,转载请保留原文出处及本版权声明。

萧
还未添加个人签名 2018.09.09 加入
代码是门手艺活,也是门艺术活











 
    
评论