netty 案例,netty4.1 高级应用篇三,手写 RPC 框架第三章《RPC 中间件》
案例介绍
结合上面两章节,本章将实现rpc的基础功能;提供一给rpc中间件jar给生产端和服务端。
技术点;
1、注册中心,生产者在启动的时候需要将本地接口发布到注册中心,我们这里采用redis作为注册中心,随机取数模拟权重。
2、客户端在启动的时候,连接到注册中心,也就是我们的redis。连接成功后将配置的生产者方法发布到注册中心{接口+别名}。
3、服务端配置生产者的信息后,在加载xml时候由中间件生成动态代理类,当发生发放调用时实际则调用了我们代理类的方法,代理里会通过netty的futuer通信方式进行数据交互。
环境准备
1、jdk 1.8.0
2、IntelliJ IDEA Community Edition 2018.3.1 x64
3、windows redis
代码示例
itstack-demo-rpc-03└── src └── main │ ├── java │ │ └── org.itstack.demo.rpc │ │ ├── config │ │ ├── domain │ │ ├── network │ │ │ ├── client │ │ │ │ ├── ClientSocket.java │ │ │ │ └── MyClientHandler.java │ │ │ ├── codec │ │ │ │ ├── RpcDecoder.java │ │ │ │ └── RpcEncoder.java │ │ │ ├── future │ │ │ │ ├── SyncWrite.java │ │ │ │ ├── SyncWriteFuture.java │ │ │ │ ├── SyncWriteMap.java │ │ │ │ └── WriteFuture.java │ │ │ ├── msg │ │ │ │ ├── Request.java │ │ │ │ └── Response.java │ │ │ ├── server │ │ │ │ ├── MyServerHandler.java │ │ │ │ └── ServerSocket.java │ │ │ └── util │ │ │ └── SerializationUtil.java │ │ ├── reflect │ │ │ ├── JDKInvocationHandler.java │ │ │ └── JDKProxy.java │ │ ├── registry │ │ │ └── RedisRegistryCenter.java │ │ └── util │ └── resource │ └── META-INF │ ├── rpc.xsd │ ├── spring.handlers │ └── spring.schemas └── test ├── java │ └── org.itstack.demo.test │ ├── service │ │ ├── impl │ │ │ └── HelloServiceImpl.java │ │ └── HelloService.java │ └── ApiTest.java └── resource ├── itstack-rpc-center.xml ├── itstack-rpc-consumer.xml ├── itstack-rpc-provider.xml └── log4j.xml
>ConsumerBean.java
package org.itstack.demo.rpc.config.spring.bean;import com.alibaba.fastjson.JSON;import io.netty.channel.ChannelFuture;import org.itstack.demo.rpc.config.ConsumerConfig;import org.itstack.demo.rpc.domain.RpcProviderConfig;import org.itstack.demo.rpc.network.client.ClientSocket;import org.itstack.demo.rpc.network.msg.Request;import org.itstack.demo.rpc.reflect.JDKProxy;import org.itstack.demo.rpc.registry.RedisRegistryCenter;import org.itstack.demo.rpc.util.ClassLoaderUtils;import org.springframework.beans.factory.FactoryBean;import org.springframework.util.Assert;/** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */public class ConsumerBean<T> extends ConsumerConfig<T> implements FactoryBean { private ChannelFuture channelFuture; private RpcProviderConfig rpcProviderConfig; @Override public Object getObject() throws Exception { //从redis获取链接 if (null == rpcProviderConfig) { String infoStr = RedisRegistryCenter.obtainProvider(nozzle, alias); rpcProviderConfig = JSON.parseObject(infoStr, RpcProviderConfig.class); } Assert.isTrue(null != rpcProviderConfig); //获取通信channel if (null == channelFuture) { ClientSocket clientSocket = new ClientSocket(rpcProviderConfig.getHost(), rpcProviderConfig.getPort()); new Thread(clientSocket).start(); for (int i = 0; i < 100; i++) { if (null != channelFuture) break; Thread.sleep(500); channelFuture = clientSocket.getFuture(); } } Assert.isTrue(null != channelFuture); Request request = new Request(); request.setChannel(channelFuture.channel()); request.setNozzle(nozzle); request.setRef(rpcProviderConfig.getRef()); request.setAlias(alias); return (T) JDKProxy.getProxy(ClassLoaderUtils.forName(nozzle), request); } @Override public Class<?> getObjectType() { try { return ClassLoaderUtils.forName(nozzle); } catch (ClassNotFoundException e) { return null; } } @Override public boolean isSingleton() { return true; }}
>ProviderBean.java
package org.itstack.demo.rpc.config.spring.bean;import com.alibaba.fastjson.JSON;import org.itstack.demo.rpc.config.ProviderConfig;import org.itstack.demo.rpc.domain.LocalServerInfo;import org.itstack.demo.rpc.domain.RpcProviderConfig;import org.itstack.demo.rpc.registry.RedisRegistryCenter;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.BeansException;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;/** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */public class ProviderBean extends ProviderConfig implements ApplicationContextAware { private Logger logger = LoggerFactory.getLogger(ProviderBean.class); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RpcProviderConfig rpcProviderConfig = new RpcProviderConfig(); rpcProviderConfig.setNozzle(nozzle); rpcProviderConfig.setRef(ref); rpcProviderConfig.setAlias(alias); rpcProviderConfig.setHost(LocalServerInfo.LOCAL_HOST); rpcProviderConfig.setPort(LocalServerInfo.LOCAL_PORT); //注册生产者 long count = RedisRegistryCenter.registryProvider(nozzle, alias, JSON.toJSONString(rpcProviderConfig)); logger.info("注册生产者:{} {} {}", nozzle, alias, count); }}
>ServerBean.java
package org.itstack.demo.rpc.config.spring.bean;import org.itstack.demo.rpc.config.ServerConfig;import org.itstack.demo.rpc.domain.LocalServerInfo;import org.itstack.demo.rpc.network.server.ServerSocket;import org.itstack.demo.rpc.registry.RedisRegistryCenter;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.BeansException;import org.springframework.beans.factory.InitializingBean;import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;/** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */public class ServerBean extends ServerConfig implements ApplicationContextAware { private Logger logger = LoggerFactory.getLogger(ServerBean.class); @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { //启动注册中心 logger.info("启动注册中心 ..."); RedisRegistryCenter.init(host, port); logger.info("启动注册中心完成 {} {}", host, port); //初始化服务端 logger.info("初始化生产端服务 ..."); ServerSocket serverSocket = new ServerSocket(applicationContext); Thread thread = new Thread(serverSocket); thread.start(); while (!serverSocket.isActiveSocketServer()) { try { Thread.sleep(500); } catch (InterruptedException ignore) { } } logger.info("初始化生产端服务完成 {} {}", LocalServerInfo.LOCAL_HOST, LocalServerInfo.LOCAL_PORT); }}
>MyClientHandler.java
package org.itstack.demo.rpc.network.client;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import org.itstack.demo.rpc.network.future.SyncWriteFuture;import org.itstack.demo.rpc.network.future.SyncWriteMap;import org.itstack.demo.rpc.network.msg.Response;/** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */public class MyClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception { Response msg = (Response) obj; String requestId = msg.getRequestId(); SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId); if (future != null) { future.setResponse(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); }}
>MyServerHandler.java
package org.itstack.demo.rpc.network.server;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.util.ReferenceCountUtil;import org.itstack.demo.rpc.network.msg.Request;import org.itstack.demo.rpc.network.msg.Response;import org.itstack.demo.rpc.util.ClassLoaderUtils;import org.springframework.context.ApplicationContext;import java.lang.reflect.Method;/** * http://www.itstack.org * create by fuzhengwei on 2019/5/6 */public class MyServerHandler extends ChannelInboundHandlerAdapter { private ApplicationContext applicationContext; MyServerHandler(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } @Override public void channelRead(ChannelHandlerContext ctx, Object obj) { try { Request msg = (Request) obj; //调用 Class<?> classType = ClassLoaderUtils.forName(msg.getNozzle()); Method addMethod = classType.getMethod(msg.getMethodName(), msg.getParamTypes()); Object objectBean = applicationContext.getBean(msg.getRef()); Object result = addMethod.invoke(objectBean, msg.getArgs()); //反馈 Response request = new Response(); request.setRequestId(msg.getRequestId()); request.setResult(result); ctx.writeAndFlush(request); //释放 ReferenceCountUtil.release(msg); } catch (Exception e) { e.printStackTrace(); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); }}
>JDKInvocationHandler.java
package org.itstack.demo.rpc.reflect;import org.itstack.demo.rpc.network.future.SyncWrite;import org.itstack.demo.rpc.network.msg.Request;import org.itstack.demo.rpc.network.msg.Response;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;public class JDKInvocationHandler implements InvocationHandler { private Request request; public JDKInvocationHandler(Request request) { this.request = request; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class[] paramTypes = method.getParameterTypes(); if ("toString".equals(methodName) && paramTypes.length == 0) { return request.toString(); } else if ("hashCode".equals(methodName) && paramTypes.length == 0) { return request.hashCode(); } else if ("equals".equals(methodName) && paramTypes.length == 1) { return request.equals(args[0]); } //设置参数 request.setMethodName(methodName); request.setParamTypes(paramTypes); request.setArgs(args); request.setRef(request.getRef()); Response response = new SyncWrite().writeAndSync(request.getChannel(), request, 5000); //异步调用 return response.getResult(); }}
>JDKProxy.java
package org.itstack.demo.rpc.reflect;import org.itstack.demo.rpc.network.msg.Request;import org.itstack.demo.rpc.util.ClassLoaderUtils;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Proxy;public class JDKProxy { public static <T> T getProxy(Class<T> interfaceClass, Request request) throws Exception { InvocationHandler handler = new JDKInvocationHandler(request); ClassLoader classLoader = ClassLoaderUtils.getCurrentClassLoader(); T result = (T) Proxy.newProxyInstance(classLoader, new Class[]{interfaceClass}, handler); return result; }}
>RedisRegistryCenter.java
package org.itstack.demo.rpc.registry;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;/** * http://www.itstack.org * create by fuzhengwei on 2019/5/7 * redis 模拟RPC注册中心 */public class RedisRegistryCenter { private static Jedis jedis; //非切片额客户端连接 //初始化redis public static void init(String host, int port) { // 池基本配置 JedisPoolConfig config = new JedisPoolConfig(); config.setMaxIdle(5); config.setTestOnBorrow(false); JedisPool jedisPool = new JedisPool(config, host, port); jedis = jedisPool.getResource(); } /** * 注册生产者 * * @param nozzle 接口 * @param alias 别名 * @param info 信息 * @return 注册结果 */ public static Long registryProvider(String nozzle, String alias, String info) { return jedis.sadd(nozzle + "_" + alias, info); } /** * 获取生产者 * 模拟权重,随机获取 * @param nozzle 接口名称 */ public static String obtainProvider(String nozzle, String alias) { return jedis.srandmember(nozzle + "_" + alias); } public static Jedis jedis() { return jedis; }}
>ApiTest.java
public class ApiTest { public static void main(String[] args) { String[] configs = {"itstack-rpc-center.xml", "itstack-rpc-provider.xml", "itstack-rpc-consumer.xml"}; new ClassPathXmlApplicationContext(configs); }}
框架,测试结果
2019-....ClassPathXmlApplicationContext:prepareRefresh:510] - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@299a06ac: startup date [Tue May 07 20:19:47 CST 2019]; root of context hierarchy2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-center.xml]2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-provider.xml]2019-...ml.XmlBeanDefinitionReader:loadBeanDefinitions:315] - Loading XML bean definitions from class path resource [spring/itstack-rpc-consumer.xml]2019-...upport.DefaultListableBeanFactory:preInstantiateSingletons:577] - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@7e0b0338: defining beans [consumer_itstack,provider_helloService,consumer_helloService]; root of factory hierarchy2019-...bean.ServerBean:setApplicationContext:25] - 启动注册中心 ...2019-...bean.ServerBean:setApplicationContext:27] - 启动注册中心完成 127.0.0.1 63792019-...bean.ServerBean:setApplicationContext:30] - 初始化生产端服务 ...2019-...bean.ServerBean:setApplicationContext:41] - 初始化生产端服务完成 10.13.81.104 222012019-...bean.ProviderBean:setApplicationContext:35] - 注册生产者:org.itstack.demo.test.service.HelloService itStackRpc 0
框架应用
为了测试我们写两个测试工程;itstack-demo-rpc-provider、itstack-demo-rpc-consumer
>itstack-demo-rpc-provider 提供生产者接口
itstack-demo-rpc-provider├── itstack-demo-rpc-provider-export│ └── src│ └── main│ └── java│ └── org.itstack.demo.rpc.provider.export│ ├── domain │ │ └── Hi.java│ └── HelloService.java│ └── itstack-demo-rpc-provider-web └── src └── main ├── java │ └── org.itstack.demo.rpc.provider.web │ └── HelloServiceImpl.java └── resources └── spring └── spring-itstack-rpc-provider.xml
>HelloService.java
public interface HelloService { String hi(); String say(String str); String sayHi(Hi hi);}
>HelloServiceImpl.java
@Controller("helloService")public class HelloServiceImpl implements HelloService { @Override public String hi() { return "hi itstack rpc"; } @Override public String say(String str) { return str; } @Override public String sayHi(Hi hi) { return hi.getUserName() + " say:" + hi.getSayMsg(); }}
>spring-itstack-rpc-provider.xml
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd"> <!-- 注册中心 --> <rpc:server id="rpcServer" host="127.0.0.1" port="6379"/> <rpc:provider id="helloServiceRpc" nozzle="org.itstack.demo.rpc.provider.export.HelloService" ref="helloService" alias="itstackRpc"/></beans>
>itstack-demo-rpc-consumer 提供消费者调用
itstack-demo-rpc-consumer└── src ├── main │ ├── java │ └── resources │ └── spring │ └── spring-itstack-rpc-consumer.xml │ └── test └── java └── org.itstack.demo.test └── ConsumerTest.java
>spring-itstack-rpc-consumer.xml
<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rpc="http://rpc.itstack.org/schema/rpc" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://rpc.itstack.org/schema/rpc http://rpc.itstack.org/schema/rpc/rpc.xsd"> <!-- 注册中心 --> <rpc:server id="consumer_itstack" host="127.0.0.1" port="6379"/> <rpc:consumer id="helloService" nozzle="org.itstack.demo.rpc.provider.export.HelloService" alias="itstackRpc"/></beans>
>ConsumerTest.java
@RunWith(SpringJUnit4ClassRunner.class)@ContextConfiguration("/spring-config.xml")public class ConsumerTest { @Resource(name = "helloService") private HelloService helloService; @Test public void test() { String hi = helloService.hi(); System.out.println("测试结果:" + hi); String say = helloService.say("hello world"); System.out.println("测试结果:" + say); Hi hiReq = new Hi(); hiReq.setUserName("付栈"); hiReq.setSayMsg("付可敌国,栈无不胜"); String hiRes = helloService.sayHi(hiReq); System.out.println("测试结果:" + hiRes); }}
应用,测试结果 测试时启动redis
启动ProviderTest Redis中的注册数据
redis 127.0.0.1:6379> srandmember org.itstack.demo.rpc.provider.export.HelloService_itstackRpc"{\"alias\":\"itstackRpc\",\"host\":\"10.13.81.104\",\"nozzle\":\"org.itstack.demo.rpc.provider.export.HelloService\",\"port\":22201,\"ref\":\"helloService\"}"redis 127.0.0.1:6379>
执行ConsumerTest中的单元测试方法
log4j:WARN No appenders could be found for logger (org.springframework.test.context.junit4.SpringJUnit4ClassRunner).log4j:WARN Please initialize the log4j system properly.log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.测试结果:hi itstack rpc测试结果:hello world测试结果:付栈 say:付可敌国,栈无不胜Process finished with exit code 0
------------
版权声明: 本文为 InfoQ 作者【小傅哥】的原创文章。
原文链接:【http://xie.infoq.cn/article/f374236367227cd6f5160d13d】。文章转载请联系作者。
小傅哥
公众号:bugstack虫洞栈 2019.04.03 加入
作者小傅哥多年从事一线互联网Java开发的学习历程技术汇总,旨在为大家提供一个清晰详细的学习教程,侧重点更倾向编写Java核心内容。如果能为您提供帮助,请给予支持(关注、点赞、分享)!
评论