前言
网络既连接了世界,也连接了你我。由于 TCP 连接的创建和关闭存在着较大的系统资源开销,因此在高频的 I/O 密集型场景下使用长连接已是共识,但如何保证连接的有效性和可用性却是一个永恒的话题。目前市面上大部分基于 TCP 长连接对内/外提供服务的中间件、存储系统、RPC 框架等都会在其内部通过应用层 HeartBeat 机制来提升长连接的可用性,大家思考下,既然 TCP 协议层缺省已经提供有 Keepalive 机制来实现连接保活,那为什么还需要在应用层引入 HeartBeat 逻辑?这是因为 TCP 的 Keepalive 机制只能探测连接的存活状态,而不能探测其可用性;其次,TCP 的 Keepalive 机制不能准实时探测到那些因异常而断开的连接,存在较大的滞后性,因此在实际的开发过程中,我们往往会选择在应用层设计和实现一套 HeartBeat 方案来有效提升连接的可用性。
或许对于大部分同学来说,HeartBeat 设计是一个比较陌生的领域,而设计出一个优秀的 HeartBeat 方案,更不是一件容易的事。因此,本文会深入剖析 dubbo 的源码(v3.0.8),为大家介绍应该如何设计出一套标准的应用层 HeartBeat 方案,相信大家在仔细阅读后一定能够有所裨益。
dubbo 学会了站在巨人的肩膀上
之所以我会选择 dubbo3.x 版本来讲解 HeartBeat 的标准实现原理是有原因的,因为在早期版本中,dubbo 所采用的 HeartBeat 方案存在着诸多问题,比如:不必要的 Two-Way HeartBeat 设计、定时器的滞后性问题,以及代码层面过度复杂化等问题,都会对其扩展性和维护性带来极大的挑战。直至在 dubbo3.x 及后续版本中,dubbo 开源社区才终于正式启用了基于 Netty 的 IdleStateHandler 来替代掉原有的 HeartBeat 方案。事实上,Netty 提供的 IdleStateHandler 非常 match 应用层的 HeartBeat 需求,代码层面开发人员可以以一种非常方便和简单的方式来实现空闲连接的检测和处理,事实上,这早已是业界标准了,虽然我不太清楚 dubbo 为什么一直要拖到 3.x 版本才选择更新(我在阿里主要还是以 HSF 为主,dubbo 也有几年没怎么具体关注过了),或许是出于底层兼容性和其它问题的考虑,当然这并不重要,重要的是,基于 IdleStateHandler 实现的 HeartBeat 方案具备高可靠性和低维护性等特质。
基于 IdleStateHandler 快速构建 HeartBeat 方案
既然 IdleStateHandler 可以非常方便和快速的帮助开发人员构建一个健壮的 HeartBeat 方案,那么接下来我就首先为大家演示 IdleStateHandler 的具体用法。和其他 Handler 一样,IdleStateHandler 也是一个标准的 ChannelHandler 实现,构造函数有 3 个必填项,如下所示:
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
}
复制代码
其中参数readerIdleTimeSeconds
代表读超时时间,参数writerIdleTimeSeconds
代表写超时时间,而参数allIdleTimeSeconds
则代表读/写超时时间。整体来说,我们可以理解为服务侧和客户侧如果在指定的单位之间内没有收到对端的任何读/写事件,待 IdleStateHandler 的定时任务检测到后就会回调与 channel 对应的 pipeline 上的下一个ChannelHandler#userEventTriggered()
函数,这里面就是我们要实现的空闲连接处理逻辑。在此大家需要注意,标准设计原则是,服务侧设置allIdleTimeSeconds
参数,而客户侧仅需设置参数readerIdleTimeSeconds
即可。
图1 heartbeat的标准处理流程
在任意的 ChannelHandler 中,我们都可以重写其userEventTriggered()
函数。如图 1 所示,如果服务侧单位时间内没有收到任何来自对端的读/写事件,标准的处理逻辑就是直接关闭 channel 会话;而客户侧在单位时间内没有收到对端的任何写事件,标准的处理逻辑是主动向服务侧发起 HeartBeat,直至对端响应,如果服务侧拒绝响应,客户侧就需要触发定量的重试机制,当超过重试阈值后,就需要尝试 close 和 reconnect,直至确保连接可用。伪代码如下所示:
// client
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
// close connection
ctx.close();
return;
}
super.userEventTriggered(ctx, evt);
}
// server
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.writeAndFlush(new Request(heartbeat)).
addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
// close & reconnect
}
});
}
}
复制代码
在此大家需要注意,早期 dubbo 的 HeartBeat 方案是基于 Two-Way 的设计,也即是说,客户侧会给服务侧发送 HeartBeat,反过来服务侧也会给客户侧发送 HeartBeat,然而实际上这几乎没有什么太大必要,大部分情况下只会造成通道资源的浪费,标准设计流程中 HeartBeat 的维护由客户侧单向负责就行了;其次,我看见很多同学在实现 HeartBeat 方案时,明明已经基于 IdleStateHandler 实现了空闲连接检测,然后还在 pipeline 中添加了一个 c2s 主动发送 ping 包的定时任务,假设在峰值流量场景下,这样的操作除了会急剧通道负载外,不会带来任何好处,所以大家在设计 HeartBeat 方案时一定要注意,千万不要进行任何不必要的通道资源占用。
标准的 HeartBeat 方案就介绍到此,接下来我们就一起来看看 dubbo3.x 的 HeartBeat 实现细节。
dubbo 空闲连接检测
刚才我也提及过了,3.x 及后续版本中,dubbo 已经从自实现空闲连接检测方案过渡到了 Netty 提供的 IdleStateHandler。那么接下来我们就首先看一下 provider 侧是如何配置空闲连接检测的。具体代码位置位于NettyServer#initServerBootstrap
中,如下所示:
protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {
bootstrap.group(bossGroup, workerGroup)
// 省略其他相关代码
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
// 将IdleStateHandler添加到pipeline中
.addLast("server-idle-handler",
new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
}
复制代码
与文前描述一致,在标准的 HeartBeat 设计原则中,服务侧需要同时设置读/写超时时间,即allIdleTimeSeconds
参数,这里的时间单位为毫秒,也就是说,provider 侧配置的读/写超时时间为180000ms
,为 consumer 侧重试时间的 3 倍(heartbeat * 3)。idleTimeout 的值来源于UrlUtils#getIdleTimeout
,如下所示:
public static int getIdleTimeout(URL url) {
int heartBeat = getHeartbeat(url);
// 读/写超时时间
int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3);
if (idleTimeout < heartBeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}
return idleTimeout;
}
复制代码
而 consumer 侧的 IdleStateHandler 配置位于NettyClient#initBootstrap
中,如下所示:
protected void initBootstrap(NettyClientHandler nettyClientHandler) {
// 省略其他相关代码
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
// 添加IdleStateHandler
.addLast("client-idle-handler",
new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))
.addLast("handler", nettyClientHandler);
}
});
复制代码
标准的 HeartBeat 设计原则中,客户侧仅需设置readerIdleTimeSeconds
参数,这里的时间单位为毫秒,也就是说,consumer 侧配置的读超时时间为60000ms
。heartbeatInterval 的值来源于UrlUtils#getHeartbeat
,如下所示:
public static int getHeartbeat(URL url) {
return url.getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT);
}
复制代码
大家思考下,为什么 provider 侧的超时时间要配置为 consumer 侧的 3 倍?这是因为在 HeartBeat 的标准设计中,客户侧是存在重试机制的,而 dubbo 的 consumer 侧的读超时是 60s,缺省重试次数是 3 次,这么设计的主要目的就是为了给 consumer 侧留有足够的重试时间和机会,只有当达到重试阈值后,provider/consumer 侧才会选择断开连接。
provider 侧的空闲连接处理逻辑
provider 侧的空闲连接处理逻辑非常简单,如果在指定的单位时间内没有收到对端的任何读/写事件,就直接断开对端连接,因为 provider 侧需要维护大量的 channel,以及处理这些 channel 上的各种 I/O 事件,负载要远大于 consumer 侧,所以一旦检测到存在空闲连接立即断开是正确的。provider 侧的空闲连接处理逻辑位于NettyServerHandler#userEventTriggered
中,如下所示:
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
logger.info("IdleStateEvent triggered, close channel " + channel);
// 超过重试阈值断开对端连接
channel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
super.userEventTriggered(ctx, evt);
}
复制代码
consumer 侧的空闲连接处理逻辑
相对于 provider 侧来说,consumer 侧的空闲连接处理逻辑就显得复杂得多,因为 consumer 侧的处理逻辑中包含 send heartbeat、try-again 和 reconnect 等 3 个主要逻辑。我们首先来看看读超时后 consumer 侧是如发送 HeartBeat 的,处理逻辑位于NettyClientHandler#userEventTriggered
中,如下所示:
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
try {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
if (logger.isDebugEnabled()) {
logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel);
}
// 封装心跳请求
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(HEARTBEAT_EVENT);
// 发送心跳请求
channel.send(req);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
} else {
super.userEventTriggered(ctx, evt);
}
}
复制代码
当 consumer 侧检测到读超时,并成功发送 HeartBeat 后,provider 侧会首先进行编/解码处理,然后位于 pipeline 中的NettyServerHandler#channelRead
会处理这个入站事件,然后依次交由AbstractPeer->MultiMessageHandler->HeartbeatHandler#received
执行请求处理,如下所示:
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
// 判断是否是心跳请求包
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(HEARTBEAT_EVENT);
channel.send(res);
if (logger.isDebugEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
return;
}
// 判断是否是心跳响应包
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
}
return;
}
// 非心跳请求走这里
handler.received(channel, message);
}
复制代码
如果 provider 侧确认请求是 HeartBeat 时,则会由 HeartbeatHandler 负责封装一个 HeartBeat 响应包返回给 consumer 侧。整体流程如图 2 所示:
图2 send heartbeat request流程
实际上,consumer 侧和 provider 侧都是使用的HeartbeatHandler#received
来处理 HeartBeat(正常请求也会途径HeartbeatHandler#received
),当 consumer 侧收到 HeartBeat 响应包后不会进行任何处理。如图 3 所示:
图3 send heartbeat response流程
当然正常情况下,如果连接可用,consumer 侧是不会触发重试机制的,只有当 provider 侧拒绝回应 HeartBeat 响应包超过 3 次时,才会触发 consumer 侧的断连(close)和重连(reconnect)操作。这段定时任务的代码位于ReconnectTimerTask#doTask
中,如下所示:
protected void doTask(Channel channel) {
try {
Long lastRead = lastRead(channel);
Long now = now();
if (!channel.isConnected()) {
try {
logger.info("Initial connection to " + channel);
((Client) channel).reconnect();
} catch (Exception e) {
logger.error("Fail to connect to " + channel, e);
}
// 检查是否超过重试阈值
} else if (lastRead != null && now - lastRead > idleTimeout) {
logger.warn("Reconnect to channel " + channel + ", because heartbeat read idle time out: "
+ idleTimeout + "ms");
try {
// 发起重连
((Client) channel).reconnect();
} catch (Exception e) {
logger.error(channel + "reconnect failed during idle time.", e);
}
}
} catch (Throwable t) {
logger.warn("Exception when reconnect to remote channel " + channel.getRemoteAddress(), t);
}
}
复制代码
上述程序示例中,代码lastRead != null && now - lastRead > idleTimeout
用于判断整体的重试时间是否>180000ms
,如果超出 HeartBeat 的重试阈值就执行 reconnect 操作。
后记
关于 HeartBeat 方案的标准设计就讲到这里,感兴趣的同学可以自行阅读 dubbo3.x 的源码,或者参考其他相关文献资料。如果在阅读过程中有任何疑问,欢迎在评论区留言参与讨论。
推荐文章:
评论 (2 条评论)