分布式锁的前提介绍
因为分布式系统之间是不同进程的,单机版的锁无法满足要求。所以我们可以借助中间件 Redis 的 setnx()命令实现分布式锁。setnx()命令只会对不存在的 key 设值,返回 1 代表获取锁成功。
分布式锁的基础要点
分布式锁的特性是排他、避免死锁、高可用。
分布式锁的实现原理
分布式锁的实现可以通过数据库的乐观锁(通过版本号)或者悲观锁(通过 for update)、Redis 的 setnx()命令、Zookeeper(在某个持久节点添加临时有序节点,判断当前节点是否是序列中最小的节点,如果不是则监听比当前节点还要小的节点。如果是,获取锁成功。当被监听的节点释放了锁(也就是被删除),会通知当前节点。然后当前节点再尝试获取锁,如此反复)。
Zookeeper 的分布式锁原理
数据库的分布式锁原理
如果获取锁的逻辑只有这三行代码的话,会造成死循环,明显不符合分布式锁的特性。
我们知道分布式锁的特性是排他、避免死锁、高可用。分布式锁的实现可以通过数据库的乐观锁(通过版本号)或者悲观锁(通过 for update)。
Redis 的分布式锁原理
分布式案例分析
比如现在有 A,B 俩个客户端。A 客户端获取了锁,执行业务中做了骚操作导致阻塞了很久,时间应该远远超过 200ms,当 A 客户端从阻塞状态下恢复继续执行业务代码时,A 客户端持有的锁由于过期已经被其他客户端占有。这时候 A 客户端执行释放锁的操作,那么有可能释放掉其他客户端的锁。
这里设置的客户端等待锁的时间是 200ms。这里通过轮询的方式去让客户端获取锁。如果客户端在 200ms 之内没有锁的话,直接返回 false。实际场景要设置合适的客户端等待锁的时间,避免消耗 CPU 资源。
接下来我们就要用 redis 去开发一个我们自己的一个常用的分布式锁的组件。
总体设计结构图
引用 Maven 配置
首先我们先进行配置相关的 maven 的依赖,这些依赖呢大家选择性进行使用即可。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.3</version>
</dependency>
<dependency>
<groupId>com.fengwenyi</groupId>
<artifactId>JavaLib</artifactId>
<version>2.1.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!--joda-->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0-jre</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.8</version>
</dependency>
</dependencies>
复制代码
建立分布式锁的参数模型
构建分布式锁的参数模型类:DistributeLockParam。
@Data
public class DistributeLockParam {
private String lockUUid;
private String lockNamePrefix;
private Long expireTime;
private Long waitTime;
private TimeUnit timeUnit;
private String delimiter;
private DistributeLockType lockType;
}
复制代码
参数的一个大概的一个分析介绍:
lockUUid:分布式锁的唯一 ID 主键标识,作为主键操作。
lockNamePrefix:锁名称的前缀,用于作为查询锁状态的标准,
expireTime:为了防止死锁,我们需要加入一个参数作为过期时间,防止系统宕机后,或者长时间占用进行资源不释放的问题。
waitTime:与过期时间不同,等待时间作为锁需要占用或者其他线程会等待获取锁的时间。
timeUnit:等待时间和过期时间的时间单位
delimiter:锁标识 key 的分隔符,redis 而言一般采用”:“的方式进行控制。
lockType:锁的类型。
所以还需要定义分布式锁类型:
public enum DistributeLockType {
/**
* 重入锁
*/
REENTRANT_LOCK,
/**
* 非公平锁
*/
FAIR_LOCK,
/**
* 联和锁
*/
MULTI_LOCK,
/**
* 红锁
*/
RED_LOCK,
/**
* 读写锁
*/
READ_WRITE_LOCK,
;
}
复制代码
定义分布式锁的核心接口
接下来我们要定义一下分布式锁的核心逻辑接口 DistributeLockSupport。
public interface DistributeLockSupport<T> {
/**
* 默认的分隔符
*/
String DEFAULT_DELIMTER = ":";
String DEFAULT_KEY_PREFIX = "LOCK";
Long DEFAULT_EXPIRE_TIME = 10L;
Long DEFAULT_WAIT_TIME = 10L;
Joiner DEFAULT_JOINER = Joiner.on(DistributeLockSupport.DEFAULT_DELIMTER).
skipNulls();
/**
* 加锁
* @param distributeLockParam
* @return
*/
T lock(DistributeLockParam distributeLockParam);
/**
* 解锁
* @param distributeLockParam
*/
void unlock(T param, DistributeLockParam distributeLockParam);
}
复制代码
其中前四个属性静态常量值主要作用是给我们的分布式所提供默认值。
/**
* 默认的分隔符
*/
String DEFAULT_DELIMTER = ":";
String DEFAULT_KEY_PREFIX = "LOCK";
Long DEFAULT_EXPIRE_TIME = 10L;
Long DEFAULT_WAIT_TIME = 10L;
复制代码
分别代表
分布是所的键值的分割符。
默认的 key 的前缀。
还有就是锁的过期时间和等待时间。
这里我们采用了 Guava 的连接器,进行我们的特殊风格符的连接。
Joiner DEFAULT_JOINER = Joiner.on(DistributeLockSupport.DEFAULT_DELIMTER).
skipNulls();
复制代码
业务加锁和解锁方法
主要用于枷锁和解锁我们的分布式锁。
/**
* 加锁
* @param distributeLockParam
* @return
*/
T lock(DistributeLockParam distributeLockParam);
/**
* 解锁
* @param distributeLockParam
*/
void unlock(T param, DistributeLockParam distributeLockParam);
复制代码
定义分布式锁的键 Key 生成接口
接下来主要去定一个接口,专门为我们生成不同样式,不同格式的键值,进行一个扩展的一个接口(LockKeyGenerator)。
public interface LockKeyGenerator {
String getLockKey(ProceedingJoinPoint pjp);
}
复制代码
可以看到啊对应的参数是 AOP 的一个代理参数:ProceedingJoinPoint, 这也被我们后面进行批处理奠定一定的基础。
定义分布式锁的异常类
主要用于定义分布式锁的异常输出类:RedisDistributedLockException。
public class RedisDistributedLockException extends RuntimeException {
private String key;
public RedisDistributedLockException (String key) {
super("key [" + key + "] tryLock fail");
this.key = key;
}
public RedisDistributedLockException (String key, String errorMessage) {
super("key [" + key + "] tryLock fail error message :" + errorMessage);
this.key = key;
}
}
复制代码
可以看到我们的该类是实现了 RuntimeException 的运行时异常类。
定义我们分布式锁的基础抽象类
接下来我们就定一下我们分布式锁的一个基础抽象类:AbstractDistributeLockSupport。这个类主要实现了我们之前的那个接口 DistributeLockSupport。
public abstract class AbstractDistributeLockSupport<T> implements DistributeLockSupport<T> {
/**
* 检验参数
* @param distributeLockParam
* @return
*/
protected DistributeLockParam fullDistributeDefaultValue(DistributeLockParam distributeLockParam){
Preconditions.checkNotNull(distributeLockParam,"检测到了参数不允许为空!");
DistributeLockType distributeLockType = distributeLockParam.getLockType();
distributeLockParam.setLockType(Optional.ofNullable(distributeLockType).orElse(DistributeLockType.FAIR_LOCK));
distributeLockParam.setExpireTime(Optional.ofNullable(distributeLockParam.getExpireTime()).orElse(DEFAULT_EXPIRE_TIME));
distributeLockParam.setWaitTime(Optional.ofNullable(distributeLockParam.getExpireTime()).orElse(DEFAULT_WAIT_TIME));
distributeLockParam.setTimeUnit(Optional.ofNullable(distributeLockParam.getTimeUnit()).orElse(TimeUnit.SECONDS));
return distributeLockParam;
}
/**
* 构建相关的锁key值
* @param distributeLockParam
* @return
*/
protected String buildLockKey(DistributeLockParam distributeLockParam){
String lockId = StringUtils.defaultIfEmpty(distributeLockParam.getLockUUid(),
UUID.fastUUID().toString());
distributeLockParam.setLockUUid(lockId);
String delmiter = StringUtils.defaultIfEmpty(distributeLockParam.getDelimiter(),
DEFAULT_DELIMTER);
distributeLockParam.setDelimiter(delmiter);
String prefix = StringUtils.defaultIfEmpty(distributeLockParam
.getLockNamePrefix(),DEFAULT_KEY_PREFIX);
distributeLockParam.setLockNamePrefix(prefix);
String lockFullName = "";
if(!delmiter.equals(DEFAULT_DELIMTER)){
//todo 待优化
Joiner joiner = Joiner.on(delmiter).skipNulls();
lockFullName = joiner.join(prefix,lockId);
}else{
lockFullName = DEFAULT_JOINER.join(prefix,lockId);
}
return lockFullName;
}
复制代码
该类主要包含两个方法。分别是 fullDistributeDefaultValue 和 buildLockKey。
fullDistributeDefaultValue 方法
这个方法主要的目的是为了校验以及填充一些我们没有写的参数的默认值。
protected DistributeLockParam fullDistributeDefaultValue(DistributeLockParam distributeLockParam){
Preconditions.checkNotNull(distributeLockParam,"检测到了参数不允许为空!");
DistributeLockType distributeLockType = distributeLockParam.getLockType();
distributeLockParam.setLockType(Optional.ofNullable(distributeLockType).orElse(DistributeLockType.FAIR_LOCK));
distributeLockParam.setExpireTime(Optional.ofNullable(distributeLockParam.getExpireTime()).orElse(DEFAULT_EXPIRE_TIME));
distributeLockParam.setWaitTime(Optional.ofNullable(distributeLockParam.getExpireTime()).orElse(DEFAULT_WAIT_TIME));
distributeLockParam.setTimeUnit(Optional.ofNullable(distributeLockParam.getTimeUnit()).orElse(TimeUnit.SECONDS));
return distributeLockParam;
}
复制代码
buildLockKey 方法
该类主要负责的是构建我们的分布式锁的 key。
protected String buildLockKey(DistributeLockParam distributeLockParam){
String lockId = StringUtils.defaultIfEmpty(distributeLockParam.getLockUUid(),
UUID.fastUUID().toString());
distributeLockParam.setLockUUid(lockId);
String delmiter = StringUtils.defaultIfEmpty(distributeLockParam.getDelimiter(),
DEFAULT_DELIMTER);
distributeLockParam.setDelimiter(delmiter);
String prefix = StringUtils.defaultIfEmpty(distributeLockParam
.getLockNamePrefix(),DEFAULT_KEY_PREFIX);
distributeLockParam.setLockNamePrefix(prefix);
String lockFullName = "";
if(!delmiter.equals(DEFAULT_DELIMTER)){
//todo 待优化
Joiner joiner = Joiner.on(delmiter).skipNulls();
lockFullName = joiner.join(prefix,lockId);
}else{
lockFullName = DEFAULT_JOINER.join(prefix,lockId);
}
return lockFullName;
}
复制代码
从在马上可以看出来,他主要就是将我们之前的那些所有的属性进行连接拼接到一起。
至此,我们的基础组建部分的抽象部分就已经完成了。那么接下来呢我们需要进行一个实现 Redis 模式下的分布式锁。
定义 Redis 分布式锁的注解
主要用于 AOP 的一个拦截以及获取一些特定化的参数。
RedisDistributedLock 注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisDistributedLock {
String prefix() default "";
/**
* 锁过期时间
*/
int expireTime() default 30;
/**
* 获取锁等待时间
*/
int waitTime() default 10;
TimeUnit timeUnit() default TimeUnit.SECONDS;
String delimiter() default ":";
LockCategory category() default LockCategory.COMMON;
}
复制代码
RedisDistributedLockParam 注解
主要用于参数方法上的修饰,获取参数相关的一些主要用于参数方法上的修饰,获取参数相关的一些参数值作为我们的分布式主键的 key。
@Target({ElementType.PARAMETER, ElementType.METHOD, ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface RedisDistributedLockParam {
String name() default "";
}
复制代码
实现分布式锁的唯一键的生成器
我们定义分布式锁组件抽象接口的 redis 版本为 RedisDistributedLockKeyGenerator。
public class RedisDistributedLockKeyGenerator implements LockKeyGenerator {
@Override
public String getLockKey(ProceedingJoinPoint pjp) {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
RedisDistributedLock lockAnnotation = method.getAnnotation(RedisDistributedLock.class);
final Object[] args = pjp.getArgs();
final Parameter[] parameters = method.getParameters();
StringBuilder builder = new StringBuilder();
// 默认解析方法里面带 CacheParam 注解的属性,如果没有尝试着解析实体对象中的
for (int i = 0; i < parameters.length; i++) {
final RedisDistributedLockParam annotation = parameters[i].getAnnotation(RedisDistributedLockParam.class);
if (annotation == null) {
continue;
}
builder.append(lockAnnotation.delimiter()).append(args[i]);
}
if (StringUtils.isEmpty(builder.toString())) {
final Annotation[][] parameterAnnotations = method.getParameterAnnotations();
for (int i = 0; i < parameterAnnotations.length; i++) {
final Object object = args[i];
final Field[] fields = object.getClass().getDeclaredFields();
for (Field field : fields) {
final RedisDistributedLockParam annotation = field.getAnnotation(RedisDistributedLockParam.class);
if (annotation == null) {
continue;
}
field.setAccessible(true);
builder.append(lockAnnotation.delimiter()).append(ReflectionUtils.getField(field, object));
}
}
}
return lockAnnotation.prefix() + builder.toString();
}
}
复制代码
上面的码主要是用鱼上面的代码主要是用鱼去提取注解上的参数以及一些呃参数的 key 的一个基本信息。
实现分布式锁实现类 RedisDistributeLockSupport
RedisDistributeLockSupport 主要实现了我们的抽象分布式锁的核心业务接口。
其中使用了 Redisson 的 RedissonClient 客户端服务,从而进行选择类型,进行选择分布式锁的方式。
@Slf4j
@Component
public class RedisDistributeLockSupport extends AbstractDistributeLockSupport<RLock> {
@Autowired
RedissonClient redissonClient;
/**
* 非阻塞方式锁
* @param distributeLockParam
* @return
*/
@Override
public RLock lock(DistributeLockParam distributeLockParam) {
distributeLockParam = fullDistributeDefaultValue(distributeLockParam);
String lockKey = buildLockKey(distributeLockParam);
RLock rLock = null;
try {
switch (distributeLockParam.getLockType()) {
// 可重入锁
case REENTRANT_LOCK: {
rLock = redissonClient.getLock(lockKey);
break;
}
// 非公平锁
case FAIR_LOCK: {
rLock = redissonClient.getFairLock(lockKey);
break;
}
default: {
throw new UnsupportedOperationException("暂时不支持此种方式的锁!");
}
}
Boolean result = rLock.tryLock(distributeLockParam.getWaitTime(), distributeLockParam.getExpireTime(), distributeLockParam.getTimeUnit());
return rLock;
} catch (InterruptedException e) {
log.error("加锁为阻塞模式下的锁进行失败!", e);
return rLock;
}
}
@Override
public void unlock(RLock param, DistributeLockParam distributeLockParam) {
try {
param.unlock();
} catch (Exception e) {
log.error("解我操!啊?锁为阻塞模式下的锁进行失败!", e);
}
}
}
复制代码
可以根据我们所的类型选择。公平锁或者是和重入锁。
实现分布式锁实现类 RedisDistributedLockAspect 切面类
@Aspect
@Order(4)
@Slf4j
public class RedisDistributedLockAspect {
@Autowired
private RedissonClient redissonClient;
@Autowired
private RedisDistributedLockKeyGenerator redisDistributedLockKeyGenerator;
@Around("execution(public * *(..)) && @annotation(com.hyts.assemble.distributeLock.redis.RedisDistributedLock)")
public Object interceptor(ProceedingJoinPoint pjp) throws Throwable {
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
RedisDistributedLock redisDistributedLock = method.getAnnotation(RedisDistributedLock.class);
if (StringUtils.isEmpty(redisDistributedLock.prefix())) {
throw new RuntimeException("lock key can't be null...");
}
final String lockKey = redisDistributedLockKeyGenerator.getLockKey(pjp);
RLock lock = chooseLock(redisDistributedLock,lockKey);
//key不存在才能设置成功
Boolean success = null;
Object proceed = null;
try {
success = lock.tryLock(redisDistributedLock.waitTime(), redisDistributedLock.expireTime(), redisDistributedLock.timeUnit());
if (success) {
log.debug("tryLock success key [{}]", lockKey);
proceed = pjp.proceed();
} else {
log.error("key is : {" + lockKey + "} tryLock fail ");
throw new RedisDistributedLockException(lockKey);
}
} catch (InterruptedException e) {
log.error("key is : {" + lockKey + "} tryLock error ", e);
throw new RedisDistributedLockException(lockKey, e.getMessage());
} finally {
if (success) {
log.debug("unlock [{}]", "key:" + lockKey);
lock.unlock();
}
}
return proceed;
}
private RLock chooseLock(RedisDistributedLock redisDistributedLock, String lockName) {
LockCategory category = redisDistributedLock.category();
switch (category) {
case FAIR:
return redissonClient.getFairLock(lockName);
}
return redissonClient.getLock(lockName);
}
}
复制代码
评论