1.背景
生产环境操作的 redis 的框架原先使用的是 jedis,基于 spring-data-redis 封装的 template 进行读写。因为项目当中也用到了分布式锁,分布式锁的实现直接使用的开源框架 redissson。考虑到以下的区别:
jedis 本身是基于阻塞 I/O,且其方法调用都是同步的,程序流需要等到 socket 处理完 I/O 才能执行,不支持异步,jedis 客户端实例不是线程安全的,需要通过连接池来使用 jedis。
redisson 使用非阻塞的 I/O 和基于 Netty 框架的事件驱动的通信层,其方法调用是异步的。redisson 的 API 是线程安全的,可以操作单个 redisson 连接来完成各种操作。
且项目中 jedis 与 redisson 并存,那么希望直接通过 redisson 来完成 redis 的所有操作,包括正常的缓存读写和分布式锁。翻看了 redisson 的文档之后,发现其也支持 spring-data-redis 封装 template 进行操作,于是对项目进行了改造上线。在上线之后,我们缓存出现了不少问题,以下是部分的日志:
第一张截图是数据写入缓存超时,第二张截图是 ping 操作超时,出现的时间没有固定的规律,属于偶现。
2. 问题分析
截图中的问题不是同一个,我们暂且将其归为问题 1 和问题 2,问题 1 为写入超时,问题 2 为 ping 超时(ping 操作是 redisson 为了进行连接检测和重新连接断开的链接,使用 pingConnectionInterval 参数进行频率控制,默认 30s 执行一次)。首先尝试分析问题 1 写入超时,我们很容易想到,写入操作是不是 redis 服务器本身存在问题,于是我们第一时间联系了运维,查看了 redis 的慢查日志,统计了 redis 的响应延时,结果发现 redis 服务端没有任何问题,且在 redisson 改造上线前,使用 jedis 时没有出现任何 redis 写入或者读取操作的问题,于是我们认为 redis 服务器本身是没有问题的。同时,我们看到了问题 2ping 超时,那么会不会是网络问题导致的?
2.1 网络连接
对于问题 2,我们查阅了网上的相关资料,大部分的资料分析的是 redis 服务端关闭了连接,而 redisson client 端不知道,于是 client 端使用了该链接从而导致错误的发生,解决方案是设置 pingConnectionInterval 参数。但是,在我们使用的 redisson 版本中,pingConnectionInterval 参数默认 30s,也就是说,每 30s 就会对连接进行一次检测即 ping 命令,理论上来说服务端不可能在如此短的时间内就对连接进行关闭。为防万一,我们也查看了服务端的配置,首先查看 tcp keepalive 的配置(命令为 sysctl -a |grep tcp_keepalive)如下:
熟悉 tcp 的同学应该知道,tcp 可以有长连接和短连接(此处不同于 http 的长连接和短连接),在有些场景中,我们希望连接能长期保持以减少重复断开连接造成的系统开销,因此会使用长连接,不同的 redis 客户端一般都使用长连接。在使用长连接时,客户端(两端都是可能的)可能意外断电、死机、崩溃、重启,还是中间路由网络无故断开,这些 TCP 连接并未来得及正常释放,那么,连接的另一方并不知道对端的情况,它会一直维护这个连接,长时间的积累会导致非常多的半打开连接,造成服务端系统资源的消耗和浪费,因此就有了保活探测机制(keepalive)。在上图的配置中,net.ipv4.tcp_keepalive_time=30
表示允许的空闲时长为 30s,即在客服端最后一次发送之后,过了 30s,服务端会进行主动的探测。探测的次数和时间间隔由net.ipv4.tcp_keepalive_probes
和net.ipv4.tcp_keepalive_intvl
控制,上图中net.ipv4.tcp_keepalive_intvl=15
表示探测的时间间隔为 15s,net.ipv4.tcp_keepalive_probes=5
表示探测次数为 5 次, 也就是说,在客户端发送了报文之后,30s 没有新的报文,服务端会往客户端发送一个 keepalive 探测包,如果客户端没有响应,那么间隔 15s 之后会继续发 keepalive 探测包,如果一直没有响应,那么服务端在发送 5 次之后会主动断开连接,从上面的配置看,如果客户端一直没有响应 redis 服务端的 keepalive 探测包,那么在 30 + 5*15 =105s 之后,redis 服务端会关闭这个连接。基于上面的论述,redisson 客户端会每隔 30s 发送一次 ping 命令,理论上来说 redis 服务端不可能会主动关闭连接而造成 redisson 客户端无感知。为了验证连接是否还在,我们也查看了 redis 服务端的连接情况(netstat -no|grep 6379):
最后一列的 keepalive 说明 tcp keepalive 的是开启的,也就是说 redis 服务端会进行探测保活。这里的结果多列,我们来分析一下具体的含义,首先 netstat 命令中的-o 参数的含义是输出相关的网络定时器:
-o, --timersInclude information related to networking timers.
结果中最后一列的含义(man netstat)如下:
Timer(this needs to be written)
一头冷汗,官方的文档竟然没写,直接拷贝一下网上的资料如下:
第一列,一般有一下几种状态;keepalive - #表示是 keepalive 的时间计时 on - #表示是重发(retransmission)的时间计时 off - #表示没有时间计时 timewait - #表示等待(timewait)时间计时第二列,(51.16/0/0) -> (a/b/c)a - #计时时间值(当第一列为 keepalive 的时候,a 代表 keepalive 计时时间;当第一列为 on 的时候,a 代表重发(retransmission)的时间计时;当第一列为 timewait 的时候,a 代表等待(timewait)的时间计时)b - #已经产生的重发(retransmission)次数 c - #keepalive 已经发送的探测(probe)包的次数
从以上的说明,我们可以看出截图中的连接不存发送探测包的情况,也即不存在空闲的情况(因为一直客户端每隔 30s 进行一次 ping)。但是截图上有一列的计时是 51.16,这大于了系统的 30s,这又是怎么来的呢?实际上在应用层面可以自行设定 tcp keepalive 的时间,这里是因为 redis 服务器自身进行了设定,具体的配置如下:
# Close the connection after a client is idle for N seconds (0 to disable)
timeout 0
# TCP keepalive.
#
# If non-zero, use SO_KEEPALIVE to send TCP ACKs to clients in absence
# of communication. This is useful for two reasons:
#
# 1) Detect dead peers.
# 2) Take the connection alive from the point of view of network
# equipment in the middle.
#
# On Linux, the specified value (in seconds) is the period used to send ACKs.
# Note that to close the connection the double of the time is needed.
# On other kernels the period depends on the kernel configuration.
#
# A reasonable value for this option is 60 seconds.
tcp-keepalive 60
复制代码
对于 redis 服务端而言,实际的 keepalive 空闲时间是 60s。至此,我们发现,在网络连接层面是不会出现问题的,那么,问题到底在哪里?回到日志,日志有详细的错误堆栈信息,是不是可以从这里再挖出一些有用的信息呢?
2.2 问题复现
在问题 1 的日志里,可以看到此处的逻辑是 set 操作,在 spring-data-redis 的 template 操作最终走到了 RedissonConnection 的 write 操作,而后是 sync 阻塞等待实际的 write 结果,那么 write 阶段是否有异常发生了,我们尝试找了更细节的日志:
2022-01-24 02:55:39.685 [http-nio-8080-exec-6] ERROR [o.a.c.c.C.[.[.[.[dispatcherServlet]:175] [TID:N/A] - Servlet.service() for servlet [dispatcherServlet] in context with path [/cmccplus] threw exception
org.springframework.dao.InvalidDataAccessApiUsageException: Unexpected exception while processing command; nested exception is org.redisson.client.RedisException: Unexpected exception while processing command
at org.redisson.spring.data.connection.RedissonExceptionConverter.convert(RedissonExceptionConverter.java:52)
at org.redisson.spring.data.connection.RedissonExceptionConverter.convert(RedissonExceptionConverter.java:35)
at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44)
at org.redisson.spring.data.connection.RedissonConnection.transform(RedissonConnection.java:195)
at org.redisson.spring.data.connection.RedissonConnection.syncFuture(RedissonConnection.java:190)
at org.redisson.spring.data.connection.RedissonConnection.sync(RedissonConnection.java:356)
at org.redisson.spring.data.connection.RedissonConnection.write(RedissonConnection.java:722)
at org.redisson.spring.data.connection.RedissonConnection.setEx(RedissonConnection.java:530)
at org.springframework.data.redis.connection.DefaultStringRedisConnection.setEx(DefaultStringRedisConnection.java:1138)
at org.springframework.data.redis.core.DefaultValueOperations$8.potentiallyUsePsetEx(DefaultValueOperations.java:337)
at org.springframework.data.redis.core.DefaultValueOperations$8.doInRedis(DefaultValueOperations.java:330)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:223)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190)
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:97)
at org.springframework.data.redis.core.DefaultValueOperations.set(DefaultValueOperations.java:325)
at com.cmcc.hy.plus.biz.auth.JwtAuthenticationTokenFilter.doFilterInternal(JwtAuthenticationTokenFilter.java:150)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:96)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540)
at org.apache.catalina.core.StandardHostValve.invoke$original$j3CdN2QH(StandardHostValve.java:135)
at org.apache.catalina.core.StandardHostValve.invoke$original$j3CdN2QH$accessor$lXTjUmdl(StandardHostValve.java)
at org.apache.catalina.core.StandardHostValve$auxiliary$mFQ4Aq8A.call(Unknown Source)
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1732)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.redisson.client.RedisException: Unexpected exception while processing command
at org.redisson.command.CommandAsyncService.convertException(CommandAsyncService.java:328)
at org.redisson.command.CommandAsyncService.get(CommandAsyncService.java:128)
at org.redisson.spring.data.connection.RedissonConnection.syncFuture(RedissonConnection.java:188)
... 50 common frames omitted
Caused by: java.lang.NullPointerException: null
at org.redisson.client.handler.CommandEncoder.encode(CommandEncoder.java:130)
at org.redisson.client.handler.CommandEncoder.encode(CommandEncoder.java:99)
at org.redisson.client.handler.CommandEncoder.encode(CommandEncoder.java:55)
at io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:107)
at org.redisson.client.handler.CommandEncoder.write(CommandEncoder.java:75)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:120)
at org.redisson.client.handler.CommandBatchEncoder.write(CommandBatchEncoder.java:45)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:801)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
at org.redisson.client.handler.CommandsQueue.write(CommandsQueue.java:84)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:106)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 common frames omitted
复制代码
这个日志比较详细,可以看到这里抛出了 NullPointerException,具体抛出了地方是 redisson 的 CommandEncoder,这就很有意义了。同时,我们发现了这个异常的原因是 com.cmcc.hy.plus.biz.auth.JwtAuthenticationTokenFilter.doFilterInternal(JwtAuthenticationTokenFilter.java:150),在这一行,我们真的发现了系统的一个 bug,这里有可能会出现使用 redisTemplate set null 的情况,因此,我们在 IDE 中模拟了这个错误,发现使用基于 redisson 的 redisTemplate set null 的确会出现 CommandEncoder 的 NullPointerException,堆栈信息也一样,到这里我们发现好像事情在变得逐渐明朗起来。但是,前述的问题 1 和问题 2 是怎么来的呢,好像和 NullPointerException 是完全不同的问题,两者是不是有什么联系呢?基于此,我们对日志进行了更多了分析,分析之后发现了一个有趣的现象,在 CommandEncoder 的 NullPointerException 的周边,都会出现要么是问题 1 或者问题 2 的错误,有较强的规律。那么是不是 CommandEncoder NullPointerException 蔓延出了其他的错误呢?巧合的是,我们在本地 IDE 复现了这个问题,在验证 CommandEncoder NullPointerException 时,测试用例并没有关闭,在 30s 之后,出现了 Ping 异常:
[redisson-timer-4-1] ERROR [o.r.c.handler.PingConnectionHandler:77] [TID: N/A] - Unable to send PING command over channel: [id: 0x3564ed6a, L:/2.0.4.28:61450 - R:172.21.41.10/172.21.41.10:6379]
org.redisson.client.RedisTimeoutException: Command execution timeout for command: (PING), params: [], Redis client: [addr=redis://172.21.41.10:6379]
at org.redisson.client.RedisConnection.lambda$async$1(RedisConnection.java:251)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:670)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:745)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:473)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
复制代码
之后,我们缩小了 ping 的间隔时间,以便更好的观察,结果很令人欣慰,在使用 redisTemplate set null 时,出现 CommandEncoder NullPointerException 之后,必然会出现 ping 异常,走到这里,颇有柳暗花明又一村的感觉!问题 2 ping 超时已经复现了,事实证明不是网络连接的原因,那么问题 1 写入超时呢,是不是同样的问题导致的?按照这个想法,接下来复现的方法就很简单了,在 redisTemplate set null 之后写入一个正常的数据,结果是出现了写入超时(此处不再贴日志了),且必现!并且在之后的写入和读取操作会一直报错,也就是说这条连接后续的操作会一直出错,直至出现 ping 异常。ping 异常之后的操作将不再报错,这也侧面说明了为何生产环境不会一直报错,至于 ping 异常之后不会继续出错的原因是 PingConnectionHandler(定时发送 ping 指令的处理器)会将出错的连接 close, 如下代码的第 18 行:
private void sendPing(ChannelHandlerContext ctx) {
RedisConnection connection = RedisConnection.getFrom(ctx.channel());
RFuture<String> future;
if (connection.getUsage() == 0) {
future = connection.async(StringCodec.INSTANCE, RedisCommands.PING);
} else {
future = null;
}
config.getTimer().newTimeout(timeout -> {
if (connection.isClosed() || ctx.isRemoved()) {
return;
}
if (connection.getUsage() == 0
&& future != null
&& (future.cancel(false) || !future.isSuccess())) {
ctx.channel().close();
if (future.cause() != null && !future.isCancelled()) {
log.error("Unable to send PING command over channel: " + ctx.channel(), future.cause());
}
log.debug("channel: {} closed due to PING response timeout set in {} ms", ctx.channel(), config.getPingConnectionInterval());
} else {
sendPing(ctx);
}
}, config.getPingConnectionInterval(), TimeUnit.MILLISECONDS);
复制代码
同时,ConnectionWatchdog 会尝试进行重连(这里的重连是指 RedisConnection,而不是原先的 tcp 连接):
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
RedisConnection connection = RedisConnection.getFrom(ctx.channel());
if (connection != null) {
connection.fireDisconnected();
if (!connection.isClosed()) {
if (connection.isFastReconnect()) {
tryReconnect(connection, 1);
} else {
reconnect(connection, 1);
}
}
}
ctx.fireChannelInactive();
}
复制代码
在重连之后,会注册一条新的 channel,并使用新的 RedisConnection 进行后续的操作。既然问题已经复现并且基于此进行了简单的分析,那么要解决这个问题实际也比较简单,在我们的应用代码里出现 set null bug 的地方先做一层判断,就能避免整个 redisson 问题的蔓延。但是,这里不禁要问,为什么使用 jedis 的时候没有这个问题?因为 jedis 在执行命令的时候对所有的 key、value 都进行了非空校验,而 redisson 没有!以下是 jedis 的 set 代码:
以下是 redisson 的 set 代码:
非常遗憾的是,redisson 作为一个比较成熟的开源项目,竟然没有做最基本的非空校验,并且出现问题时出现了异常的扩散。走到这里,实际我们已经能解决生产的问题了,至少能保证不会再继续出现写入超时或者 ping 超时,但是,禀着“知其然而知其所以然”的宗旨,我们不禁要问,到底是什么原因,redisson 出现了异常的扩散?
2.3 源码分析
我们先看看 redisson set 时 write 方法中的内容:
<T> T write(byte[] key, Codec codec, RedisCommand<?> command, Object... params) {
RFuture<T> f = executorService.writeAsync(key, codec, command, params);
indexCommand(command);
return sync(f);
}
复制代码
这是一个异步转同步的过程,executorService.writeAsync
异步执行操作,sync
阻塞等待结果。在这方法中RedisCommand
封装了 redis 的命令,params
是命令的参数。在 set null 的过程中,从之前的日志可以分析出在执行writeAsync
的过程中出现了问题,堆栈信息表明是CommandEncoder
抛出了空指针,CommandEncoder
的作用是对请求的 redis 指令进行编码,它是一个MessageToByteEncoder
。它的编码方法如下:
@Override
protected void encode(ChannelHandlerContext ctx, CommandData<?, ?> msg, ByteBuf out) throws Exception {
try {
out.writeByte(ARGS_PREFIX);
int len = 1 + msg.getParams().length;
if (msg.getCommand().getSubName() != null) {
len++;
}
out.writeCharSequence(Long.toString(len), CharsetUtil.US_ASCII);
out.writeBytes(CRLF);
writeArgument(out, msg.getCommand().getName().getBytes(CharsetUtil.UTF_8));
if (msg.getCommand().getSubName() != null) {
writeArgument(out, msg.getCommand().getSubName().getBytes(CharsetUtil.UTF_8));
}
for (Object param : msg.getParams()) {
ByteBuf buf = encode(param);
writeArgument(out, buf);
if (!(param instanceof ByteBuf)) {
buf.release();
}
}
if (log.isTraceEnabled()) {
String info = out.toString(CharsetUtil.UTF_8);
if (RedisCommands.AUTH.equals(msg.getCommand())) {
info = info.substring(0, info.indexOf(RedisCommands.AUTH.getName()) + RedisCommands.AUTH.getName().length()) + "(password masked)";
}
log.trace("channel: {} message: {}", ctx.channel(), info);
}
} catch (Exception e) {
msg.tryFailure(e);
throw e;
}
}
复制代码
在以上代码第 18 行的encode
方法中,对参数进行具体的编码,null 导致此处抛出空指针异常,在这个方法的末尾也捕获了这个异常同时msg.tryFailure
结束了调用过程并将异常往外抛出,并最终在外层对异常进行封装转换,这里符合 set null 的异常堆栈信息。那么,它是如何对后面的操作产生影响的呢?我们采用如下的代码进行测试:
try {
stringRedisTemplate.opsForValue().set("test-null", null);
} catch (Exception e) {
log.error("write null", e);
}
try {
stringRedisTemplate.opsForValue().set("test-normal-write", "1");
} catch (Exception e) {
log.error("write normal", e);
}
复制代码
代码的逻辑很简单,使用 StringRedisTemplate 先写入 null 值,然后再写入一个正常值,观察 log。在写入 null 值时,结果就是之前的堆栈信息,在写入正常值时,错误日志如下:
org.springframework.dao.QueryTimeoutException: Redis server response timeout (3000 ms) occured after 0 retry attempts. Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. Command: (SET), params: [[116, 101, 115, 116, 45, 110, 111, 114, 109, 97, ...], [49]], channel: [id: 0x5fc67cb9, L:/2.0.4.190:55103 - R:172.21.41.10/172.21.41.10:6379]; nested exception is org.redisson.client.RedisResponseTimeoutException: Redis server response timeout (3000 ms) occured after 0 retry attempts. Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. Command: (SET), params: [[116, 101, 115, 116, 45, 110, 111, 114, 109, 97, ...], [49]], channel: [id: 0x5fc67cb9, L:/2.0.4.190:55103 - R:172.21.41.10/172.21.41.10:6379]
at org.redisson.spring.data.connection.RedissonExceptionConverter.convert(RedissonExceptionConverter.java:48)
at org.redisson.spring.data.connection.RedissonExceptionConverter.convert(RedissonExceptionConverter.java:35)
at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44)
at org.redisson.spring.data.connection.RedissonConnection.transform(RedissonConnection.java:195)
at org.redisson.spring.data.connection.RedissonConnection.syncFuture(RedissonConnection.java:190)
at org.redisson.spring.data.connection.RedissonConnection.sync(RedissonConnection.java:356)
at org.redisson.spring.data.connection.RedissonConnection.write(RedissonConnection.java:722)
at org.redisson.spring.data.connection.RedissonConnection.set(RedissonConnection.java:490)
at org.springframework.data.redis.connection.DefaultStringRedisConnection.set(DefaultStringRedisConnection.java:1102)
at org.springframework.data.redis.core.DefaultValueOperations$7.inRedis(DefaultValueOperations.java:309)
at org.springframework.data.redis.core.AbstractOperations$ValueDeserializingRedisCallback.doInRedis(AbstractOperations.java:61)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:223)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190)
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:97)
at org.springframework.data.redis.core.DefaultValueOperations.set(DefaultValueOperations.java:305)
at com.cmcc.hy.plus.job.controller.TestController.redis(TestController.java:47)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1067)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:963)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:681)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:764)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at com.cmcc.hy.plus.common.filter.LoggingFilter.doFilterInternal(LoggingFilter.java:243)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:96)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:197)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:540)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:135)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:382)
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:895)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1732)
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191)
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.redisson.client.RedisResponseTimeoutException: Redis server response timeout (3000 ms) occured after 0 retry attempts. Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. Command: (SET), params: [[116, 101, 115, 116, 45, 110, 111, 114, 109, 97, ...], [49]], channel: [id: 0x5fc67cb9, L:/2.0.4.190:55103 - R:172.21.41.10/172.21.41.10:6379]
at org.redisson.command.RedisExecutor.lambda$scheduleResponseTimeout$5(RedisExecutor.java:341)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:670)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:745)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:473)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 common frames omitted
复制代码
从日志的表面看,的确就是超时了,第一句话“org.springframework.dao.QueryTimeoutException: Redis server response timeout (3000 ms) occured after 0 retry attempts. Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. ”表示超过了 3000ms 还没有响应,3000ms 是 redisson 的响应超时时间,那么这个具体是从哪里来的呢?在我们前面的分析中,redisson 的写入操作是一个异步转同步的过程,使用了 sync 方法阻塞等待 future 的结果。既然是异步的过程,它是如何控制超时时间的,答案是org.redisson.command.RedisExecutor#scheduleResponseTimeout
方法,这个方法会起一个定时任务,任务的在 3000ms(可配置)之后执行,任务的内容是将操作结果置为超时,也就是说,如果 3000ms 内该任务没有被 cancel 掉,就会抛出超时的异常。核心逻辑是这一段代码:
TimerTask timeoutResponseTask = timeout -> {
if (isResendAllowed(attempt, attempts)) {
if (!attemptPromise.cancel(false)) {
return;
}
connectionManager.newTimeout(t -> {
attempt++;
if (log.isDebugEnabled()) {
log.debug("attempt {} for command {} and params {}",
attempt, command, LogHelper.toString(params));
}
mainPromiseListener = null;
execute();
}, retryInterval, TimeUnit.MILLISECONDS);
return;
}
attemptPromise.tryFailure(
new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured"
+ " after " + attempt + " retry attempts. Increase nettyThreads and/or timeout settings. Try to define pingConnectionInterval setting. Command: "
+ LogHelper.toString(command, params) + ", channel: " + connection.getChannel()));
};
timeout = connectionManager.newTimeout(timeoutResponseTask, timeoutTime, TimeUnit.MILLISECONDS);
复制代码
可以看到,timeoutResponseTask 中组装了 RedisResponseTimeoutException 异常。scheduleResponseTimeout
方法是在何处被调用的呢?从源码可以看到只有一个地方调用了这个方法,这个地方是org.redisson.command.RedisExecutor#checkWriteFuture
,checkWriteFuture
的作用是检查写入请求的结果,如果写入成功的话,就会调用这个方法,具体如下:
connectionFuture.onComplete((connection, e) -> {
if (connectionFuture.isCancelled()) {
connectionManager.getShutdownLatch().release();
return;
}
if (!connectionFuture.isSuccess()) {
connectionManager.getShutdownLatch().release();
exception = convertException(connectionFuture);
return;
}
sendCommand(attemptPromise, connection);
writeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(writeFuture, attemptPromise, connection);
}
});
});
复制代码
在上述代码的第 15 行,表明writeFuture
有一个 Listener,在 write 操作完成的时候进行检查checkWriteFuture
。也就是说,在scheduleResponseTimeout
之前,实际请求已经发出去了,那么 redis 服务端有没有收到呢?使用 redis-cli 的 monitor 功能查看服务端接收到的请求,的确能看到写入的请求:
从截图中可以看到操作是 SET,key 是 test-normal-write,且 redis 服务端没有收到 SET test-null 的指令,因为 redisson 在CommandEncoder
阶段已经抛出空指针了。从 redis 的服务端看,对 SET 操作的处理也非常快,那问题是不是在响应处理这一端出现的呢?接下来我们来看一看 redisson 在处理请求/响应时到底干了什么,具体可以参考org.redisson.client.handler.RedisChannelInitializer#initChannel
方法,代码如下:
@Override
protected void initChannel(Channel ch) throws Exception {
initSsl(config, ch);
if (type == Type.PLAIN) {
ch.pipeline().addLast(new RedisConnectionHandler(redisClient));
} else {
ch.pipeline().addLast(new RedisPubSubConnectionHandler(redisClient));
}
ch.pipeline().addLast(
connectionWatchdog,
CommandEncoder.INSTANCE,
CommandBatchEncoder.INSTANCE);
if (type == Type.PLAIN) {
ch.pipeline().addLast(new CommandsQueue());
} else {
ch.pipeline().addLast(new CommandsQueuePubSub());
}
if (pingConnectionHandler != null) {
ch.pipeline().addLast(pingConnectionHandler);
}
if (type == Type.PLAIN) {
ch.pipeline().addLast(new CommandDecoder(config.getAddress().getScheme()));
} else {
ch.pipeline().addLast(new CommandPubSubDecoder(config));
}
ch.pipeline().addLast(new ErrorsLoggingHandler());
config.getNettyHook().afterChannelInitialization(ch);
}
复制代码
代码中可以看到很多熟悉的身影,比如pingConnectionHandler
、CommandEncoder
、connectionWatchdog
。从以上的代码中,我们可以简单画一下输入和输出的 pipeline,输出(请求 redis)的 pipieline 如下:
ErrorsLoggingHandler -> CommandsQueue -> CommandBatchEncoder -> CommandEncoder
复制代码
输入(响应 redis,还包括建立连接等)的 pipeline 如下:
RedisConnectionHandler -> ConnectionWatchdog -> PingConnectionHandler -> CommandDecoder -> ErrorsLoggingHandler
复制代码
在输出即请求链路上,按我们的分析,set null 时在CommandEncoder
抛出了异常,中断了请求,在 set 正常值时链路时正常的。在输入即响应链路上,最主要的是CommandDecoder
,其他要么是处理日志,要么是处理连接事件,这里我们着重分析一下CommandDecoder
,即解码器。解码的第一步很重要,就是取出对应请求时的操作命令,redisson 是这么做的:
protected QueueCommand getCommand(ChannelHandlerContext ctx) {
Queue<QueueCommandHolder> queue = ctx.channel().attr(CommandsQueue.COMMANDS_QUEUE).get();
QueueCommandHolder holder = queue.peek();
if (holder != null) {
return holder.getCommand();
}
return null;
}
复制代码
它从 channel 中取出了一个 queue,然后从 queue 中 peek 出了QueueCommandHolder
作为当前的要处理的命令响应。那既然可以从 queue 中取,那又是什么时候塞入 queue 的呢,我们可以看到在请求的 pipiline 中,有一个CommandsQueue
,它是在这里把命令塞到 queue 里面的。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof QueueCommand) {
QueueCommand data = (QueueCommand) msg;
QueueCommandHolder holder = new QueueCommandHolder(data, promise);
Queue<QueueCommandHolder> queue = ctx.channel().attr(COMMANDS_QUEUE).get();
while (true) {
if (lock.compareAndSet(false, true)) {
try {
queue.add(holder);
ctx.writeAndFlush(data, holder.getChannelPromise());
} finally {
lock.set(false);
}
break;
}
}
} else {
super.write(ctx, msg, promise);
}
}
复制代码
此时,我们已经大致知道 queue 写入和取出的对应关系,因为 peek 不会对 queue 中的数据做 remove 操作,所以必然有个地方把 queue 中的数据推出,redisson 的做法是在解码之后推出,decode 之后会调用如下的方法:
protected void sendNext(Channel channel) {
Queue<QueueCommandHolder> queue = channel.attr(CommandsQueue.COMMANDS_QUEUE).get();
queue.poll();
state(null);
}
复制代码
分析到这里,我们的问题其实基本已经有解了,在案例中 stringRedisTemplate set null 之后,由于在CommandEncode
阶段抛出了空指针异常,导致请求没发出去,那么肯定不会有CommandDecoder
阶段了,根据 pipeline 的顺序,CommandsQueue
是在CommandEncoder
之前执行,这也意味着 queue 中塞入了 set null(指指令 1)请求指令,而没有解码阶段将其从 queue 中推出,这导致在下一个 set 正常值(指令 2)的命令时,请求发到了 redis,redis 也进行了响应,但是在解码阶段,取出的操作指令是上一个请求(指令 1)的,从而解码异常,而且,由于此时取出的是第一次操作的指令(指令 1),第二次的指令(指令 2)还在等待下一次从 queue 中取出并处理(promise.tryFailure),进而导致正常 set(指令 2)的超时。自此,后续所有的请求都会出现错误,因为解码阶段取出的QueueCommand
永远是上一次请求的,直到PinConnectionHandler
出错,并断开连接,在ConnectionWatchDog
的协同处理下重置整个 channel 的数据。redisson 作为一个成熟的开源框架,有这个 bug 实际是非常不应该的,解决的方案有两种:
和 jedis 一样,在 set 等操作开始前进行前置的判空校验;
在CommandEncoder
的 encode 抛出异常时,使用 final 语句将 queue 中的QueueCommandHolder
推出。
当然,这里更倾向于方案 1。
3.总结
在分析 redisson 为什么会异常蔓延的过程中,实际经历的过程远比文中复杂。一个是 redisson 出错时的日志误导性比较强,看到错误日志的第一反应,要么是网络出错,要么是 redis 服务器出错,在分析了很久之后才排除了网络和 redis 的原因,从而转到分析 redisson 自身的实现机制。另外,在分析 redisson 的过程中,最开始的设想其实是在请求阶段关闭了连接,但是没有释放句柄或者引用,这也耗费了比较长的时间。在分析过程中,也打印了不少的 TRACE 日志,这也帮助排除了不少的错误方向。总而言之,分析问题是个比较耗时的过程,也是个不断学习进步的过程,在这个过程中,熟悉了 redisson,熟悉了 netty,也为今后自己的应用代码提供了很好的借鉴。
评论