Netty 基础—Netty 实现私有协议栈
- 2025-03-19 福建
本文字数:31852 字
阅读完需:约 105 分钟
1.私有协议介绍
(1)什么是私有协议
跨节点的远程服务调用(跨节点通信),除了链路层的物理连接外,还需要对请求和响应消息进行编解码。在请求和应答消息本身以外,也需要携带一些其他控制和管理类指令。例如链路建立的握手请求和响应消息、链路检测的心跳消息等。当这些功能组合到一起后就会形成私有协议。私有协议并没有标准的定义,只要是能够用于跨进程、跨主机数据交换的非标准协议,都可以称为私有协议。
(2)公有协议与私有协议
公有协议是类似于 HTTP、WebSocket 这样公开的一套协议,所有人都可以基于这个协议来进行通信。
私有协议是类似于 RPC 这样的自定义的通信协议,需要自定义请求消息应包含哪些内容和响应消息应包含哪些内容。
偏向于中间件类的软件产品,内部的各节点之间通常使用私有协议进行通信。Java Web 系统、业务系统内部之间的通信,则通常使用 Dubbo 这种 RPC 框架进行通信。Dubbo 有一套 Dubbo 协议,业务系统之间的通信直接使用 Dubbo 协议即可,无须再自定义协议。对于其他的比如自研微服务治理平台,那么其内部通信可能就需要自定义一套私有协议。
(3)使用 Netty 定制私有协议
绝大多数的私有协议的传输层都是基于 TCP/IP 协议,所以利用 Netty 的 NIO TCP 协议栈可以非常方便地进行私有协议的定制和开发。
下面使用 Netty 提供的异步 TCP 协议栈开发一个私有协议栈。这个私有协议栈可用于内部各模块之间的通信,它基于 TCP/IP 协议栈,是一个类 HTTP 协议的应用层协议栈。相比于传统的标准协议栈,Netty 协议栈更加轻巧、灵活。
建立在 TCP 协议之上的应用层公有协议有:HTTP 协议。TCP 连接建立后,发送的请求数据时带的数据会按照 HTTP 协议来组织,返回的响应数据也会按照 HTTP 协议来组织。
(4)私有协议栈的基本功能
这个私有协议栈可用于承载业务内部各模块之间的消息交互和服务调用,功能有:
一.提供高性能的异步通信能力(基于 Netty)
二.提供消息的编码解码能力
三.提供 IP 白名单认证机制
四.提供链路的保活探测机制
五.提供链路的断连重连机制
2.私有协议栈的通信模型
一.客户端发送握手请求消息
二.服务端对握手请求消息进行合法性校验
三.链路建立成功后,客户端发送业务消息
四.链路建立成功后,服务端发送心跳消息
五.链路建立成功后,客户端发送心跳消息
六.链路建立成功后,服务端发送业务消息

说明一:Netty 协议栈的通信双方在链路建立成功后,双方可进行全双工通信。无论客户端还是服务端,都可以主动发送请求消息给对方,通信方可用是 TWO WAY 或者 ONE WAY。
说明二:双方之间的心跳采用 Ping-Pong 机制。当链路处于空闲时(即链路已经长时间没有通信),客户端主动发送 Ping 消息给服务端,服务端收到 Ping 消息后发送应答消息(即 Pong 消息)给客户端。
说明三:如果客户端连续发送 N 条 Ping 消息都没有收到服务端返回的 Pong 消息,说明链路断开或服务端异常。此时客户端会主动关闭连接,间隔周期 T 之后再发起重连操作,直到重连成功。
3.私有协议栈的消息定义
任何一个自定义协议,都必须有消息头 Header + 消息体 Body。消息头 Header 里会存放一些消息的元数据,消息体 Body 里会存放完整的请求体数据。
消息头 Header 里可以放:
一.crcCode(32 位的 int 型)
CRC 检验码由三部分组成:第一部分是 2 个字节的固定值,表明这是某协议的消息。第二部分是 1 字节的消息主版本号,第三部分是 1 字节的消息次版本号。
二.length(32 位的 int 型)
这是整个消息的消息长度:包括消息头 Header + 消息体 Body。
三.SessionId(64 位的 long 型)
建立 TCP 长连接后,这个长连接就是一个会话,会话 ID 在集群节点内全局唯一。
四.type(8 位 Byte 型)
表示当前消息的类型。比如 0 是业务请求消息、1 是业务响应消息、2 是业务 ONE WAY 消息(既是请求又是响应消息)、3 是握手请求消息、4 是握手响应消息、5 是心跳请求消息、6 是心跳应答消息。
4.私有协议栈链路的建立
如果 A 节点需要调用 B 节点的服务,但是 A 和 B 之间还没有建立物理层链路,即 Netty 的 connect()方法触发的 TCP 的三次握手。那么会由调用方主动发起连接,此时调用方为客户端,被调用方为服务端。
建立好物理层链路后,还需要建立应用层链路。此时客户端会发送握手请求消息给服务端,服务端收到握手请求消息后,如果通过 IP 白名单校验,则返回握手成功应答消息给客户端,应用层链路建立成功。
之后,客户端和服务端就可以互相发送业务消息了。当然,服务端还要处理握手请求超时的情况。
5.私有协议栈链路的关闭
由于客户端和服务端采用长连接进行通信,而且在正常的业务运行期间,双方通过心跳和业务消息来维持链路,所以任何一方都不需要主动关闭连接。
但在以下情况下,客户端和服务端需要关闭连接:
一.一方宕机或重启
二.消息读写 IO 异常
三.心跳消息读写 IO 异常
四.心跳超时
五.编码异常
6.私有协议栈的心跳机制
心跳机制用于解决网络超时、闪断、对方进程僵死或者处理缓慢等情况。客户端和服务端会在网络空闲时采用心跳机制来检测链路的互通性,一旦发现网络故障,立即关闭链路、主动重连。
说明一:每隔时间周期 T 如 10 秒,客户端和服务端各自做一次心跳检查。
说明二:当其中一方发现网络处于空闲状态的持续时间已经达到 t 比如 1 分钟时,那么就主动发送 Ping 心跳消息给另一方。
说明三:如果在下一个周期 T 到来时,一方没有收到另一方发送的 Pong 心跳应答消息或者业务消息,则心跳失败计数器 + 1。
说明四:每当一方接收到另一方的 Pong 心跳应答消息或者业务消息时,则将心跳失败计数器清零。连续 N 次没有收到另一方的 Pong 心跳应答消息或者业务消息时,则关闭链路。如果是客户端关闭了链路,那么在间隔 INTERVAL 时间后再发起重连操作。
Ping-Pong 双向心跳机制:客户端和服务端共用一个 KeepAliveHandler,并各自启动心跳线程。
7.私有协议栈的重连机制
如果链路中断,客户端等待 INTERVAL 时间后会发起重连操作。如果重连失败,间隔 INTERVAL 时间后再次发起重连,直到重连成功。
为了保证服务端能有充足的时间释放句柄资源,在首次断连时客户端需要等待 INTERVAL 时间后再发起重连,而不是失败后就立即重连。重连失败时,客户端也要保证自身资源能被及时释放。
8.私有协议栈的重复登录保护
当客户端握手成功后,在链路处于正常状态下,不允许客户端重复登录,以防止客户端在异常状态下反复重连导致服务端的句柄资源被耗尽。
服务端接收到客户端的握手请求消息后,首先对 IP 地址进行合法性检验。如果检验成功,则在会话缓存中查看客户端是否已经登录。如果已经登录,则拒绝重复登录,同时关闭 TCP 链路。
客户端收到握手失败的消息后,要关闭客户端的 TCP 连接,等待 INTERVAL 时间后,再发起 TCP 连接,直到认证成功。
当服务端连续 N 次心跳超时后需要主动关闭链路,同时还要清空该客户端的会话缓存信息,以保证可重连成功。
9.私有协议栈核心的 ChannelHandler
主要五种 ChannelHandler:
一.Packet 数据包编码器 PacketEncoder
二.Packet 数据包解码器 PacketDecoder
三.握手机制处理 HandShakeHandler
四.会话 ID 管理 SessionIdHandler
五.链路保活处理 KeepAliveHandler
ch.pipeline()
.addLast(new PacketEncoder())
.addLast(new PacketDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH))
.addLast(new SessionIdHandler(false))
.addLast(new HandshakeHandler(NettyMode.SERVER.value()))
.addLast(new KeepAliveHandler());
SessionIdHandler 会话 ID 处理器(重复登录保护)在 pipeline 链条中排第三。
HandshakeHandler 握手处理器(握手超时检查)在 pipeline 链条排第四。
KeepAliveHandler 链路保活处理器(心跳检查机制)在 pipeline 链条排第五。
10.私有协议栈的客户端和服务端
(1)可断网重连的客户端
public class NettyClient {
private static final Logger logger = LogManager.getLogger(NettyClient.class);
private static final int SCHEDULED_THREAD_NUM = 1;
private static final int MAX_FRAME_LENGTH = 1024 * 1024;
private static final int LENGTH_FIELD_OFFSET = 4;
private static final int LENGTH_FIELD_LENGTH = 4;
private ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(SCHEDULED_THREAD_NUM);
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
public void connect(final String host, final int port) throws InterruptedException {
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new PacketEncoder())
.addLast(new PacketDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH))
.addLast(new SessionIdHandler(true))
.addLast(new HandshakeHandler(NettyMode.CLIENT.value()))
.addLast(new KeepAliveHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(new InetSocketAddress(host, port)).sync();
channelFuture.channel().closeFuture().sync();//阻塞等待直到连接被关闭
} finally {
//如果连接被关闭了,上面的closeFuture().sync()就不会再进行阻塞,于是下面的代码就会被执行进行重连
threadPool.execute(new Runnable() {
public void run() {
try {
TimeUnit.SECONDS.sleep(5);
try {
//递归调用
connect(host, port);
} catch(InterruptedException e) {
logger.error("Netty client connect error.");
}
} catch (InterruptedException e) {
logger.error("Socket channel close thread interrupted exception.");
}
}
});
}
}
}
(2)私有协议栈的服务端
public class NettyServer {
private static final int MAX_FRAME_LENGTH = 1024 * 1024;
private static final int LENGTH_FIELD_OFFSET = 4;
private static final int LENGTH_FIELD_LENGTH = 4;
public void bind(int port) throws Exception {
EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup();
EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new PacketEncoder())
.addLast(new PacketDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH))
.addLast(new SessionIdHandler(false))
.addLast(new HandshakeHandler(NettyMode.SERVER.value()))
.addLast(new KeepAliveHandler());
}
});
serverBootstrap.bind(port).sync();
}
}
11.私有协议栈的 Packet 数据包与编解码
(1)Packet 数据包的数据结构
//数据包
public class Packet {
private Header header = new Header();
private Body body = new Body();
class Header {
//4个字节检验值
private int crc;
//完整数据包的字节数量的大小
private int size;
//节点之间建立的会话的id,物理链路一旦建立好了以后,就必然会分配一个唯一的sessionId,8个字节大小
private long sessionId;
//数据包类型,区分包是个什么,握手请求包,握手应答包
private byte type;
//数据包重要级别
private byte level;
//扩展消息头
private Map<String, Object> extend = new HashMap<String, Object>();
}
class Body {
}
}
(2)从数据读取角度完善 Packet 数据包
//数据包
//encode编码:把这种自定义协议对象转换为字节数组
//decode解码:拿到一段完整的字节数组之后,解决粘包拆包问题,然后把完整字节数组还原自定义协议对象
public class Packet {
private Header header = new Header();
private Body body = new Body();
class Header {
//4个字节检验值:magic number + major version + minor version
private int crc;
//完整header+body大小,解决粘包拆包问题,4个字节大小
private int packetSize;
//节点之间建立的会话的id,物理链路一旦建立好了以后,就必然会分配一个唯一的sessionId,8个字节大小
private long sessionId;
//数据包类型,区分包是个什么,握手请求包,握手应答包
private byte type;
//数据包重要级别
private byte level;
//扩展消息头header字节数量的大小
private int extendHeaderSize;
//扩展消息头
private Map<String, Object> extendHeaders = new HashMap<String, Object>();
//body的字节数量的大小
private int bodySize;
}
class Body {
private byte[] bytes;
}
}
(3)Packet 数据包的编码器实现
一.序列化工具使用 Hessian
public class SerializeUtils {
public static byte[] serialize(Object object) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
HessianOutput hessianOutput = new HessianOutput(byteArrayOutputStream);
hessianOutput.writeObject(object);
byte[] bytes = byteArrayOutputStream.toByteArray();
return bytes;
}
public static Object deserialize(byte[] bytes, Class clazz) throws IOException {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
HessianInput hessianInput = new HessianInput(byteArrayInputStream);
Object object = hessianInput.readObject(clazz);
return object;
}
}
二.封装 Packet 数据包时需要进行序列化和字节大小计算
//Packet数据包编码器
public class PacketEncoder extends MessageToMessageEncoder<Packet> {
protected void encode(ChannelHandlerContext ctx, Packet msg, List<Object> out) throws Exception {
Packet packet = msg;
if (packet == null || packet.getHeader() == null) {
throw new ProtocolException("packet or header is null.");
}
//分配一块不在内存池子里的临时内存块,底层就是一个byte[]字节数组
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeInt(packet.getHeader().getCrc());
byteBuf.writeInt(packet.getPacketSize());
byteBuf.writeLong(packet.getHeader().getSessionId());
byteBuf.writeByte(packet.getHeader().getType());
byteBuf.writeByte(packet.getHeader().getLevel());
byteBuf.writeInt(packet.getHeader().getExtendHeaders().length);
byteBuf.writeBytes(packet.getHeader().getExtendHeaders());
byteBuf.writeInt(packet.getBody().length);
byteBuf.writeBytes(packet.getBody());
out.add(byteBuf);
}
}
//数据包
//encode编码:把这种自定义协议对象转换为字节数组
//decode解码:拿到一段完整的字节数组之后,解决粘包拆包问题,然后把完整字节数组还原自定义协议对象
public class Packet {
private static final int ZERO_BYTES = 0;
private static final int SINGLE_BYTES = 1;
private static final int INT_BYTES = 4;
private static final int LONG_BYTES = 8;
private Header header = new Header();
private Object body;
private byte[] bodyBytes;
//完整header+body大小,解决粘包拆包问题,4个字节大小
private int packetSize = ZERO_BYTES;
class Header {
//4个字节检验值:magic number + major version + minor version
private int crc = 0xabef0101;
//节点之间建立的会话的id,物理链路一旦建立好了以后,就必然会分配一个唯一的sessionId,8个字节大小
private long sessionId;
//数据包类型,区分包是个什么,握手请求包,握手应答包
private byte type;
//数据包重要级别
private byte level;
//扩展消息头
private Map<String, Object> extendHeaders = new HashMap<String, Object>();
//扩展消息头序列化后的字节数组
private byte[] extendHeadersBytes;
public int getCrc() { return crc; }
public void setCrc(int crc) { this.crc = crc; }
public long getSessionId() { return sessionId; }
public void setSessionId(long sessionId) {
this.sessionId = sessionId;
packetSize += LONG_BYTES;
}
public byte getType() { return type; }
public void setType(byte type) {
this.type = type;
packetSize += SINGLE_BYTES;
}
public byte getLevel() { return level; }
public void setLevel(byte level) {
this.level = level;
packetSize += SINGLE_BYTES;
}
public byte[] getExtendHeaders() {
return extendHeadersBytes;
}
public void setExtendHeaders(Map<String, Object> extendHeaders) throws IOException {
this.extendHeaders = extendHeaders;
this.extendHeadersBytes = SerializeUtils.serialize(extendHeaders);
packetSize += INT_BYTES;
packetSize += this.extendHeadersBytes.length;
}
}
class Body {
private byte[] bytes;
}
public int getPacketSize() { return packetSize; }
public void setPacketSize(int packetSize) { this.packetSize = packetSize; }
public Header getHeader() { return header; }
public void setHeader(Header header) { this.header = header; }
public byte[] getBody() { return bodyBytes; }
public void setBody(byte[] body) throws IOException {
this.body = body;
this.bodyBytes = SerializeUtils.serialize(body);
packetSize += INT_BYTES;
packetSize += this.bodyBytes.length;
}
}
(4)基于 builder 设计模式重构 Packet 数据包
Builder 设计模式:Builder 是一个静态内部类、会 new 一个 Packet 实例对象。Builder 类中除了 build()方法返回实例对象,其他方法都是设置实例对象并返回 Builder。此 Packet 类还有一个静态方法 builder()返回一个静态内部类 Builder 的实例对象。
//数据包
//encode编码:把这种自定义协议对象转换为字节数组
//decode解码:拿到一段完整的字节数组之后,解决粘包拆包问题,然后把完整字节数组还原自定义协议对象
public class Packet {
private static final int ZERO_BYTES = 0;
private static final int SESSION_ID_BYTES = 8;
private static final int TYPE_BYTES = 1;
private static final int LEVEL_BYTES = 1;
private static final int EXTEND_HEADERS_SIZE_BYTES = 4;
private static final int BODY_SIZE_BYTES = 4;
private Header header = new Header();
private Object body = new Body();
private byte[] bodyBytes;
//4个字节检验值:magic number + major version + minor version
private int crc = 0xabef0101;
//完整header+body大小,解决粘包拆包问题,4个字节大小
private int packetSize = ZERO_BYTES;
class Header {
//节点之间建立的会话的id,物理链路一旦建立好了以后,就必然会分配一个唯一的sessionId,8个字节大小
private long sessionId;
//数据包类型,区分包是个什么,握手请求包,握手应答包
private byte type;
//数据包重要级别
private byte level;
//扩展消息头
private Map<String, Object> extendHeaders = new HashMap<String, Object>();
//扩展消息头序列化后的字节数组
private byte[] extendHeadersBytes;
public long getSessionId() { return sessionId; }
public void setSessionId(long sessionId) { this.sessionId = sessionId; }
public byte getType() { return type; }
public void setType(byte type) { this.type = type; }
public byte getLevel() { return level; }
public void setLevel(byte level) { this.level = level; }
public Map<String, Object> getExtendHeaders() { return extendHeaders; }
public void setExtendHeaders(Map<String, Object> extendHeaders) { this.extendHeaders = extendHeaders; }
public byte[] getExtendHeadersBytes() { return extendHeadersBytes; }
public void setExtendHeadersBytes(byte[] extendHeadersBytes) { this.extendHeadersBytes = extendHeadersBytes; }
}
class Body {
private byte[] bytes;
}
public int getCrc() { return crc; }
public void setCrc(int crc) { this.crc = crc; }
public int getPacketSize() { return packetSize; }
public void setPacketSize(int packetSize) { this.packetSize = packetSize; }
public Header getHeader() { return header; }
public void setHeader(Header header) { this.header = header; }
public Object getBody() { return body; }
public void setBody(Object body) { this.body = body; }
public byte[] getBodyBytes() { return bodyBytes; }
public void setBodyBytes(byte[] bodyBytes) { this.bodyBytes = bodyBytes; }
static class Builder {
private Packet packet = new Packet();
public Builder sessionId(long sessionId) { packet.getHeader().setSessionId(sessionId); return this; }
public Builder type(byte type) { packet.getHeader().setType(type); return this; }
public Builder level(byte level) { packet.getHeader().setLevel(level); return this; }
public Builder extendHeader(String key, Object val) { packet.getHeader().getExtendHeaders().put(key, val); return this; }
public Builder body(Object body) { packet.setBody(body); return this; }
public Packet build() throws IOException {
byte[] extendHeadersBytes = SerializeUtils.serialize(packet.getHeader().getExtendHeaders());
packet.getHeader().setExtendHeadersBytes(extendHeadersBytes);
byte[] bodyBytes = SerializeUtils.serialize(packet.getBody());
packet.setBodyBytes(bodyBytes);
int packageSize = ZERO_BYTES + SESSION_ID_BYTES + TYPE_BYTES + LEVEL_BYTES
+ EXTEND_HEADERS_SIZE_BYTES + packet.getHeader().getExtendHeadersBytes().length
+ BODY_SIZE_BYTES + packet.getBodyBytes().length;
packet.setPacketSize(packageSize);
return packet;
}
}
public static Builder builder() {
return new Builder();
}
}
//数据包编码器
public class PacketEncoder extends MessageToMessageEncoder<Packet> {
protected void encode(ChannelHandlerContext ctx, Packet msg, List<Object> out) throws Exception {
Packet packet = msg;
if (packet == null || packet.getHeader() == null) {
throw new ProtocolException("packet or header is null.");
}
//分配一块不在内存池子里的临时内存块,底层就是一个byte[]字节数组
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeInt(packet.getCrc());
byteBuf.writeInt(packet.getPacketSize());
byteBuf.writeLong(packet.getHeader().getSessionId());
byteBuf.writeByte(packet.getHeader().getType());
byteBuf.writeByte(packet.getHeader().getLevel());
byteBuf.writeInt(packet.getHeader().getExtendHeadersBytes().length);
byteBuf.writeBytes(packet.getHeader().getExtendHeadersBytes());
byteBuf.writeInt(packet.getBodyBytes().length);
byteBuf.writeBytes(packet.getBodyBytes());
out.add(byteBuf);
}
}
(5)Packet 数据包的解码器实现
一.解码器解码扩展头字段
//数据包解码器
public class PacketDecoder extends LengthFieldBasedFrameDecoder {
//解码器会负责粘包和拆包的处理,确保拿到的ByteBuf是一段完整的协议对象字节数据
//frame是一个数据帧,就是一个完整的协议对象字节数据
//maxFrameLength,设定一下这个协议对象的帧数据的最大的大小
//lengthFieldOffset,代表帧数据有多少自己的field,它的offset是哪一位
//从offset那个位置开始读,读lengthFieldLength个字节数量,就可以了
public PacketDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);//利用父类实现粘包拆包
if (frame == null) {
return null;
}
Packet packet = Packet.builder()
.crc(frame.readInt())
.packetSize(frame.readInt())
.sessionId(frame.readLong())
.type(frame.readByte())
.level(frame.readByte())
.extendHeaders(decodeExtendHeaders(frame))
.body(decodeBody(frame))
.build();
return packet;
}
private Map<String, Object> decodeExtendHeaders(ByteBuf in) throws IOException {
int extendHeadersBytesSize = in.readInt();
byte[] extendHeadersBytes = new byte[extendHeadersBytesSize];
in.readBytes(extendHeadersBytes);
Map<String, Object> extendHeaders = (Map<String, Object>)SerializeUtils.deserialize(extendHeadersBytes, Map.class);
return extendHeaders;
}
private Object decodeBody(ByteBuf in) throws IOException, ClassNotFoundException {
...
}
}
二.解码器解码 PacketBody 对象
首先给 Packet 数据包增加 bodyClazz 字段表明 body 的类名:
//数据包
//encode编码:把这种自定义协议对象转换为字节数组
//decode解码:拿到一段完整的字节数组之后,解决粘包拆包问题,然后把完整字节数组还原自定义协议对象
public class Packet {
private static final int ZERO_BYTES = 0;
private static final int SESSION_ID_BYTES = 8;
private static final int TYPE_BYTES = 1;
private static final int LEVEL_BYTES = 1;
private static final int EXTEND_HEADERS_SIZE_BYTES = 4;
private static final int BODY_SIZE_BYTES = 4;
private static final int BODY_CLAZZ_SIZE_BYTES = 4;
private Header header = new Header();
private Object body = new Body();
private byte[] bodyBytes;
private String bodyClazz;
//4个字节检验值:magic number + major version + minor version
private int crc = 0xabef0101;
//完整header+body大小,解决粘包拆包问题,4个字节大小
private int packetSize = ZERO_BYTES;
class Header {
//节点之间建立的会话的id,物理链路一旦建立好了以后,就必然会分配一个唯一的sessionId,8个字节大小
private long sessionId;
//数据包类型,区分包是个什么,握手请求包,握手应答包
private byte type;
//数据包重要级别
private byte level;
//扩展消息头
private Map<String, Object> extendHeaders = new HashMap<String, Object>();
//扩展消息头序列化后的字节数组
private byte[] extendHeadersBytes;
public long getSessionId() { return sessionId; }
public void setSessionId(long sessionId) { this.sessionId = sessionId; }
public byte getType() { return type; }
public void setType(byte type) { this.type = type; }
public byte getLevel() { return level; }
public void setLevel(byte level) { this.level = level; }
public Map<String, Object> getExtendHeaders() { return extendHeaders; }
public void setExtendHeaders(Map<String, Object> extendHeaders) { this.extendHeaders = extendHeaders; }
public byte[] getExtendHeadersBytes() { return extendHeadersBytes; }
public void setExtendHeadersBytes(byte[] extendHeadersBytes) { this.extendHeadersBytes = extendHeadersBytes; }
}
class Body {
private byte[] bytes;
}
public int getCrc() { return crc; }
public void setCrc(int crc) { this.crc = crc; }
public int getPacketSize() { return packetSize; }
public void setPacketSize(int packetSize) { this.packetSize = packetSize; }
public Header getHeader() { return header; }
public void setHeader(Header header) { this.header = header; }
public Object getBody() { return body; }
public void setBody(Object body) { this.body = body; }
public byte[] getBodyBytes() { return bodyBytes; }
public void setBodyBytes(byte[] bodyBytes) { this.bodyBytes = bodyBytes; }
public String getBodyClazz() { return bodyClazz; }
public void setBodyClazz(String bodyClazz) { this.bodyClazz = bodyClazz; }
static class Builder {
private Packet packet = new Packet();
public Builder crc(int crc) { packet.setCrc(crc); return this; }
public Builder packetSize(int packetSize) { packet.setPacketSize(packetSize); return this; }
public Builder sessionId(long sessionId) { packet.getHeader().setSessionId(sessionId); return this; }
public Builder type(byte type) { packet.getHeader().setType(type); return this; }
public Builder level(byte level) { packet.getHeader().setLevel(level); return this; }
public Builder extendHeader(String key, Object val) { packet.getHeader().getExtendHeaders().put(key, val); return this; }
public Builder extendHeaders(Map<String, Object> extendHeaders) { packet.getHeader().setExtendHeaders(extendHeaders); return this; }
public Builder body(Object body) { packet.setBody(body); return this; }
public Packet build() throws IOException {
if (packet.getBody() == null) {
throw new ProtocolException("packet body is null.");
}
if (packet.getPacketSize() > 0) {
return packet;
}
byte[] extendHeadersBytes = SerializeUtils.serialize(packet.getHeader().getExtendHeaders());
packet.getHeader().setExtendHeadersBytes(extendHeadersBytes);
byte[] bodyBytes = SerializeUtils.serialize(packet.getBody());
packet.setBodyBytes(bodyBytes);
packet.setBodyClazz(packet.getBody().getClass().getName());
int packageSize = ZERO_BYTES + SESSION_ID_BYTES + TYPE_BYTES + LEVEL_BYTES
+ EXTEND_HEADERS_SIZE_BYTES + packet.getHeader().getExtendHeadersBytes().length
+ BODY_SIZE_BYTES + packet.getBodyBytes().length
+ BODY_CLAZZ_SIZE_BYTES + packet.getBodyClazz().getBytes().length;
packet.setPacketSize(packageSize);
return packet;
}
}
public static Builder builder() {
return new Builder();
}
}
下面是对字节数组 body 部分进行解码的逻辑:
//数据包解码器
public class PacketDecoder extends LengthFieldBasedFrameDecoder {
//解码器会负责粘包和拆包的处理,确保拿到的ByteBuf是一段完整的协议对象字节数据
//frame是一个数据帧,就是一个完整的协议对象字节数据
//maxFrameLength,设定一下这个协议对象的帧数据的最大的大小
//lengthFieldOffset,代表帧数据有多少自己的field,它的offset是哪一位
//从offset那个位置开始读,读lengthFieldLength个字节数量,就可以了
public PacketDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, in);
if (frame == null) {
return null;
}
Packet packet = Packet.builder()
.crc(frame.readInt())
.packetSize(frame.readInt())
.sessionId(frame.readLong())
.type(frame.readByte())
.level(frame.readByte())
.extendHeaders(decodeExtendHeaders(frame))
.body(decodeBody(frame))
.build();
return packet;
}
private Map<String, Object> decodeExtendHeaders(ByteBuf in) throws IOException {
int extendHeadersBytesSize = in.readInt();
byte[] extendHeadersBytes = new byte[extendHeadersBytesSize];
in.readBytes(extendHeadersBytes);
Map<String, Object> extendHeaders = (Map<String, Object>)SerializeUtils.deserialize(extendHeadersBytes, Map.class);
return extendHeaders;
}
private Object decodeBody(ByteBuf in) throws IOException, ClassNotFoundException {
int bodyBytesSize = in.readInt();
byte[] bodyBytes = new byte[bodyBytesSize];
in.readBytes(bodyBytes);
int bodyClazzBytesSize = in.readInt();
byte[] bodyClazzBytes = new byte[bodyClazzBytesSize];
in.readBytes(bodyClazzBytes);
String bodyClazzString = new String(bodyClazzBytes);
Class bodyClazz = Class.forName(bodyClazzString);
Object body = SerializeUtils.deserialize(bodyBytes, bodyClazz);
return body;
}
}
12.私有协议栈的会话 ID 处理器
客户端在通道激活时会由会话生成器生成一个会话 ID,并利用 channelId 存放在会话缓存里。服务端在读取数据包时则先尝试根据 channelId 去会话缓存里获取会话 ID,获取不到再从 Packet 数据包里取出来然后进行缓存。
//负责生成以及传递会话ID处理器handler
public class SessionIdHandler extends ChannelInboundHandlerAdapter {
private boolean needGenerate;
public SessionIdHandler(boolean needGenerate) {
this.needGenerate = needGenerate;
}
//客户端逻辑,通道激活时的处理
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (needGenerate) {
String channelId = ctx.channel().id().asLongText();
long sessionId = SessionIdGenerator.generate();
SessionManager sessionManager = SessionManager.getInstance();
sessionManager.putSessionId(channelId, sessionId);
}
//触发往后handler的channelActive
ctx.fireChannelActive();
}
//服务端逻辑
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
SessionManager sessionManager = SessionManager.getInstance();
String channelId = ctx.channel().id().asLongText();
Long sessionId = sessionManager.getSessionId(channelId);
if (sessionId == null) {
sessionId = packet.getHeader().getSessionId();
sessionManager.putSessionId(channelId, sessionId);
}
//触发往后handler的channelRead
ctx.fireChannelRead(msg);
}
}
//连接会话管理
public class SessionManager {
private SessionManager() {
}
private static class Singleton {
static SessionManager instance = new SessionManager();
}
public static SessionManager getInstance() {
return Singleton.instance;
}
//用来记录channelId和sessionId的map
private Map<String, Long> sessionIds = new ConcurrentHashMap<String, Long>();
//用来记录address和session的map,可判断请求是否重复
private Map<String, Session> sessions = new ConcurrentHashMap<String, Session>();
public void putSessionId(String channelId, Long sessionId) {
sessionIds.put(channelId, sessionId);
}
public long getSessionId(String channelId) {
return sessionIds.get(channelId);
}
public long getSessionId(ChannelHandlerContext ctx) {
String channelId = ctx.channel().id().asLongText();
return sessionIds.get(channelId);
}
public void putSession(String remoteAddress, Session session) {
sessions.put(remoteAddress, session);
}
public Session getSession(String remoteAddress) {
return sessions.get(remoteAddress);
}
}
//会话id生成组件
public class SessionIdGenerator {
public static long generate() {
String uuid = UUID.randomUUID().toString();
return uuid.hashCode();
}
}
//连接会话
public class Session {
private String remoteAddress;
public Session(String remoteAddress) {
this.remoteAddress = remoteAddress;
}
public String getRemoteAddress() {
return remoteAddress;
}
public void setRemoteAddress(String remoteAddress) {
this.remoteAddress = remoteAddress;
}
}
13.私有协议栈的握手处理器
(1)握手处理器说明
当连接刚建立通道刚被激活时,客户端需要发起握手请求,服务端则需要启动一个延时线程检查握手是否超时,比如通道激活 1 分钟后还没有会话 ID。
当读取 Packet 数据包时,需要判断请求是握手请求还是握手响应。客户端处理握手响应,服务端处理握手请求。
服务端处理握手请求时,还要根据白名单 IP 看是否为非法请求,并根据会话缓存避免重复握手。
所以握手处理器主要包括三个功能:白名单 IP 处理 + 握手超时处理 + 重复握手处理。
(2)发起握手请求与激活通道
首先在连接刚建立时客户端通过 channelActive()方法发起握手请求。
//发起握手的处理器Handler
//握手的发起是在客户端和服务端TCP链路建立成功通道被激活时
//握手消息的接入和安全认证在服务端处理
//两个节点通信时,发起通信的一方是客户端,接收通信的一方是服务端
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(HandshakeHandler.class);
//当连接刚建立时,客户端需要发送握手请求
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asLongText();
long sessionId = SessionManager.getInstance().getSessionId(channelId);
Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);
ctx.writeAndFlush(handshakeRequestPacket);
//触发往后handler的channelActive
ctx.fireChannelActive();
}
//创建握手请求数据包
private Packet createHandshakeRequestPacket(long sessionId) throws IOException {
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.HandshakeRequest.value())
.level(PacketLevel.DEFAULT.value())
.body(new HandshakeRequest())
.build();
return packet;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
...
}
...
}
//握手请求
public class HandshakeRequest implements Serializable {
private String requestId = RequestIdGenerator.generate();
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
}
//请求ID生成组件
public class RequestIdGenerator {
public static String generate() {
return UUID.randomUUID().toString().replace("-", "");
}
}
然后服务端收到握手请求后通过 channelRead()方法进行处理,客户端收到握手响应也是通过 channelRead()方法进行处理。
//发起握手的处理器Handler
//握手的发起是在客户端和服务端TCP链路建立成功通道被激活时
//握手消息的接入和安全认证在服务端处理
//两个节点通信时,发起通信的一方是客户端,接收通信的一方是服务端
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(HandshakeHandler.class);
//当连接刚建立时,客户端需要发送握手请求
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
String channelId = ctx.channel().id().asLongText();
long sessionId = SessionManager.getInstance().getSessionId(channelId);
Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);
ctx.writeAndFlush(handshakeRequestPacket);
ctx.fireChannelActive();
}
//创建握手请求数据包
private Packet createHandshakeRequestPacket(long sessionId) throws IOException {
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.HandshakeRequest.value())
.level(PacketLevel.DEFAULT.value())
.body(new HandshakeRequest())
.build();
return packet;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
//对不同请求的处理
if (isHandshakeRequest(packet)) {
handleHandshakeRequest(ctx, packet);
} else if (isHandshakeResponse(packet)) {
handleHandshakeResponse(ctx, packet);
} else {
ctx.fireChannelRead(msg);
}
}
//是否是握手请求
private boolean isHandshakeRequest(Packet packet) {
return packet.getHeader().getType() == PacketType.HandshakeRequest.value();
}
//是否是握手响应
private boolean isHandshakeResponse(Packet packet) {
return packet.getHeader().getType() == PacketType.HandshakeResponse.value();
}
//处理握手请求
private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
...
}
//处理握手响应
private void handleHandshakeResponse(ChannelHandlerContext ctx, Packet packet) {
...
}
}
public enum PacketType {
HandshakeRequest(1),
HandshakeResponse(2),
KeepAlivePing(3),
KeepAlivePong(4);
byte value;
PacketType(int value) {
this.value = (byte) value;
}
public byte value() {
return value;
}
}
(3)握手请求的重复会话判断与响应
//握手响应
public class HandshakeResponse implements Serializable {
public static final String SESSION_EXISTED_ERROR_MESSAGE = "Session Existed.";
public static final String NOT_IN_WHITE_LIST_ERROR_MESSAGE = "IP is not in white list.";
private String requestId;
private boolean success = true;
private String errorMessage;
//构造函数私有化不给外部进行new
private HandshakeResponse() {
}
public static HandshakeResponse success(String requestId) {
HandshakeResponse handshakeResponse = new HandshakeResponse();
handshakeResponse.setRequestId(requestId);
return handshakeResponse;
}
public static HandshakeResponse error(String requestId, String errorMessage) {
HandshakeResponse handshakeResponse = new HandshakeResponse();
handshakeResponse.setRequestId(requestId);
handshakeResponse.setSuccess(false);
handshakeResponse.setErrorMessage(errorMessage);
return handshakeResponse;
}
public String getRequestId() { return requestId; }
public void setRequestId(String requestId) { this.requestId = requestId; }
public boolean isSuccess() { return success; }
public void setSuccess(boolean success) { this.success = success; }
public String getErrorMessage() { return errorMessage; }
public void setErrorMessage(String errorMessage) { this.errorMessage = errorMessage; }
}
握手请求的重复会话判断处理:
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (isHandshakeRequest(packet)) {
handleHandshakeRequest(ctx, packet);
} else if(isHandshakeResponse(packet)) {
handleHandshakeResponse(ctx, packet);
} else {
ctx.fireChannelRead(msg);
}
}
//是否是握手请求
private boolean isHandshakeRequest(Packet packet) {
return packet.getHeader().getType() == PacketType.HandshakeRequest.value();
}
//处理握手请求
private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
Packet handshakeResponsePacket = null;
//如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手
if (existSession(ctx)) {
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);
}
ctx.writeAndFlush(handshakeResponsePacket);
}
//判断当前连接是否已经存在一个session了
private boolean existSession(ChannelHandlerContext ctx) {
String remoteAddress = ctx.channel().remoteAddress().toString();
SessionManager sessionManager = SessionManager.getInstance();
Session session = sessionManager.getSession(remoteAddress);
return session != null;
}
//创建握手应答Packet对象
private Packet createHandshakeResponsePacket(ChannelHandlerContext ctx, Packet handshakeRequestPacket, boolean success, String errorMessage) throws IOException {
HandshakeRequest handshakeRequest = (HandshakeRequest) handshakeRequestPacket.getBody();
HandshakeResponse handshakeResponse = success ? HandshakeResponse.success(handshakeRequest.getRequestId()) :
HandshakeResponse.error(handshakeRequest.getRequestId(), errorMessage);
Packet packet = Packet.builder()
.sessionId(handshakeRequestPacket.getHeader().getSessionId())
.type(PacketType.HandshakeResponse.value())
.level(PacketLevel.DEFAULT.value())
.body(handshakeResponse)
.build();
return packet;
}
...
}
(4)握手请求 IP 是否在白名单的判断
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
...
//处理握手请求
private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
Packet handshakeResponsePacket = null;
//如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手
if (existSession(ctx)) {
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);
}
//如果发送握手请求的机器IP,不在白名单列表里,则为非法请求
else if(!inWhiteList(ctx)) {
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.NOT_IN_WHITE_LIST_ERROR_MESSAGE);
}
ctx.writeAndFlush(handshakeResponsePacket);
}
//判断发送握手请求过来的机器IP地址,是否在白名单里
private boolean inWhiteList(ChannelHandlerContext ctx) {
InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = socketAddress.getAddress().getHostAddress();
WhiteListManager whiteListManager = WhiteListManager.getInstance();
boolean inWhiteList = whiteListManager.inWhiteList(ip);
return inWhiteList;
}
...
}
public class WhiteListManager {
private WhiteListManager() {
whiteList.add("125.33.200.123");
}
private static class Singleton {
static WhiteListManager instance = new WhiteListManager();
}
public static WhiteListManager getInstance() {
return Singleton.instance;
}
private List<String> whiteList = new CopyOnWriteArrayList<String>();
public boolean inWhiteList(String ip) {
return whiteList.contains(ip);
}
}
(5)握手请求的响应以及非法连接问题
握手请求的应答处理:
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (isHandshakeRequest(packet)) {
handleHandshakeRequest(ctx, packet);
} else if(isHandshakeResponse(packet)) {
handleHandshakeResponse(ctx, packet);
} else {
ctx.fireChannelRead(msg);
}
}
//处理握手请求
private void handleHandshakeRequest(ChannelHandlerContext ctx, Packet packet) throws IOException {
Packet handshakeResponsePacket = null;
//如果当前连接已经存在一个session了,说明握手已经进行过了,此时是重复握手
if (existSession(ctx)) {
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.SESSION_EXISTED_ERROR_MESSAGE);
}
//如果发送握手请求的机器IP,不在白名单列表里,则为非法请求
else if(!inWhiteList(ctx)) {
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, false, HandshakeResponse.NOT_IN_WHITE_LIST_ERROR_MESSAGE);
}
//当前连接不存在重复session,同时握手请求ip地址在白名单里
else {
initSession(ctx);
handshakeResponsePacket = createHandshakeResponsePacket(ctx, packet, true, null);
}
ctx.writeAndFlush(handshakeResponsePacket);
}
private void initSession(ChannelHandlerContext ctx) {
String remoteAddress = ctx.channel().remoteAddress().toString();
SessionManager sessionManager = SessionManager.getInstance();
sessionManager.putSession(remoteAddress, new Session(remoteAddress));
}
//处理握手响应
private void handleHandshakeResponse(ChannelHandlerContext ctx, Packet packet) {
HandshakeResponse handshakeResponse = (HandshakeResponse) packet.getBody();
//如果是握手成功了
if (handshakeResponse.isSuccess()) {
logger.info("handshake success.");
}
//如果是握手失败了
else {
logger.error(handshakeResponse.getErrorMessage());
ctx.close();
}
}
...
}
如果客户端非法跟服务端建立一个 Netty 物理连接后,却一直不发送握手请求,这会导致服务端的连接资源被非法占用。为了解决这个问题,需要进行握手超时检查。无握手、连接资源被非法侵占问题可以通过延时线程解决。
public class HandshakeHandler extends ChannelInboundHandlerAdapter {
...
private int mode;
public HandshakeHandler(int mode) {
this.mode = mode;
}
//当连接刚被建立时,需要发送握手请求
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (mode == NettyMode.CLIENT.value()) {
String channelId = ctx.channel().id().asLongText();
long sessionId = SessionManager.getInstance().getSessionId(channelId);
Packet handshakeRequestPacket = createHandshakeRequestPacket(sessionId);
ctx.writeAndFlush(handshakeRequestPacket);
} else if(mode == NettyMode.SERVER.value()) {
//检查是否在指定时间范围内把握手请求发送过来
new HandshakeRequestTimeoutThread(ctx).start();
}
ctx.fireChannelActive();
}
//握手请求超时没收到检查的线程
private class HandshakeRequestTimeoutThread extends Thread {
private static final long HANDSHAKE_REQUEST_TIMEOUT_THRESHOLD = 1 * 60 * 1000;
private ChannelHandlerContext ctx;
public HandshakeRequestTimeoutThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
//休眠1分钟,1分钟后再继续(1分钟后线程才能被唤醒, await则可以随时被唤醒)
Thread.sleep(HANDSHAKE_REQUEST_TIMEOUT_THRESHOLD);
} catch (InterruptedException e) {
logger.error("HandshakeRequestTimeoutThread interrupted exception.");
}
if (!existSession(ctx)) {
logger.error("Client did not send handshake request in 1 minute.");
ctx.close();
}
}
}
...
}
public enum NettyMode {
CLIENT(1),
SERVER(2);
int value;
Mode(int value) {
this.value = value;
}
int value() {
return value;
}
}
14.私有协议栈的链路保活处理器
(1)链路保活处理器说明
当连接刚建立通道刚被激活时,客户端和服务端各自启动一个链路保活的检查线程。该线程会每隔 10 分钟做一次保活检查,具体的检查判断如下:
一.如果当前时间距离上一次收到数据包已超 1 小时,则启动链路保活探测,即向对方发送 Ping 消息
二.发送 Ping 消息时会记录 Ping 消息次数,但只要收到 Pong 消息或业务消息,就要清空该记录
三.只要记录的 Ping 消息次数超过 3 次,就关闭连接
(2)链路保活处理器框架搭建
//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
private long lastPacketTimestamp = -1;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new KeepAliveThread(ctx).start();
}
//通信链路保活检查线程
private class KeepAliveThread extends Thread {
private ChannelHandlerContext ctx;
public KeepAliveThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (packet != null) {
lastPacketTimestamp = System.currentTimeMillis();
}
ctx.fireChannelRead(msg);
}
}
//探测Ping包
public class KeepAlivePing implements Serializable {
private String requestId;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
}
//探测Pong包
public class KeepAlivePong implements Serializable {
private String requestId;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
}
(3)定时检测长时间未通信的连接
//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);
//每隔10分钟做一次保活检查
private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;
//1小时都没有通信就开启链路保活探测
private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;
private long lastPacketTimestamp = -1;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new KeepAliveThread(ctx).start();
}
//通信链路保活检查线程
private class KeepAliveThread extends Thread {
private ChannelHandlerContext ctx;
private int keepAlivePingRetryTimes = 0;
public KeepAliveThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
for(;;) {
//每隔10分钟做一次链路保活检查
try {
sleep(KEEP_ALIVE_CHECK_INTERNAL);
} catch (InterruptedException e) {
logger.error("Keep alive thread interrupted exception.");
}
//当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测
long now = System.currentTimeMillis();
if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {
//TODO
}
}
}
private Packet createKeepAlivePingPacket() throws IOException {
SessionManager sessionManager = SessionManager.getInstance();
long sessionId = sessionManager.getSessionId(ctx);
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.KeepAlivePing.value())
.level(PacketLevel.DEFAULT.value())
.body(new KeepAlivePing())
.build();
return packet;
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (packet != null) {
lastPacketTimestamp = System.currentTimeMillis();
}
ctx.fireChannelRead(msg);
}
}
(4)链路保活探测数据包封装与发送
启动链路保活探测,发送保活探测包:
//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);
//每隔10分钟做一次保活检查
private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;
//1个小时都没有通信就开启链路保活探测
private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;
private long lastPacketTimestamp = -1;
//存放对于该长连接已经发送的保活探测包请求
//map是用于存放已经发送ping探测包后但还没收到pong探测包的packet
private Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new KeepAliveThread(ctx).start();
}
//通信链路保活检查线程
private class KeepAliveThread extends Thread {
private ChannelHandlerContext ctx;
private int keepAlivePingRetryTimes = 0;
public KeepAliveThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
for(;;) {
//每隔10分钟做一次链路保活检查
try {
sleep(KEEP_ALIVE_CHECK_INTERNAL);
} catch (InterruptedException e) {
logger.error("Keep alive thread interrupted exception.");
}
//当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测
long now = System.currentTimeMillis();
if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {
try {
Packet keepAlivePingPacket = createKeepAlivePingPacket();
ctx.writeAndFlush(keepAlivePingPacket);
KeepAlivePing keepAlivePing = (KeepAlivePing) keepAlivePingPacket.getBody();
keepAlivePingPackets.put(keepAlivePing.getRequestId(), keepAlivePingPacket);
} catch (Exception e) {
logger.error("keep alive ping packet serialization exception.");
}
}
}
}
private Packet createKeepAlivePingPacket() throws IOException {
SessionManager sessionManager = SessionManager.getInstance();
long sessionId = sessionManager.getSessionId(ctx);
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.KeepAlivePing.value())
.level(PacketLevel.DEFAULT.value())
.body(new KeepAlivePing())
.build();
return packet;
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (packet != null) {
lastPacketTimestamp = System.currentTimeMillis();
}
ctx.fireChannelRead(msg);
}
}
(5)链路保活探测数据包的处理与响应
收到保活探测包之后的处理:
//通信链路保活Handler
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogManager.getLogger(KeepAliveHandler.class);
//每隔10分钟做一次保活检查
private static final long KEEP_ALIVE_CHECK_INTERNAL = 10 * 60 * 1000;
//1个小时都没有通信就开启链路保活探测
private static final long KEEP_ALIVE_TIMEOUT = 1 * 60 * 60 * 1000;
private long lastPacketTimestamp = -1;
//存放对于该长连接已经发送的保活探测包请求
//map是用于存放已经发送ping探测包后但还没收到pong探测包的packet
private Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new KeepAliveThread(ctx).start();
}
...
//收到保活探测包之后的处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (packet != null) {
lastPacketTimestamp = System.currentTimeMillis();
keepAlivePingPackets.clear();
}
if (isKeepAlivePingPacket(packet)) {
//收到的是保活探测ping包就构建pong包
Packet keepAlivePongPacket = createKeepAlivePongPacket(ctx, packet);
ctx.writeAndFlush(keepAlivePongPacket);
} else if(isKeepAlivePongPacket(packet)) {
//收到的是保活探测pong包就代表成功
KeepAlivePong keepAlivePong = (KeepAlivePong) packet.getBody();
keepAlivePingPackets.remove(keepAlivePong.getRequestId());
logger.info("Keep alive ping pong success.");
}
ctx.fireChannelRead(msg);
}
private boolean isKeepAlivePingPacket(Packet packet) {
return packet.getHeader().getType() == PacketType.KeepAlivePing.value();
}
private boolean isKeepAlivePongPacket(Packet packet) {
return packet.getHeader().getType() == PacketType.KeepAlivePong.value();
}
private Packet createKeepAlivePongPacket(ChannelHandlerContext ctx, Packet keepAliveRequestPacket) throws IOException {
SessionManager sessionManager = SessionManager.getInstance();
long sessionId = sessionManager.getSessionId(ctx);
KeepAlivePing keepAlivePing = (KeepAlivePing) keepAliveRequestPacket.getBody();
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.KeepAlivePong.value())
.level(PacketLevel.DEFAULT.value())
.body(new KeepAlivePong(keepAlivePing.getRequestId()))
.build();
return packet;
}
}
(6)链路保活探测包发送失败的重试
如果 10 分钟了还没收到第一次发的 Ping 探测包的 Pong 响应,那么就进行重试,且最多重试 3 次。连续成功发送了 3 次 Ping,结果一直没有 Pong 响应,则关闭连接。
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
...
private static final int KEEP_ALIVE_PING_RETRY_TIMES = 3;
private long lastPacketTimestamp = -1;
//存放对于该长连接已经发送的保活探测包请求
//map是用于存放已经发送ping探测包后但还没收到pong探测包的packet
private Map<String, Packet> keepAlivePingPackets = new ConcurrentHashMap<String, Packet>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
new KeepAliveThread(ctx).start();
}
//通信链路保活检查线程
private class KeepAliveThread extends Thread {
private ChannelHandlerContext ctx;
private int keepAlivePingRetryTimes = 0;
public KeepAliveThread(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
for(;;) {
//每隔10分钟做一次链路保活检查
try {
sleep(KEEP_ALIVE_CHECK_INTERNAL);
} catch (InterruptedException e) {
logger.error("Keep alive thread interrupted exception.");
}
//每隔10分钟检查一下最近一次发送的keep alive ping是否已经收到pong了
if (keepAlivePingPackets.size() > 0) {
//发送重试
if (keepAlivePingPackets.size() < KEEP_ALIVE_PING_RETRY_TIMES) {
if (!sendKeepAlivePingPacketWithRetry(ctx)) {
ctx.close();
}
}
//连续发送成功了3次ping,结果一直没有pong回来,此时也是关闭物理连接
if (keepAlivePingPackets.size() >= KEEP_ALIVE_PING_RETRY_TIMES) {
ctx.close();
}
}
//当前时间距离上一次收到数据包的时间超过了1小时,启动链路保活探测
long now = System.currentTimeMillis();
if (now - lastPacketTimestamp >= KEEP_ALIVE_TIMEOUT) {
//如果连续重试3次都发送不成功一个探测包,此时直接关闭物理连接
if (!sendKeepAlivePingPacketWithRetry(ctx)) {
ctx.close();
}
}
}
}
//10分钟内如果还没收到第一次发的ping探测包的pong响应,那么就进行重试,且最多重试3次
private boolean sendKeepAlivePingPacketWithRetry(ChannelHandlerContext ctx) {
boolean result = false;
int retryTimes = 0;
while(retryTimes < KEEP_ALIVE_PING_RETRY_TIMES) {
try {
sendKeepAlivePingPacket(ctx);
result = true;
break;
} catch (Exception e) {
logger.error("send keep alive ping packet exception.");
retryTimes++;
}
}
return result;
}
private void sendKeepAlivePingPacket(ChannelHandlerContext ctx) throws IOException {
Packet keepAlivePingPacket = createKeepAlivePingPacket();
ctx.writeAndFlush(keepAlivePingPacket);
KeepAlivePing keepAlivePing = (KeepAlivePing) keepAlivePingPacket.getBody();
keepAlivePingPackets.put(keepAlivePing.getRequestId(), keepAlivePingPacket);
}
private Packet createKeepAlivePingPacket() throws IOException {
SessionManager sessionManager = SessionManager.getInstance();
long sessionId = sessionManager.getSessionId(ctx);
Packet packet = Packet.builder()
.sessionId(sessionId)
.type(PacketType.KeepAlivePing.value())
.level(PacketLevel.DEFAULT.value())
.body(new KeepAlivePing())
.build();
return packet;
}
}
...
}
当有数据包发送过来时,那么就可以清空记录的 Ping 探测包了。
public class KeepAliveHandler extends ChannelInboundHandlerAdapter {
...
//收到保活探测包之后的处理
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Packet packet = (Packet) msg;
if (packet != null) {
//有包发送过来的时候,不管是什么样的包,ping探测包都可以清空了
lastPacketTimestamp = System.currentTimeMillis();
keepAlivePingPackets.clear();
}
if (isKeepAlivePingPacket(packet)) {
//收到的是保活探测ping包就构建pong包
Packet keepAlivePongPacket = createKeepAlivePongPacket(ctx, packet);
ctx.writeAndFlush(keepAlivePongPacket);
} else if(isKeepAlivePongPacket(packet)) {
//收到的是保活探测pong包就代表成功
KeepAlivePong keepAlivePong = (KeepAlivePong) packet.getBody();
keepAlivePingPackets.remove(keepAlivePong.getRequestId());
logger.info("Keep alive ping pong success.");
}
ctx.fireChannelRead(msg);
}
...
}
文章转载自:东阳马生架构

不在线第一只蜗牛
还未添加个人签名 2023-06-19 加入
还未添加个人简介
评论