Netty 是一款高性能的网络传输框架,作为基础通信组件被 RPC 框架广泛使用。例如Dubbo
协议中使用它进行节点间通信,Hadoop
中的Avro
组件使用它进行数据文件共享。那么我们就来尝试使用 Netty,实现一个简单的 RPC 框架。
首先我们先抽象出一个服务的 API 接口,服务提供者实现这个接口中的方法,服务消费者直接调用接口进行访问:
public interface TestService {
String test(String message);
}
复制代码
服务方实现该接口,供消费者调用:
public class TestServiceImpl implements TestService {
@Override
public String test(String message) {
System.out.println("Server has received:"+ message);
if (message !=null){
return "hi client, Server has Received:["+ message+"]";
}else{
return "empty message";
}
}
}
复制代码
然后我们开始使用 Netty 创建服务的 Server 端:
public class NettyServer {
public static void startServer(String hostname,int port){
EventLoopGroup bossGroup=new NioEventLoopGroup(1);
EventLoopGroup workerGroup=new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture future = serverBootstrap.bind(hostname, port).sync();
System.out.println("服务端启动");
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
复制代码
在创建 Server 端时,我们在ChannelPipeline
中添加了 Netty 自带的 String 类型的编码器与解码器,最后添加我们的业务逻辑处理的handler
。
类似于 Dubbo 在调用中使用了自己的 Dubbo 协议,我们在调用服务之前,也需要自定义我们的协议,如果接收到的消息不是按照我们定义的协议,则不予处理。这里定义一个简单的协议,来规定我们的消息的开头以什么开始:
public class Protocol {
public static final String HEADER="My#Protolcol#Header#";
}
复制代码
创建服务端的handler
,用于处理业务逻辑。新建一个类继承ChannelInboundHandlerAdapter
,通过channelRead
方法接收客户端发送的消息,在方法中判断消息是否以我们自定义的协议头开头,如果是则读取消息,并调用本地方法,最后通过writeAndFlush
返回调用的结果。
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg="+msg);
if(msg.toString().startsWith(Protocol.HEADER)){
String result = new TestServiceImpl().test(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
复制代码
至此,服务端我们就已经写完了,再开始写客户端。因为客户端的代码有一点特殊性,所以我们先写处理业务逻辑的NettyClientHandler
,之后再实现client
端的 Netty 初始化方法。
在handler
中,我们要使用多线程来调用服务端的服务,使用channelRead
接收服务端返回的结果,所以除了继承ChannelInboundHandlerAdapter
父类外,还要实现Callable
接口,并重写其中的call
方法。
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private ChannelHandlerContext context;
//返回的结果
private String result;
//客户端调用方法时,传入的参数
private String param;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
//唤醒等待线程
notify();
}
@Override
public synchronized Object call() throws Exception {
context.writeAndFlush(param);
wait();
return result;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
public void setParam(String param) {
this.param = param;
}
}
复制代码
在上面的代码中,创建了变量context
用于存储当前handler
的ChannelHandlerContext
,这是为了在call
方法中使用该context
发送消息。与服务器的连接创建后,首先会执行channelActive
方法,给该context
赋值。
需要注意的是,call
方法和channelRead
方法的synchronized
关键字非常重要,在执行wait
方法的时候会释放锁,从而使channelRead
方法获取锁,在读取到服务端返回的消息后使用notify
唤醒call
方法的线程,返回结果。
说完了NettyClientHandler
,我们回过头来写 Netty 客户端的启动类NettyClient
。首先,我们创建一个线程池,用来在后面执行访问的请求,线程池的大小定义为我们的 cpu 可用线程数。
private static ExecutorService executor
= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
复制代码
因为客户端调用的是接口,需要使用代理模式创建代理对象,我们创建一个getProxy
方法用来获取代理对象并进行方法增强:
public Object getProxy(final Class<?> serviceClass, final String protocolHead) {
return Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (clientHandler == null) {
initClient();
}
clientHandler.setParam(protocolHead + args[0]);
return executor.submit(clientHandler).get();
}
});
}
复制代码
这里调用了线程池的submit
方法提交任务,调用handler
中的call
方法发送请求。上面的args[0]
是调用时的参数,initClient
方法用于初始化 Netty 的client
端,代码如下:
private static void initClient() {
clientHandler = new NettyClientHandler();
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(clientHandler);
}
}
);
bootstrap.connect("127.0.0.1", 7000).sync();
System.out.println("客户端启动");
} catch (Exception e) {
e.printStackTrace();
}
}
复制代码
NettyClient
端的ChannelPipeline
中同样添加了编码解码器,与我们自己实现的业务逻辑handler
。
至此,客户端与服务端的功能就完成了,我们创建启动类,先启动服务端:
public class ProviderBootstrap {
public static void main(String[] args) {
NettyServer.startServer("127.0.0.1",7000);
}
}
复制代码
再启动客户端:
public class ConsumerBootstrap {
public static void main(String[] args) {
NettyClient consumer = new NettyClient();
TestService proxy =(TestService) consumer.getProxy(TestService.class, Protocol.HEADER);
String result = proxy.test("hi,i am client");
System.out.println("result: "+result);
}
}
复制代码
最后看一下运行结果,先看服务提供者:
收到的消息以我们的协议开头,将协议头剔除后获得消息正文,作为 RPC 调用方法的参数,传递给请求的方法。再看服务消费者端:
接收到了服务提供端返回的信息。这样,一个简单的 RPC 框架就已经实现了。
如果文章对您有所帮助,欢迎关注公众号 码农参上
加号主好友,来围观朋友圈啊~
评论