netty 案例,netty4.1 高级应用篇二,手写 RPC 框架第二章《netty 通信》

用户头像
小傅哥
关注
发布于: 2020 年 08 月 22 日

案例介绍

在我们实现rpc框架的时候,需要选择socket的通信方式。而我们知道一般情况下socket通信类似与qq聊天,发过去消息,什么时候回复都可以。但是我们rpc框架通信,从感觉上类似http调用,需要在一定时间内返回,否则就会发生超时断开。



这里我们选择netty作为我们的socket框架,采用future方式进行通信。

>Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。



环境准备

1、jdk 1.8.0

2、IntelliJ IDEA Community Edition 2018.3.1 x64



代码示例

itstack-demo-rpc-02
└── src
└── main
│ └── java
│ └── org.itstack.demo.rpc.network
│ ├── client
│ │ ├── ClientSocket.java
│ │ └── MyClientHandler.java
│ ├── codec
│ │ ├── RpcDecoder.java
│ │ └── RpcEncoder.java
│ ├── future
│ │ ├── SyncWrite.java
│ │ ├── SyncWriteFuture.java
│ │ ├── SyncWriteMap.java
│ │ └── WriteFuture.java
│ ├── msg
│ │ ├── Request.java
│ │ └── Response.java
│ ├── server
│ │ ├── MyServerHandler.java
│ │ └── ServerSocket.java
│ └── util
│ └── SerializationUtil.java
└── test
└── java
└── org.itstack.demo.test
├── client
│ └── StartClient.java
└── server
└── StartServer.java




>ClientSocket.java



package org.itstack.demo.rpc.network.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class ClientSocket implements Runnable {
private ChannelFuture future;
@Override
public void run() {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.AUTO_READ, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new RpcDecoder(Response.class),
new RpcEncoder(Request.class),
new MyClientHandler());
}
});
ChannelFuture f = b.connect("127.0.0.1", 7397).sync();
this.future = f;
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
public ChannelFuture getFuture() {
return future;
}
public void setFuture(ChannelFuture future) {
this.future = future;
}
}



>MyClientHandler.java



package org.itstack.demo.rpc.network.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.itstack.demo.rpc.network.future.SyncWriteFuture;
import org.itstack.demo.rpc.network.future.SyncWriteMap;
import org.itstack.demo.rpc.network.msg.Response;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class MyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
Response msg = (Response) obj;
String requestId = msg.getRequestId();
SyncWriteFuture future = (SyncWriteFuture) SyncWriteMap.syncKey.get(requestId);
if (future != null) {
future.setResponse(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}



>RpcDecoder.java



package org.itstack.demo.rpc.network.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;
import java.util.List;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> genericClass;
public RpcDecoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
out.add(SerializationUtil.deserialize(data, genericClass));
}
}



>RpcEncoder.java



package org.itstack.demo.rpc.network.codec;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.itstack.demo.rpc.network.util.SerializationUtil;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> genericClass;
public RpcEncoder(Class<?> genericClass) {
this.genericClass = genericClass;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) {
if (genericClass.isInstance(in)) {
byte[] data = SerializationUtil.serialize(in);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}



>SyncWrite.java



package org.itstack.demo.rpc.network.future;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SyncWrite {
public Response writeAndSync(final Channel channel, final Request request, final long timeout) throws Exception {
if (channel == null) {
throw new NullPointerException("channel");
}
if (request == null) {
throw new NullPointerException("request");
}
if (timeout <= 0) {
throw new IllegalArgumentException("timeout <= 0");
}
String requestId = UUID.randomUUID().toString();
request.setRequestId(requestId);
WriteFuture<Response> future = new SyncWriteFuture(request.getRequestId());
SyncWriteMap.syncKey.put(request.getRequestId(), future);
Response response = doWriteAndSync(channel, request, timeout, future);
SyncWriteMap.syncKey.remove(request.getRequestId());
return response;
}
private Response doWriteAndSync(final Channel channel, final Request request, final long timeout, final WriteFuture<Response> writeFuture) throws Exception {
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
writeFuture.setWriteResult(future.isSuccess());
writeFuture.setCause(future.cause());
//失败移除
if (!writeFuture.isWriteSuccess()) {
SyncWriteMap.syncKey.remove(writeFuture.requestId());
}
}
});
Response response = writeFuture.get(timeout, TimeUnit.MILLISECONDS);
if (response == null) {
if (writeFuture.isTimeout()) {
throw new TimeoutException();
} else {
// write exception
throw new Exception(writeFuture.cause());
}
}
return response;
}
}



>SyncWriteFuture.java



package org.itstack.demo.rpc.network.future;
import org.itstack.demo.rpc.network.msg.Response;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SyncWriteFuture implements WriteFuture<Response> {
private CountDownLatch latch = new CountDownLatch(1);
private final long begin = System.currentTimeMillis();
private long timeout;
private Response response;
private final String requestId;
private boolean writeResult;
private Throwable cause;
private boolean isTimeout = false;
public SyncWriteFuture(String requestId) {
this.requestId = requestId;
}
public SyncWriteFuture(String requestId, long timeout) {
this.requestId = requestId;
this.timeout = timeout;
writeResult = true;
isTimeout = false;
}
public Throwable cause() {
return cause;
}
public void setCause(Throwable cause) {
this.cause = cause;
}
public boolean isWriteSuccess() {
return writeResult;
}
public void setWriteResult(boolean result) {
this.writeResult = result;
}
public String requestId() {
return requestId;
}
public Response response() {
return response;
}
public void setResponse(Response response) {
this.response = response;
latch.countDown();
}
public boolean cancel(boolean mayInterruptIfRunning) {
return true;
}
public boolean isCancelled() {
return false;
}
public boolean isDone() {
return false;
}
public Response get() throws InterruptedException, ExecutionException {
latch.wait();
return response;
}
public Response get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (latch.await(timeout, unit)) {
return response;
}
return null;
}
public boolean isTimeout() {
if (isTimeout) {
return isTimeout;
}
return System.currentTimeMillis() - begin > timeout;
}
}



>SyncWriteMap.java



package org.itstack.demo.rpc.network.future;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SyncWriteMap {
public static Map<String, WriteFuture> syncKey = new ConcurrentHashMap<String, WriteFuture>();
}



>WriteFuture.java



package org.itstack.demo.rpc.network.future;
import org.itstack.demo.rpc.network.msg.Response;
import java.util.concurrent.Future;
public interface WriteFuture<T> extends Future<T> {
Throwable cause();
void setCause(Throwable cause);
boolean isWriteSuccess();
void setWriteResult(boolean result);
String requestId();
T response();
void setResponse(Response response);
boolean isTimeout();
}



>Request.java



package org.itstack.demo.rpc.network.msg;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class Request {
private String requestId;
private Object result;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}



>Response.java



package org.itstack.demo.rpc.network.msg;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class Response {
private String requestId;
private String param;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getParam() {
return param;
}
public void setParam(String param) {
this.param = param;
}
}



>MyServerHandler.java



package org.itstack.demo.rpc.network.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class MyServerHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj){
Request msg = (Request) obj;
//反馈
Response request = new Response();
request.setRequestId(msg.getRequestId());
request.setParam(msg.getResult() + " 请求成功,反馈结果请接受处理。");
ctx.writeAndFlush(request);
//释放
ReferenceCountUtil.release(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
}



>ServerSocket.java



package org.itstack.demo.rpc.network.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.itstack.demo.rpc.network.codec.RpcDecoder;
import org.itstack.demo.rpc.network.codec.RpcEncoder;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class ServerSocket implements Runnable {
private ChannelFuture f;
@Override
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch){
ch.pipeline().addLast(
new RpcDecoder(Request.class),
new RpcEncoder(Response.class),
new MyServerHandler());
}
});
ChannelFuture f = null;
f = b.bind(7397).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}



>SerializationUtil.java



package org.itstack.demo.rpc.network.util;
import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Created by fuzhengwei1 on 2016/10/20.
*/
public class SerializationUtil {
private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap();
private static Objenesis objenesis = new ObjenesisStd();
private SerializationUtil() {
}
/**
* 序列化(对象 -> 字节数组)
*
* @param obj 对象
* @return 字节数组
*/
public static <T> byte[] serialize(T obj) {
Class<T> cls = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(cls);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
* 反序列化(字节数组 -> 对象)
*
* @param data
* @param cls
* @param <T>
*/
public static <T> T deserialize(byte[] data, Class<T> cls) {
try {
T message = objenesis.newInstance(cls);
Schema<T> schema = getSchema(cls);
ProtostuffIOUtil.mergeFrom(data, message, schema);
return message;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
private static <T> Schema<T> getSchema(Class<T> cls) {
Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
if (schema == null) {
schema = RuntimeSchema.createFrom(cls);
cachedSchema.put(cls, schema);
}
return schema;
}
}



>StartClient.java



package org.itstack.demo.test.client;
import com.alibaba.fastjson.JSON;
import io.netty.channel.ChannelFuture;
import org.itstack.demo.rpc.network.client.ClientSocket;
import org.itstack.demo.rpc.network.future.SyncWrite;
import org.itstack.demo.rpc.network.msg.Request;
import org.itstack.demo.rpc.network.msg.Response;
/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class StartClient {
private static ChannelFuture future;
public static void main(String[] args) {
ClientSocket client = new ClientSocket();
new Thread(client).start();
while (true) {
try {
//获取future,线程有等待处理时间
if (null == future) {
future = client.getFuture();
Thread.sleep(500);
continue;
}
//构建发送参数
Request request = new Request();
request.setResult("查询用户信息");
SyncWrite s = new SyncWrite();
Response response = s.writeAndSync(future.channel(), request, 1000);
System.out.println("调用结果:" + JSON.toJSON(response));
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}



StartServer.java

package org.itstack.demo.test.server;

import org.itstack.demo.rpc.network.server.ServerSocket;

/**
* http://www.itstack.org
* create by fuzhengwei on 2019/5/6
*/
public class StartServer {

public static void main(String[] args) {
System.out.println("启动服务端开始");
new Thread(new ServerSocket()).start();
System.out.println("启动服务端完成");
}

}



测试结果



启动StartServer



启动服务端开始
启动服务端完成
log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.



启动StartClient



log4j:WARN No appenders could be found for logger (io.netty.util.internal.logging.InternalLoggerFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"3380f061-2501-49b5-998b-21b5956fe60a"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"81c51815-4d92-482c-bd05-e4b6dfa4d3b6"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7af01c4f-a438-47a1-b35c-8e2cd7e4a5e7"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"86e38bb1-eccc-4d45-b976-c3b67999e3ab"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"7f72002c-3b38-43d9-8452-db8797298899"}
调用结果:{"param":"查询用户信息 请求成功,反馈结果请接受处理。","requestId":"d566a7d4-4b0d-426b-8c09-c535ccf8eb09"}
...



------------