写点什么

【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件

作者:洛神灬殇
  • 2023-05-04
    江苏
  • 本文字数:9304 字

    阅读完需:约 31 分钟

【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件

分布式锁的前提介绍

因为分布式系统之间是不同进程的,单机版的锁无法满足要求。所以我们可以借助中间件 Redis 的 setnx()命令实现分布式锁。setnx()命令只会对不存在的 key 设值,返回 1 代表获取锁成功。

分布式锁的基础要点

分布式锁的特性是排他、避免死锁、高可用。

分布式锁的实现原理

分布式锁的实现可以通过数据库的乐观锁(通过版本号)或者悲观锁(通过 for update)、Redis 的 setnx()命令、Zookeeper(在某个持久节点添加临时有序节点,判断当前节点是否是序列中最小的节点,如果不是则监听比当前节点还要小的节点。如果是,获取锁成功。当被监听的节点释放了锁(也就是被删除),会通知当前节点。然后当前节点再尝试获取锁,如此反复)。

Zookeeper 的分布式锁原理

  • Zookeeper(在某个持久节点添加临时有序节点,判断当前节点是否是序列中最小的节点,如果不是则监听比当前节点还要小的节点。如果是,获取锁成功。

  • 当被监听的节点释放了锁(也就是被删除),会通知当前节点。然后当前节点再尝试获取锁,如此反复)

数据库的分布式锁原理

如果获取锁的逻辑只有这三行代码的话,会造成死循环,明显不符合分布式锁的特性。


我们知道分布式锁的特性是排他、避免死锁、高可用。分布式锁的实现可以通过数据库的乐观锁(通过版本号)或者悲观锁(通过 for update)。

Redis 的分布式锁原理

  • Redis 对存在的 key 设值,会返回 0 代表获取锁失败。这里的 value 是 System.currentTimeMillis() (获取锁的时间)+锁持有的时间。

  • 这里设置锁持有的时间是 200ms,实际业务执行的时间远比这 200ms 要多的多,持有锁的客户端应该检查锁是否过期,保证锁在释放之前不会过期。因为客户端故障的情况可能是很复杂的。

分布式案例分析
  • 比如现在有 A,B 俩个客户端。A 客户端获取了锁,执行业务中做了骚操作导致阻塞了很久,时间应该远远超过 200ms,当 A 客户端从阻塞状态下恢复继续执行业务代码时,A 客户端持有的锁由于过期已经被其他客户端占有。这时候 A 客户端执行释放锁的操作,那么有可能释放掉其他客户端的锁。

  • 这里设置的客户端等待锁的时间是 200ms。这里通过轮询的方式去让客户端获取锁。如果客户端在 200ms 之内没有锁的话,直接返回 false。实际场景要设置合适的客户端等待锁的时间,避免消耗 CPU 资源。


接下来我们就要用 redis 去开发一个我们自己的一个常用的分布式锁的组件。

总体设计结构图



引用 Maven 配置

首先我们先进行配置相关的 maven 的依赖,这些依赖呢大家选择性进行使用即可。


  <dependencies>    <dependency>      <groupId>org.springframework.boot</groupId>      <artifactId>spring-boot-starter-aop</artifactId>    </dependency>    <dependency>      <groupId>junit</groupId>      <artifactId>junit</artifactId>      <version>4.11</version>      <scope>test</scope>    </dependency>    <dependency>      <groupId>org.redisson</groupId>      <artifactId>redisson-spring-boot-starter</artifactId>      <version>3.13.3</version>    </dependency>    <dependency>        <groupId>com.fengwenyi</groupId>        <artifactId>JavaLib</artifactId>        <version>2.1.6</version>     </dependency>        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>            <version>1.18.20</version>        </dependency>        <!--joda-->        <dependency>            <groupId>joda-time</groupId>            <artifactId>joda-time</artifactId>            <version>2.9.1</version>        </dependency>         <dependency>            <groupId>org.apache.commons</groupId>            <artifactId>commons-lang3</artifactId>            <version>3.8.1</version>        </dependency>        <dependency>            <groupId>com.google.guava</groupId>            <artifactId>guava</artifactId>            <version>31.0-jre</version>        </dependency>        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>            <version>1.2.78</version>        </dependency>        <dependency>            <groupId>cn.hutool</groupId>            <artifactId>hutool-all</artifactId>            <version>5.5.8</version>        </dependency>  </dependencies>
复制代码

建立分布式锁的参数模型

构建分布式锁的参数模型类:DistributeLockParam。


@Datapublic class DistributeLockParam {
private String lockUUid;
private String lockNamePrefix; private Long expireTime;
private Long waitTime;
private TimeUnit timeUnit;
private String delimiter;
private DistributeLockType lockType;}
复制代码


参数的一个大概的一个分析介绍:


  • lockUUid:分布式锁的唯一 ID 主键标识,作为主键操作。

  • lockNamePrefix:锁名称的前缀,用于作为查询锁状态的标准,

  • expireTime:为了防止死锁,我们需要加入一个参数作为过期时间,防止系统宕机后,或者长时间占用进行资源不释放的问题。

  • waitTime:与过期时间不同,等待时间作为锁需要占用或者其他线程会等待获取锁的时间。

  • timeUnit:等待时间和过期时间的时间单位

  • delimiter:锁标识 key 的分隔符,redis 而言一般采用”:“的方式进行控制。

  • lockType:锁的类型。




所以还需要定义分布式锁类型:


public enum DistributeLockType {

/** * 重入锁 */ REENTRANT_LOCK,
/** * 非公平锁 */ FAIR_LOCK,
/** * 联和锁 */ MULTI_LOCK,
/** * 红锁 */ RED_LOCK,
/** * 读写锁 */ READ_WRITE_LOCK, ;}
复制代码

定义分布式锁的核心接口

接下来我们要定义一下分布式锁的核心逻辑接口 DistributeLockSupport。


public interface DistributeLockSupport<T> {
/** * 默认的分隔符 */ String DEFAULT_DELIMTER = ":";
String DEFAULT_KEY_PREFIX = "LOCK";

Long DEFAULT_EXPIRE_TIME = 10L;

Long DEFAULT_WAIT_TIME = 10L;

Joiner DEFAULT_JOINER = Joiner.on(DistributeLockSupport.DEFAULT_DELIMTER). skipNulls(); /** * 加锁 * @param distributeLockParam * @return */ T lock(DistributeLockParam distributeLockParam);
/** * 解锁 * @param distributeLockParam */ void unlock(T param, DistributeLockParam distributeLockParam);
}
复制代码


其中前四个属性静态常量值主要作用是给我们的分布式所提供默认值。


    /**     * 默认的分隔符     */    String DEFAULT_DELIMTER = ":";      String DEFAULT_KEY_PREFIX = "LOCK";
Long DEFAULT_EXPIRE_TIME = 10L;
Long DEFAULT_WAIT_TIME = 10L;
复制代码

分别代表

  • 分布是所的键值的分割符。

  • 默认的 key 的前缀。

  • 还有就是锁的过期时间和等待时间。


这里我们采用了 Guava 的连接器,进行我们的特殊风格符的连接。


    Joiner DEFAULT_JOINER = Joiner.on(DistributeLockSupport.DEFAULT_DELIMTER).            skipNulls();
复制代码

业务加锁和解锁方法

主要用于枷锁和解锁我们的分布式锁。


    /**     * 加锁     * @param distributeLockParam     * @return     */    T lock(DistributeLockParam distributeLockParam);
/** * 解锁 * @param distributeLockParam */ void unlock(T param, DistributeLockParam distributeLockParam);
复制代码

定义分布式锁的键 Key 生成接口

接下来主要去定一个接口,专门为我们生成不同样式,不同格式的键值,进行一个扩展的一个接口(LockKeyGenerator)。


public interface LockKeyGenerator {    String getLockKey(ProceedingJoinPoint pjp);}
复制代码


可以看到啊对应的参数是 AOP 的一个代理参数:ProceedingJoinPoint, 这也被我们后面进行批处理奠定一定的基础。

定义分布式锁的异常类

主要用于定义分布式锁的异常输出类:RedisDistributedLockException。


public class RedisDistributedLockException extends RuntimeException {
private String key;
public RedisDistributedLockException (String key) { super("key [" + key + "] tryLock fail"); this.key = key; }
public RedisDistributedLockException (String key, String errorMessage) { super("key [" + key + "] tryLock fail error message :" + errorMessage); this.key = key; }}
复制代码


可以看到我们的该类是实现了 RuntimeException 的运行时异常类。

定义我们分布式锁的基础抽象类

接下来我们就定一下我们分布式锁的一个基础抽象类:AbstractDistributeLockSupport。这个类主要实现了我们之前的那个接口 DistributeLockSupport。


public abstract class AbstractDistributeLockSupport<T> implements DistributeLockSupport<T> {


/** * 检验参数 * @param distributeLockParam * @return */ protected DistributeLockParam fullDistributeDefaultValue(DistributeLockParam distributeLockParam){ Preconditions.checkNotNull(distributeLockParam,"检测到了参数不允许为空!"); DistributeLockType distributeLockType = distributeLockParam.getLockType(); distributeLockParam.setLockType(Optional.ofNullable(distributeLockType).orElse(DistributeLockType.FAIR_LOCK)); distributeLockParam.setExpireTime(Optional.ofNullable(distributeLockParam.getExpireTime()).orElse(DEFAULT_EXPIRE_TIME)); distributeLockParam.setWaitTime(Optional.ofNullable(distributeLockParam.getExpireTime()).orElse(DEFAULT_WAIT_TIME)); distributeLockParam.setTimeUnit(Optional.ofNullable(distributeLockParam.getTimeUnit()).orElse(TimeUnit.SECONDS)); return distributeLockParam; }

/** * 构建相关的锁key值 * @param distributeLockParam * @return */ protected String buildLockKey(DistributeLockParam distributeLockParam){ String lockId = StringUtils.defaultIfEmpty(distributeLockParam.getLockUUid(), UUID.fastUUID().toString()); distributeLockParam.setLockUUid(lockId); String delmiter = StringUtils.defaultIfEmpty(distributeLockParam.getDelimiter(), DEFAULT_DELIMTER); distributeLockParam.setDelimiter(delmiter); String prefix = StringUtils.defaultIfEmpty(distributeLockParam .getLockNamePrefix(),DEFAULT_KEY_PREFIX); distributeLockParam.setLockNamePrefix(prefix); String lockFullName = ""; if(!delmiter.equals(DEFAULT_DELIMTER)){ //todo 待优化 Joiner joiner = Joiner.on(delmiter).skipNulls(); lockFullName = joiner.join(prefix,lockId); }else{ lockFullName = DEFAULT_JOINER.join(prefix,lockId); } return lockFullName; }
复制代码


该类主要包含两个方法。分别是 fullDistributeDefaultValue 和 buildLockKey。

fullDistributeDefaultValue 方法

这个方法主要的目的是为了校验以及填充一些我们没有写的参数的默认值。


 protected DistributeLockParam fullDistributeDefaultValue(DistributeLockParam distributeLockParam){        Preconditions.checkNotNull(distributeLockParam,"检测到了参数不允许为空!");        DistributeLockType distributeLockType = distributeLockParam.getLockType();        distributeLockParam.setLockType(Optional.ofNullable(distributeLockType).orElse(DistributeLockType.FAIR_LOCK));        distributeLockParam.setExpireTime(Optional.ofNullable(distributeLockParam.getExpireTime()).orElse(DEFAULT_EXPIRE_TIME));        distributeLockParam.setWaitTime(Optional.ofNullable(distributeLockParam.getExpireTime()).orElse(DEFAULT_WAIT_TIME));        distributeLockParam.setTimeUnit(Optional.ofNullable(distributeLockParam.getTimeUnit()).orElse(TimeUnit.SECONDS));        return distributeLockParam;}
复制代码

buildLockKey 方法

该类主要负责的是构建我们的分布式锁的 key


 protected String buildLockKey(DistributeLockParam distributeLockParam){        String lockId = StringUtils.defaultIfEmpty(distributeLockParam.getLockUUid(),                        UUID.fastUUID().toString());        distributeLockParam.setLockUUid(lockId);        String delmiter = StringUtils.defaultIfEmpty(distributeLockParam.getDelimiter(),                            DEFAULT_DELIMTER);        distributeLockParam.setDelimiter(delmiter);        String prefix = StringUtils.defaultIfEmpty(distributeLockParam                .getLockNamePrefix(),DEFAULT_KEY_PREFIX);        distributeLockParam.setLockNamePrefix(prefix);        String lockFullName = "";        if(!delmiter.equals(DEFAULT_DELIMTER)){            //todo 待优化            Joiner joiner = Joiner.on(delmiter).skipNulls();            lockFullName = joiner.join(prefix,lockId);        }else{            lockFullName = DEFAULT_JOINER.join(prefix,lockId);        }        return lockFullName;}
复制代码


从在马上可以看出来,他主要就是将我们之前的那些所有的属性进行连接拼接到一起。


至此,我们的基础组建部分的抽象部分就已经完成了。那么接下来呢我们需要进行一个实现 Redis 模式下的分布式锁。



定义 Redis 分布式锁的注解

主要用于 AOP 的一个拦截以及获取一些特定化的参数。

RedisDistributedLock 注解

@Target(ElementType.METHOD)@Retention(RetentionPolicy.RUNTIME)@Documented@Inheritedpublic @interface RedisDistributedLock {
String prefix() default "";
/** * 锁过期时间 */ int expireTime() default 30;
/** * 获取锁等待时间 */ int waitTime() default 10;
TimeUnit timeUnit() default TimeUnit.SECONDS;
String delimiter() default ":";
LockCategory category() default LockCategory.COMMON;}
复制代码

RedisDistributedLockParam 注解

主要用于参数方法上的修饰,获取参数相关的一些主要用于参数方法上的修饰,获取参数相关的一些参数值作为我们的分布式主键的 key。


@Target({ElementType.PARAMETER, ElementType.METHOD, ElementType.FIELD})@Retention(RetentionPolicy.RUNTIME)@Documented@Inheritedpublic @interface RedisDistributedLockParam {    String name() default "";}
复制代码

实现分布式锁的唯一键的生成器

我们定义分布式锁组件抽象接口的 redis 版本为 RedisDistributedLockKeyGenerator。


public class RedisDistributedLockKeyGenerator  implements LockKeyGenerator {

@Override public String getLockKey(ProceedingJoinPoint pjp) { MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); RedisDistributedLock lockAnnotation = method.getAnnotation(RedisDistributedLock.class); final Object[] args = pjp.getArgs(); final Parameter[] parameters = method.getParameters(); StringBuilder builder = new StringBuilder(); // 默认解析方法里面带 CacheParam 注解的属性,如果没有尝试着解析实体对象中的 for (int i = 0; i < parameters.length; i++) { final RedisDistributedLockParam annotation = parameters[i].getAnnotation(RedisDistributedLockParam.class); if (annotation == null) { continue; } builder.append(lockAnnotation.delimiter()).append(args[i]); } if (StringUtils.isEmpty(builder.toString())) { final Annotation[][] parameterAnnotations = method.getParameterAnnotations(); for (int i = 0; i < parameterAnnotations.length; i++) { final Object object = args[i]; final Field[] fields = object.getClass().getDeclaredFields(); for (Field field : fields) { final RedisDistributedLockParam annotation = field.getAnnotation(RedisDistributedLockParam.class); if (annotation == null) { continue; } field.setAccessible(true); builder.append(lockAnnotation.delimiter()).append(ReflectionUtils.getField(field, object)); } } } return lockAnnotation.prefix() + builder.toString(); }}
复制代码


上面的码主要是用鱼上面的代码主要是用鱼去提取注解上的参数以及一些呃参数的 key 的一个基本信息。

实现分布式锁实现类 RedisDistributeLockSupport

RedisDistributeLockSupport 主要实现了我们的抽象分布式锁的核心业务接口。


其中使用了 Redisson 的 RedissonClient 客户端服务,从而进行选择类型,进行选择分布式锁的方式。


@Slf4j@Componentpublic class RedisDistributeLockSupport extends AbstractDistributeLockSupport<RLock> {
@Autowired RedissonClient redissonClient;
/** * 非阻塞方式锁 * @param distributeLockParam * @return */ @Override public RLock lock(DistributeLockParam distributeLockParam) { distributeLockParam = fullDistributeDefaultValue(distributeLockParam); String lockKey = buildLockKey(distributeLockParam); RLock rLock = null; try { switch (distributeLockParam.getLockType()) { // 可重入锁 case REENTRANT_LOCK: { rLock = redissonClient.getLock(lockKey); break; } // 非公平锁 case FAIR_LOCK: { rLock = redissonClient.getFairLock(lockKey); break; } default: { throw new UnsupportedOperationException("暂时不支持此种方式的锁!"); } } Boolean result = rLock.tryLock(distributeLockParam.getWaitTime(), distributeLockParam.getExpireTime(), distributeLockParam.getTimeUnit()); return rLock; } catch (InterruptedException e) { log.error("加锁为阻塞模式下的锁进行失败!", e); return rLock; } } @Override public void unlock(RLock param, DistributeLockParam distributeLockParam) { try { param.unlock(); } catch (Exception e) { log.error("解我操!啊?锁为阻塞模式下的锁进行失败!", e); } }}
复制代码


可以根据我们所的类型选择。公平锁或者是和重入锁。

实现分布式锁实现类 RedisDistributedLockAspect 切面类

@Aspect@Order(4)@Slf4jpublic class RedisDistributedLockAspect {

@Autowired private RedissonClient redissonClient;
@Autowired private RedisDistributedLockKeyGenerator redisDistributedLockKeyGenerator;

@Around("execution(public * *(..)) && @annotation(com.hyts.assemble.distributeLock.redis.RedisDistributedLock)") public Object interceptor(ProceedingJoinPoint pjp) throws Throwable { MethodSignature signature = (MethodSignature) pjp.getSignature(); Method method = signature.getMethod(); RedisDistributedLock redisDistributedLock = method.getAnnotation(RedisDistributedLock.class);
if (StringUtils.isEmpty(redisDistributedLock.prefix())) { throw new RuntimeException("lock key can't be null..."); }
final String lockKey = redisDistributedLockKeyGenerator.getLockKey(pjp); RLock lock = chooseLock(redisDistributedLock,lockKey); //key不存在才能设置成功 Boolean success = null; Object proceed = null;
try { success = lock.tryLock(redisDistributedLock.waitTime(), redisDistributedLock.expireTime(), redisDistributedLock.timeUnit()); if (success) { log.debug("tryLock success key [{}]", lockKey); proceed = pjp.proceed(); } else { log.error("key is : {" + lockKey + "} tryLock fail "); throw new RedisDistributedLockException(lockKey); } } catch (InterruptedException e) { log.error("key is : {" + lockKey + "} tryLock error ", e); throw new RedisDistributedLockException(lockKey, e.getMessage()); } finally { if (success) { log.debug("unlock [{}]", "key:" + lockKey); lock.unlock(); } } return proceed; }

private RLock chooseLock(RedisDistributedLock redisDistributedLock, String lockName) { LockCategory category = redisDistributedLock.category(); switch (category) { case FAIR: return redissonClient.getFairLock(lockName); } return redissonClient.getLock(lockName); }}
复制代码


发布于: 刚刚阅读数: 2
用户头像

洛神灬殇

关注

🏆 InfoQ写作平台-签约作者 🏆 2020-03-25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
【分布式技术专题】「分布式技术架构」手把手教你如何开发一个属于自己的分布式锁的功能组件_redis_洛神灬殇_InfoQ写作社区