写点什么

2021 年 Java 知识体系总结,部门老大:redis- 分布式锁再这么用

发布于: 2021 年 08 月 04 日

前言

Mysql 的锁机制确实非常重要,所以在这里做一个全面的总结整理,便于以后的查阅,也分享给大家。


Mysql 的锁机制还是有点难理解的,所以这篇文章采用图文结合的方式讲解难点,帮助大家理解,讲解的主要内容如下图的脑图所示,基本涵盖了 Mysql 锁机制的所有知识点。

本文脑图


            /**             * 扣减库存             */            。。。。。。        } else {            LOGGER.info("未获取到锁业务结束..");        }    } catch (Exception e) {        LOGGER.info("处理异常", e);    }     return "ok";
复制代码


}复制代码



结果业务代码执行完以后我忘了释放锁`lock.unlock()`,导致`redis`线程池被打满,`redis`服务大面积故障,造成库存数据扣减混乱,被领导一顿臭骂,这个月绩效~ 哎·~。
随着 使用`redis` 锁的时间越长,我发现 `redis` 锁的坑远比想象中要多。就算在面试题当中`redis`分布式锁的出镜率也比较高,比如:“用锁遇到过哪些问题?” ,“又是如何解决的?” 基本都是一套连招问出来的。
今天就分享一下我用`redis` 分布式锁的踩坑日记,以及一些解决方案,和大家一起共勉。
#### 一、锁未被释放
这种情况是一种低级错误,就是我上边犯的错,由于当前线程 获取到`redis` 锁,处理完业务后未及时释放锁,导致其它线程会一直尝试获取锁阻塞,例如:用`Jedis`客户端会报如下的错误信息
复制代码


redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool 复制代码



`redis线程池`已经没有空闲线程来处理客户端命令。
解决的方法也很简单,只要我们细心一点,拿到锁的线程处理完业务及时释放锁,如果是重入锁未拿到锁后,线程可以释放当前连接并且`sleep`一段时间。
复制代码


public void lock() {while (true) {boolean flag = this.getLock(key);if (flag) {TODO .........} else {// 释放当前 redis 连接 redis.close();// 休眠 1000 毫秒 sleep(1000);}}}复制代码



#### 二、B的锁被A给释放了
我们知道`Redis`实现锁的原理在于 `SETNX`命令。当 `key`不存在时将 `key`的值设为 `value` ,返回值为 `1`;若给定的 `key` 已经存在,则 `SETNX`不做任何动作,返回值为 `0` 。
复制代码


SETNX key value 复制代码



我们来设想一下这个场景:`A`、`B`两个线程来尝试给`key` `myLock`加锁,`A线程`先拿到锁(假如锁`3秒`后过期),`B线程`就在等待尝试获取锁,到这一点毛病没有。
那如果此时业务逻辑比较耗时,执行时间已经超过`redis`锁过期时间,这时`A线程`的锁自动释放(删除`key`),`B线程`检测到`myLock`这个`key`不存在,执行 `SETNX`命令也拿到了锁。
但是,此时`A线程`执行完业务逻辑之后,还是会去释放锁(删除`key`),这就导致`B线程`的锁被`A线程`给释放了。
为避免上边的情况,一般我们在每个线程加锁时要带上自己独有的`value`值来标识,只释放指定`value`的`key`,否则就会出现释放锁混乱的场景。
#### 三、数据库事务超时
emm~ 聊`redis`锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:
复制代码


@Transactionpublic void lock() {


    while (true) {        boolean flag = this.getLock(key);        if (flag) {            insert();        }    }}
复制代码


复制代码



给这个方法添加一个`@Transaction`注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。
比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。
一旦你的`key`长时间获取不到锁,获取锁`等待的时间`远超过数据库事务`超时时间`,程序就会报异常。
一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。
复制代码


@AutowiredDataSourceTransactionManager dataSourceTransactionManager;
@Transactionpublic void lock() { //手动开启事务 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { while (true) { boolean flag = this.getLock(key); if (flag) { insert(); //手动提交事务 dataSourceTransactionManager.commit(transactionStatus); } } } catch (Exception e) { //手动回滚事务 dataSourceTransactionManager.rollback(transactionStatus); }}
复制代码


复制代码



#### 四、锁过期了,业务还没执行完
这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。
同样是`redis`分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,**把`redis`锁的过期时间再弄长点不就解决了吗?**
那还是有问题,我们可以在加锁的时候,手动调长`redis`锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。
**要是`redis`锁的过期时间能够自动续期就好了。**
为了解决这个问题我们使用`redis`客户端`redisson`,`redisson`很好的解决了`redis`在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对`Redis`的关注,将更多精力用在处理业务逻辑上。
`redisson`对分布式锁做了很好封装,只需调用`API`即可。
复制代码


RLock lock = redissonClient.getLock("stockLock");复制代码



`redisson`在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对`过期时间`进行续期。默认过期时间30秒。这个机制也被叫做:“`看门狗`”,这名字。。。
**举例子**:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。
通过分析下边`redisson`的源码实现可以发现,不管是`加锁`、`解锁`、`续约`都是客户端把一些复杂的业务逻辑,通过封装在`Lua`脚本中发送给`redis`,保证这段复杂业务逻辑执行的`原子性`。
复制代码


@Slf4j@Servicepublic class RedisDistributionLockPlus {


/** * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象 */private static final long DEFAULT_LOCK_TIMEOUT = 30;
private static final long TIME_SECONDS_FIVE = 5 ;
/** * 每个key的过期时间 {@link LockContent} */private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
/** * redis执行成功的返回 */private static final Long EXEC_SUCCESS = 1L;
/** * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间 */private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " + "if redis.call('exists', KEYS[1]) == 0 then " + "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " + "for k, v in pairs(t) do " + "if v == 'OK' then return tonumber(ARGV[2]) end " + "end " + "return 0 end";
/** * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout */private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "local ctime = tonumber(ARGV[2]) " + "local biz_timeout = tonumber(ARGV[3]) " + "if ctime > 0 then " + "if redis.call('exists', KEYS[2]) == 1 then " + "local avg_time = redis.call('get', KEYS[2]) " + "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " + "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " + "else redis.call('del', KEYS[2]) end " + "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " + "end " + "return redis.call('del', KEYS[1]) " + "else return 0 end";/** * 续约lua脚本 */private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
private final StringRedisTemplate redisTemplate;
public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; ScheduleTask task = new ScheduleTask(this, lockContentMap); // 启动定时任务 ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);}
/** * 加锁 * 取到锁加锁,取不到锁一直等待知道获得锁 * * @param lockKey * @param requestId 全局唯一 * @param expire 锁过期时间, 单位秒 * @return */public boolean lock(String lockKey, String requestId, long expire) { log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId); for (; ; ) { // 判断是否已经有线程持有锁,减少redis的压力 LockContent lockContentOld = lockContentMap.get(lockKey); boolean unLocked = null == lockContentOld; // 如果没有被锁,就获取锁 if (unLocked) { long startTime = System.currentTimeMillis(); // 计算超时时间 long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire; String lockKeyRenew = lockKey + "_renew";
RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class); List<String> keys = new ArrayList<>(); keys.add(lockKey); keys.add(lockKeyRenew); Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire)); if (null != lockExpire && lockExpire > 0) { // 将锁放入map LockContent lockContent = new LockContent(); lockContent.setStartTime(startTime); lockContent.setLockExpire(lockExpire); lockContent.setExpireTime(startTime + lockExpire * 1000); lockContent.setRequestId(requestId); lockContent.setThread(Thread.currentThread()); lockContent.setBizExpire(bizExpire); lockContent.setLockCount(1); lockContentMap.put(lockKey, lockContent); log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId); return true; } } // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁 if (Thread.currentThread() == lockContentOld.getThread() && requestId.equals(lockContentOld.getRequestId())){ // 计数 +1 lockContentOld.setLockCount(lockContentOld.getLockCount()+1); return true; }
// 如果被锁或获取锁失败,则等待100毫秒
复制代码

最后:学习总结——MyBtis 知识脑图(纯手绘 xmind 文档)

学完之后,若是想验收效果如何,其实最好的方法就是可自己去总结一下。比如我就会在学习完一个东西之后自己去手绘一份 xmind 文件的知识梳理大纲脑图,这样也可方便后续的复习,且都是自己的理解,相信随便瞟几眼就能迅速过完整个知识,脑补回来。下方即为我手绘的 MyBtis 知识脑图,由于是 xmind 文件,不好上传,所以小编将其以图片形式导出来传在此处,细节方面不是特别清晰。但可给感兴趣的朋友提供完整的 MyBtis 知识脑图原件(包括上方的面试解析 xmind 文档)



除此之外,前文所提及的 Alibaba 珍藏版 mybatis 手写文档以及一本小小的 MyBatis 源码分析文档——《MyBatis 源码分析》等等相关的学习笔记文档,也皆可分享给认可的朋友!


资料领取方式:戳这里免费下载

用户头像

还未添加个人签名 2021.07.29 加入

还未添加个人简介

评论

发布
暂无评论
2021年Java知识体系总结,部门老大:redis-分布式锁再这么用