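Netty Case Study, Netty 4.1 Advanced Applications Part 2: Hand-Writing an RPC Framework, Chapter 2 "Netty Communication"
Case Introduction
When implementing an RPC framework, we have to choose how the sockets communicate. Ordinary socket communication is similar to a QQ chat: you send a message and the reply can arrive at any time. RPC communication, by contrast, feels more like an HTTP call: the response must come back within a fixed time, otherwise the call times out and is dropped.
Here we choose Netty as our socket framework and communicate in a future-based style: the caller blocks on a future that is completed when the matching response arrives or the timeout expires.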
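>Netty is an open-source Java framework originally provided by JBoss. It offers an asynchronous, event-driven network application framework and tools for rapidly developing high-performance, highly reliable network servers and clients. In other words, Netty is an NIO-based client/server programming framework that lets you quickly and easily build a network application, such as a client or server that implements a particular protocol. Netty simplifies and streamlines network programming, for example TCP and UDP socket server development. "Quick" and "easy" do not come at the cost of maintainability or performance. Netty has been carefully designed using the experience gained from implementing many protocols, including FTP, SMTP, HTTP, and various binary and text-based protocols. As a result, Netty has found a way to stay easy to develop with while preserving performance, stability, and scalability.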
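The "respond within a fixed time or time out" idea described above can be sketched without Netty. The following is a minimal illustration using only the JDK, not the code used later in this article: the class name `PendingCalls` and its methods are hypothetical, and `CompletableFuture` stands in for the article's own `SyncWriteFuture`. Each call registers a future under a generated request ID, the receiving side completes it when a reply with the same ID arrives, and the caller waits with a timeout.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: match replies to requests by ID and wait with a timeout. */
class PendingCalls {
    // requestId -> future that is completed when the matching reply arrives
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a new call and returns its generated request ID. */
    String register() {
        String requestId = UUID.randomUUID().toString();
        pending.put(requestId, new CompletableFuture<>());
        return requestId;
    }

    /** Called by the receiving side when a reply arrives; unknown IDs are ignored. */
    void complete(String requestId, String reply) {
        CompletableFuture<String> f = pending.get(requestId);
        if (f != null) {
            f.complete(reply);
        }
    }

    /** Blocks until the reply arrives or the timeout elapses; always removes the entry. */
    String await(String requestId, long timeoutMillis) throws Exception {
        CompletableFuture<String> f = pending.get(requestId);
        if (f == null) {
            throw new IllegalArgumentException("unknown requestId");
        }
        try {
            return f.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            pending.remove(requestId);
        }
    }

    /** Number of calls still waiting for a reply. */
    int size() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        PendingCalls calls = new PendingCalls();
        String id = calls.register();
        // Simulate the network thread delivering the reply a little later.
        new Thread(() -> calls.complete(id, "pong")).start();
        System.out.println(calls.await(id, 1000));

        // A call whose reply never arrives ends in a TimeoutException.
        String lost = calls.register();
        try {
            calls.await(lost, 50);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

Removing the entry in a `finally` block keeps the map from growing when replies are lost, which is the main design point of this sketch.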
Environment Setup
1. JDK 1.8.0
2. IntelliJ IDEA Community Edition 2018.3.1 x64
Code Example
itstack-demo-rpc-02
└── src
    ├── main
    │   └── java
    │       └── org.itstack.demo.rpc.network
    │           ├── client
    │           │   ├── ClientSocket.java
    │           │   └── MyClientHandler.java
    │           ├── codec
    │           │   ├── RpcDecoder.java
    │           │   └── RpcEncoder.java
    │           ├── future
    │           │   ├── SyncWrite.java
    │           │   ├── SyncWriteFuture.java
    │           │   ├── SyncWriteMap.java
    │           │   └── WriteFuture.java
    │           ├── msg
    │           │   ├── Request.java
    │           │   └── Response.java
    │           ├── server
    │           │   ├── MyServerHandler.java
    │           │   └── ServerSocket.java
    │           └── util
    │               └── SerializationUtil.java
    └── test
        └── java
            └── org.itstack.demo.test
                ├── client
                │   └── StartClient.java
                └── server
                    └── StartServer.java
>ClientSocket.java
package org.itstack.demo.rpc.network.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ClientSocket implements Runnable {

    // Written by the client thread and polled by the caller's thread, hence volatile.
    private volatile ChannelFuture future;

    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.AUTO_READ, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // Inbound: bytes -> Response; outbound: Request -> bytes.
                    ch.pipeline().addLast(
                            new RpcDecoder(Response.class),
                            new RpcEncoder(Request.class),
                            new MyClientHandler());
                }
            });
            // Connect, publish the future, then block until the channel is closed.
            ChannelFuture f = b.connect("127.0.0.1", 7397).sync();
            this.future = f;
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

    public ChannelFuture getFuture() {
        return future;
    }

    public void setFuture(ChannelFuture future) {
        this.future = future;
    }
}
>MyClientHandler.java
package org.itstack.demo.rpc.network.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.itstack.demo.rpc.network.future.SyncWriteFuture;
import org.itstack.demo.rpc.network.future.SyncWriteMap;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
        Response msg = (Response) obj;
        // Look up the pending call by request ID and hand it the response.
        String requestId = msg.getRequestId();
        SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
        // A missing entry means the call already timed out; the late response is dropped.
        if (future != null) {
            future.setResponse(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
>RpcDecoder.java
package org.itstack.demo.rpc.network.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;

import java.util.List;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class RpcDecoder extends ByteToMessageDecoder {

    private final Class<?> genericClass;

    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // Wait until the 4-byte length header is available.
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        // Body not fully received yet: rewind and wait for more bytes.
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, genericClass));
    }
}
>RpcEncoder.java
package org.itstack.demo.rpc.network.codec;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class RpcEncoder extends MessageToByteEncoder<Object> {

    private final Class<?> genericClass;

    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {
        if (genericClass.isInstance(in)) {
            byte[] data = SerializationUtil.serialize(in);
            // Frame layout: 4-byte length header followed by the serialized body.
            out.writeInt(data.length);
            out.writeBytes(data);
        }
    }
}
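The encoder and decoder above agree on a simple frame layout: a 4-byte length followed by the body, with the decoder rewinding whenever the body has not fully arrived. The following sketch shows the same idea with `java.nio.ByteBuffer` in place of Netty's `ByteBuf`, so it runs without any dependency. The class name `LengthPrefixFraming` is hypothetical, and plain UTF-8 strings stand in for the Protostuff-serialized objects.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of length-prefixed framing using java.nio.ByteBuffer. */
class LengthPrefixFraming {

    /** Frame = 4-byte big-endian length followed by the payload bytes. */
    static byte[] encode(String payload) {
        byte[] data = payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer out = ByteBuffer.allocate(4 + data.length);
        out.putInt(data.length);
        out.put(data);
        return out.array();
    }

    /** Reads every complete frame from 'in' (in read mode); a partial frame stays unread. */
    static List<String> decode(ByteBuffer in) {
        List<String> messages = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();                  // remember where this frame starts
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset();             // body incomplete: rewind and wait for more bytes
                break;
            }
            byte[] data = new byte[length];
            in.get(data);
            messages.add(new String(data, StandardCharsets.UTF_8));
        }
        return messages;
    }

    public static void main(String[] args) {
        byte[] a = encode("hello");
        byte[] b = encode("rpc");

        // Simulate TCP delivering the first frame plus only part of the second.
        ByteBuffer in = ByteBuffer.allocate(64);
        in.put(a).put(b, 0, 5);
        in.flip();
        System.out.println(decode(in)); // prints [hello]

        // The rest of the second frame arrives.
        in.compact();
        in.put(b, 5, b.length - 5);
        in.flip();
        System.out.println(decode(in)); // prints [rpc]
    }
}
```

The mark/reset pair plays the role of `markReaderIndex()` and `resetReaderIndex()` in `RpcDecoder`: a half-received frame is left in the buffer until the remaining bytes show up.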
>SyncWrite.java
package org.itstack.demo.rpc.network.future;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncWrite {

    public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (request == null) {
            throw new NullPointerException("request");
        }
        if (timeout <= 0) {
            throw new IllegalArgumentException("timeout <= 0");
        }

        String requestId = UUID.randomUUID().toString();
        request.setRequestId(requestId);

        // Pass the timeout so that isTimeout() can tell a real timeout from a write failure.
        WriteFuture<Response> future = new SyncWriteFuture(requestId, timeout);
        SyncWriteMap.syncKey.put(requestId, future);
        try {
            return doWriteAndSync(channel, request, timeout, future);
        } finally {
            // Always unregister the call, even when it times out or the write fails.
            SyncWriteMap.syncKey.remove(requestId);
        }
    }

    private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {
        channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                writeFuture.setWriteResult(future.isSuccess());
                writeFuture.setCause(future.cause());
                // Remove the entry if the write failed.
                if (!writeFuture.isWriteSuccess()) {
                    SyncWriteMap.syncKey.remove(writeFuture.requestId());
                }
            }
        });

        // Block until the response arrives or the timeout elapses.
        Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
        if (response == null) {
            if (writeFuture.isTimeout()) {
                throw new TimeoutException();
            } else {
                // write exception
                throw new Exception(writeFuture.cause());
            }
        }
        return response;
    }
}
>SyncWriteFuture.java
package org.itstack.demo.rpc.network.future;

import org.itstack.demo.rpc.network.msg.Response;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SyncWriteFuture implements WriteFuture<Response> {

    private final CountDownLatch latch = new CountDownLatch(1);
    private final long begin = System.currentTimeMillis();
    private long timeout;
    private Response response;
    private final String requestId;
    // Set by the Netty I/O thread and read by the calling thread, hence volatile.
    private volatile boolean writeResult;
    private volatile Throwable cause;
    private boolean isTimeout = false;

    public SyncWriteFuture(String requestId) {
        this.requestId = requestId;
    }

    public SyncWriteFuture(String requestId, long timeout) {
        this.requestId = requestId;
        this.timeout = timeout;
        writeResult = true;
        isTimeout = false;
    }

    public Throwable cause() {
        return cause;
    }

    public void setCause(Throwable cause) {
        this.cause = cause;
    }

    public boolean isWriteSuccess() {
        return writeResult;
    }

    public void setWriteResult(boolean result) {
        this.writeResult = result;
    }

    public String requestId() {
        return requestId;
    }

    public Response response() {
        return response;
    }

    public void setResponse(Response response) {
        this.response = response;
        // Wake up the thread blocked in get().
        latch.countDown();
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return true;
    }

    public boolean isCancelled() {
        return false;
    }

    public boolean isDone() {
        // Done once a response has been set and the latch released.
        return latch.getCount() == 0;
    }

    public Response get() throws InterruptedException, ExecutionException {
        // await() blocks on the latch; Object.wait() here would throw IllegalMonitorStateException.
        latch.await();
        return response;
    }

    public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        // Returns null when no response arrived within the timeout.
        if (latch.await(timeout, unit)) {
            return response;
        }
        return null;
    }

    public boolean isTimeout() {
        if (isTimeout) {
            return isTimeout;
        }
        return System.currentTimeMillis() - begin > timeout;
    }
}
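The blocking behaviour of `SyncWriteFuture` comes from `CountDownLatch`: `countDown()` releases the waiting thread, and the timed `await` returns `false` if the timeout elapses first. The reduced sketch below isolates that mechanism. The class name `LatchResult` is hypothetical and it is not a replacement for the class above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical one-shot result holder built on CountDownLatch. */
class LatchResult<T> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile T value;

    /** Publishes the value and releases every waiting thread. */
    void set(T value) {
        this.value = value;
        latch.countDown();
    }

    /** Returns the value, or null if it was not set within the timeout. */
    T get(long timeout, TimeUnit unit) throws InterruptedException {
        // await(...) returns false when the timeout elapses before countDown() is called.
        return latch.await(timeout, unit) ? value : null;
    }

    /** True once set(...) has been called. */
    boolean isDone() {
        return latch.getCount() == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        LatchResult<String> result = new LatchResult<>();
        // Nothing has been set yet, so this waits 50 ms and prints null.
        System.out.println(result.get(50, TimeUnit.MILLISECONDS));

        // Another thread (the I/O thread in the RPC case) delivers the value.
        new Thread(() -> result.set("response")).start();
        System.out.println(result.get(1, TimeUnit.SECONDS));
    }
}
```

Note that the method to call is `await`, not `Object.wait`: the latter requires holding the object's monitor and is unrelated to the latch count.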
>SyncWriteMap.java
package org.itstack.demo.rpc.network.future;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SyncWriteMap {

    public static final Map<String, WriteFuture> syncKey = new ConcurrentHashMap<String, WriteFuture>();
}
>WriteFuture.java
package org.itstack.demo.rpc.network.future;

import java.util.concurrent.Future;

public interface WriteFuture<T> extends Future<T> {

    Throwable cause();

    void setCause(Throwable cause);

    boolean isWriteSuccess();

    void setWriteResult(boolean result);

    String requestId();

    T response();

    // Uses the type parameter so the interface does not depend on the concrete Response class.
    void setResponse(T response);

    boolean isTimeout();
}
>Request.java
package org.itstack.demo.rpc.network.msg;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class Request {

    private String requestId;
    private Object result;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}
>Response.java
package org.itstack.demo.rpc.network.msg;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class Response {

    private String requestId;
    private String param;

    public String getRequestId() {
        return requestId;
    }

    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    public String getParam() {
        return param;
    }

    public void setParam(String param) {
        this.param = param;
    }
}
>MyServerHandler.java
package org.itstack.demo.rpc.network.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object obj) {
        Request msg = (Request) obj;
        // Build the reply, echoing the request ID so the client can match it to its pending call.
        Response response = new Response();
        response.setRequestId(msg.getRequestId());
        // The appended text means: "request succeeded; please accept and process the returned result."
        response.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理。");
        ctx.writeAndFlush(response);
        // Release the inbound message (a no-op for a plain object such as Request).
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
}
>ServerSocket.java
package org.itstack.demo.rpc.network.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class ServerSocket implements Runnable {

    private ChannelFuture f;

    @Override
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            // Inbound: bytes -> Request; outbound: Response -> bytes.
                            ch.pipeline().addLast(
                                    new RpcDecoder(Request.class),
                                    new RpcEncoder(Response.class),
                                    new MyServerHandler());
                        }
                    });
            // Assign the field directly instead of shadowing it with a local variable.
            f = b.bind(7397).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}
>SerializationUtil.java
package org.itstack.demo.rpc.network.util;

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created by fuzhengwei1 on 2016/10/20.
 */
public class SerializationUtil {

    // Schemas are cached per class because creating them is comparatively expensive.
    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    private static Objenesis objenesis = new ObjenesisStd();

    private SerializationUtil() {
    }

    /**
     * Serialization (object -> byte array)
     *
     * @param obj the object
     * @return the byte array
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    /**
     * Deserialization (byte array -> object)
     *
     * @param data the serialized bytes
     * @param cls  the target class
     * @param <T>  the target type
     */
    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            // Objenesis creates the instance without calling a constructor.
            T message = objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            cachedSchema.put(cls, schema);
        }
        return schema;
    }
}
>StartClient.java
package org.itstack.demo.test.client;

import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import org.itstack.demo.rpc.network.client.ClientSocket;
import org.itstack.demo.rpc.network.future.SyncWrite;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class StartClient {

    private static ChannelFuture future;

    public static void main(String[] args) {
        ClientSocket client = new ClientSocket();
        new Thread(client).start();

        while (true) {
            try {
                // Fetch the future; the client thread needs some time to connect.
                if (null == future) {
                    future = client.getFuture();
                    Thread.sleep(500);
                    continue;
                }
                // Build the request parameters ("查询用户信息" means "query user information").
                Request request = new Request();
                request.setResult("查询用户信息");
                SyncWrite s = new SyncWrite();
                // Send the request and wait up to 1000 ms for the response.
                Response response = s.writeAndSync(future.channel(), request, 1000);
                // "调用结果:" means "call result:".
                System.out.println("调用结果:" + JSON.toJSON(response));
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
>StartServer.java
package org.itstack.demo.test.server;

import org.itstack.demo.rpc.network.server.ServerSocket;

/**
 * http://www.itstack.org
 * create by fuzhengwei on 2019/5/6
 */
public class StartServer {

    public static void main(String[] args) {
        // "启动服务端开始" means "server startup begins".
        System.out.println("启动服务端开始");
        new Thread(new ServerSocket()).start();
        // "启动服务端完成" means "server startup complete".
        System.out.println("启动服务端完成");
    }
}
Test Results
Start StartServer (the first two output lines read "server startup begins" and "server startup complete"):
启动服务端开始
启动服务端完成
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Start StartClient (each "调用结果" line is a "call result"; the param text reads "query user information: request succeeded; please accept and process the returned result"):
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"3380f061-2501-49b5-998b-21b5956fe60a"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"81c51815-4d92-482c-bd05-e4b6dfa4d3b6"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7af01c4f-a438-47a1-b35c-8e2cd7e4a5e7"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"86e38bb1-eccc-4d45-b976-c3b67999e3ab"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7f72002c-3b38-43d9-8452-db8797298899"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"d566a7d4-4b0d-426b-8c09-c535ccf8eb09"}
...
------------
Copyright notice: This article is an original work by InfoQ author 小傅哥.
Original link: http://xie.infoq.cn/article/59c7888e8fc9a2af9fbf9aced. Please contact the author before reposting.