写点什么

Redis 实现分布式锁

用户头像
黄敏
关注
发布于: 刚刚

一、抛出问题

1.为什么加锁?

加锁是在多线程并发访问的情况下,为了保证同一块代码只能有一个线程执行该代码。

Java 提供了 synchronized 语法以及 ReentrantLock 等等来保证

2.什么是分布式锁?

上述加锁可以保证在同一个 jvm 里多个线程的同步执行,但是如果在分布式集群环境中不同节点的线程同步就无法满足了。

所以分布式锁就是为了满足在分布式集群环境中保证多个节点只能有一个节点执行想要同步的代码。

3.分布式锁有哪些?

1、基于数据库的乐观锁

2、基于 redis 的分布式锁

3、基于 Zookeeper 的分布式锁

二、实现

只针对 redis 实现分布式锁做讨论

可靠性

首先,为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:

  • 互斥性。在任意时刻,只有一个客户端能持有锁。

  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

  • 具有容错性。只要大部分的 Redis 节点正常运行,客户端就可以加锁和解锁。

  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

Redis 锁主要利用 Redis 的 setnx 命令。

  • 加锁命令:SETNX key value,当键不存在时,对键进行设置操作并返回成功,否则返回失败。KEY 是锁的唯一标识,一般按业务来决定命名。

  • 解锁命令:DEL key,通过删除键值对释放锁,以便其他线程可以通过 SETNX 命令来获取锁。

  • 锁超时:EXPIRE key timeout, 设置 key 的超时时间,以保证即使锁没有被显式释放,锁也可以在一定时间后自动释放,避免资源被永远锁住。

Redis 分布式锁姿势不对可能带来的问题

1. SETNX 和 EXPIRE 非原子性

如果 SETNX 成功,在设置锁超时时间后,服务器挂掉、重启或网络问题等,导致 EXPIRE 命令没有执行,锁没有设置超时时间变成死锁。

2. 锁误解除

如果线程 A 成功获取到了锁,并且设置了过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁;随后 A 执行完成,线程 A 使用 DEL 命令来释放锁,但此时线程 B 加的锁还没有执行完成,线程 A 实际释放的线程 B 加的锁。

3. 超时解锁导致并发

如果线程 A 成功获取锁并设置过期时间 30 秒,但线程 A 执行时间超过了 30 秒,锁过期自动释放,此时线程 B 获取到了锁,线程 A 和线程 B 并发执行。

A、B 两个线程发生并发显然是不被允许的,一般有两种方式解决该问题:

  • 将过期时间设置足够长,确保代码逻辑在锁释放之前能够执行完成。

  • 为获取锁的线程增加守护线程,为将要过期但未释放的锁增加有效时间。

轮子 1:Redis 实现的分布式锁

SpringBoot + Redis + AOP 的组合来实现一个简易的分布式锁

step1. 自定义注解

自定义一个注解,被注解的方法会执行获取分布式锁的逻辑

/** * @author Pecker * @Description 分布式锁注解 * @since 2020-04-23 */public @interface RedisLock {    /**     * 业务键     *     * @return     */    String key();    /**     * 锁的过期秒数,默认是5秒     *     * @return     */    long expire() default 30L;
/** * 锁的超时时间单位 * * @return */ TimeUnit timeUnit() default TimeUnit.SECONDS;}
复制代码
step2. AOP 拦截器实现

在 AOP 中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下:

/** * @author Pecker * @Description 对标记了@RedisLock 注解的类进行切面处理,实现分布式锁 * @since 2020-09-03 */@Slf4j@Aspect@Componentpublic class RedisLockAspect {    @Autowired    private StringRedisTemplate redisTemplate;
@Pointcut("@annotation(com.example.demo.annotation.RedisLock)") public void proxyAspect() { }
@Around("@annotation(redisLock)") public Object doAround(ProceedingJoinPoint joinPoint, RedisLock redisLock) throws Throwable { //保证执行finally 释放锁是释放的自己的锁 String redisValue = UUID.randomUUID().toString(); try { //原子操作 Boolean result = redisTemplate.opsForValue() .setIfAbsent(redisLock.key(), redisValue, redisLock.expire(), redisLock.timeUnit()); if (!result){ //加锁失败,已有任务成功加锁 return null; } Object o = joinPoint.proceed(); return o; } finally { //解锁 if (redisValue.equals(redisTemplate.opsForValue().get(redisValue))) { redisTemplate.delete(redisLock.key()); } } }}
复制代码


step3.场景示例,@Scheduled 执行定时任务
@RedisLock@Scheduled(cron = "0 0/20 * * * ?")public void syncSolrData() {    //定时任务执行业务逻辑}
复制代码

轮子 2:Jedis 方式实现的分布式锁

SpringBoot + Jedis + AOP 的组合来实现一个简易的分布式锁

step1. 自定义注解

自定义一个注解,被注解的方法会执行获取分布式锁的逻辑

@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documented@Inheritedpublic @interface RedisLock {    /**     * 业务键     *     * @return     */    String key();    /**     * 锁的过期秒数,默认是5秒     *     * @return     */    int expire() default 5;
/** * 尝试加锁,最多等待时间 * * @return */ long waitTime() default Long.MIN_VALUE; /** * 锁的超时时间单位 * * @return */ TimeUnit timeUnit() default TimeUnit.SECONDS;}
复制代码
step2. AOP 拦截器实现

在 AOP 中我们去执行获取分布式锁和释放分布式锁的逻辑,代码如下:

@Aspect@Componentpublic class LockMethodAspect {    @Autowired    private RedisLockHelper redisLockHelper;    @Autowired    private JedisUtil jedisUtil;    private Logger logger = LoggerFactory.getLogger(LockMethodAspect.class);
@Around("@annotation(com.redis.lock.annotation.RedisLock)") public Object around(ProceedingJoinPoint joinPoint) { Jedis jedis = jedisUtil.getJedis(); MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod();
RedisLock redisLock = method.getAnnotation(RedisLock.class); String value = UUID.randomUUID().toString(); String key = redisLock.key(); try { final boolean islock = redisLockHelper.lock(jedis,key, value, redisLock.expire(), redisLock.timeUnit()); logger.info("isLock : {}",islock); if (!islock) { logger.error("获取锁失败"); throw new RuntimeException("获取锁失败"); } try { return joinPoint.proceed(); } catch (Throwable throwable) { throw new RuntimeException("系统异常"); } } finally { logger.info("释放锁"); redisLockHelper.unlock(jedis,key, value); jedis.close(); } }}
复制代码


step3. Redis 实现分布式锁核心类
@Componentpublic class RedisLockHelper {    private long sleepTime = 100;    /**     * 直接使用setnx + expire方式获取分布式锁     * 非原子性     *     * @param key     * @param value     * @param timeout     * @return     */    public boolean lock_setnx(Jedis jedis,String key, String value, int timeout) {        Long result = jedis.setnx(key, value);        // result = 1时,设置成功,否则设置失败        if (result == 1L) {            return jedis.expire(key, timeout) == 1L;        } else {            return false;        }    }
/** * 使用Lua脚本,脚本中使用setnex+expire命令进行加锁操作 * * @param jedis * @param key * @param UniqueId * @param seconds * @return */ public boolean Lock_with_lua(Jedis jedis,String key, String UniqueId, int seconds) { String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" + "redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end"; List<String> keys = new ArrayList<>(); List<String> values = new ArrayList<>(); keys.add(key); values.add(UniqueId); values.add(String.valueOf(seconds)); Object result = jedis.eval(lua_scripts, keys, values); //判断是否成功 return result.equals(1L); }
/** * 在Redis的2.6.12及以后中,使用 set key value [NX] [EX] 命令 * * @param key * @param value * @param timeout * @return */ public boolean lock(Jedis jedis,String key, String value, int timeout, TimeUnit timeUnit) { long seconds = timeUnit.toSeconds(timeout); return "OK".equals(jedis.set(key, value, "NX", "EX", seconds)); }
/** * 自定义获取锁的超时时间 * * @param jedis * @param key * @param value * @param timeout * @param waitTime * @param timeUnit * @return * @throws InterruptedException */ public boolean lock_with_waitTime(Jedis jedis,String key, String value, int timeout, long waitTime,TimeUnit timeUnit) throws InterruptedException { long seconds = timeUnit.toSeconds(timeout); while (waitTime >= 0) { String result = jedis.set(key, value, "nx", "ex", seconds); if ("OK".equals(result)) { return true; } waitTime -= sleepTime; Thread.sleep(sleepTime); } return false; } /** * 错误的解锁方法—直接删除key * * @param key */ public void unlock_with_del(Jedis jedis,String key) { jedis.del(key); }
/** * 使用Lua脚本进行解锁操纵,解锁的时候验证value值 * * @param jedis * @param key * @param value * @return */ public boolean unlock(Jedis jedis,String key,String value) { String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " + "return redis.call('del',KEYS[1]) else return 0 end"; return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L); }}
复制代码


参考

用户头像

黄敏

关注

还未添加个人签名 2019.11.30 加入

熟悉Java并发编程、数据结构和算法、FFmpeg音视频处理技术

评论

发布
暂无评论
Redis 实现分布式锁