摘要:分布式知识是考验一个程序员知识面广度和深度很好的度量标准,而分布式锁又是其中非常重要的一个知识点。
本文分享自华为云社区《分布式锁实现——超级详细、高级程序员必知必会》,作者: 李子捌 。
一、简介
分布式知识是考验一个程序员知识面广度和深度很好的度量标准,而分布式锁又是其中非常重要的一个知识点。
我们知道分布式应用在进行逻辑处理的时候经常涉及到并发的问题,比如对一个转账修改一个用户的账户金额数据,此时可能会涉及多个用户同时对这个用户进行转账,它们都需要将数据读取到各内存中进行修改,然后再存刷新回去,这个时候就会出现并发问题:
这里假设初始 account.money= 400
1. 客户端 A,向账户转账 100,并未提交到 Redis
2. 客户端 B,读取金额为 400,并向账户转账 200
3. 客户端 A 提交
4. 客户端 B 提交
最后 account.money= 600
执行流程如下所示:
出现上述问题的主要原因是,“读取”和“写入”这两个操作并不是一个原子操作(所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程))。
关于上述问题的解决,分布式锁就可以派上用场了,我们通过分布式锁来限制程序的并发执行,就像 Java 中的 synchronized,常见的分布式锁解决方案如下所示:
1. 基于数据库锁机制实现的分布式锁
2. 基于 Zookeeper 实现的分布式锁
3. 基于 Redis 实现的分布式锁
本文和大家共同探讨的是基于 Redis 的分布式锁。
二、分布式锁的演进
2.1 精细胞与卵细胞的爱情故事
嗯……学过生物的宝宝们都知道,精细胞和卵细胞相遇的故事,上亿精细胞中最后也只会有一个幸运的精细胞和卵细胞发生甜蜜的爱情故事,这是因为当有一个精子进入卵子后,卵子会发生皮质反应、透明带反应使透明带对精子的结合能力下降,阻止了多精受精。分布式锁就好像卵细胞,而万千访问客户端就好比精细胞,无论客户访问并发多激烈,我们也应该保证分布式锁只被一个线程获取到。
2.2 Redis 中的分布式锁
2.2.1 setnx
我们将 Redis 实现分布式锁理解为上厕所蹲坑排队,只有一个坑,但是有多个人要到坑里面去,所以就只能一个一个的来了。
很多人一开始想到的是 Redis 中一般使用 setnx(set if no exists)指令来实现,setnx 是如果不存在,则 SET 的简写,这个指令描述如下:
127.0.0.1:6379> exists lock # lock key不存在
(integer) 0
127.0.0.1:6379> setnx lock true # 设值成功
(integer) 1
127.0.0.1:6379> setnx lock false # 覆盖失败
(integer) 0
127.0.0.1:6379> del lock # 删除lock 释放
(integer) 1
复制代码
如上这种方案存在的问题非常明显,如果逻辑执行过程中间出现了异常,可能导致 del key 指令没有执行,这样会产生死锁。如下图所示:
2.2.2 setnx + expire
在第一种解决方案的基础上,可能部分人会相到,既然主动删除 key 可能会出现异常情况,那么就设值 key 的过期时间到期自动删除。
127.0.0.1:6379> setnx lock true
(integer) 1
127.0.0.1:6379> expire lock 10 # 设值过期时间10s
(integer) 1
127.0.0.1:6379> setnx lock true # 10s内再次设值失败
(integer) 0
127.0.0.1:6379> setnx lock true # 10skey过期,后设置成功
(integer) 1
复制代码
这种的方案和前面的方案其实并没有本质上的区别,它还是可能会出现服务器异常等情况,导致 expire 的不到执行的情况,换汤不换药,如下图所示:
2.2.3 原子操作
基于上面两种方案,我们可以发现,产生问题的本质在于两个操作并不是原子操作。方案一中是 setnx 指令加一个 del 指令,方案二中是 setnx 指令加一个 expire 指令,这两个指令并不是原子指令。基于这个问题,Redis 官方将这两个指令组合在了一起,解决 Redis 分布式锁原子性操作的问题。
先认真看 set 指令可选参数 EX 和 NX
setkey value [EX seconds] [PX milliseconds] [NX|XX]
EX seconds :将键的过期时间设置为 seconds 秒。执行 SET key value EX seconds 的效果等同于执行 SETEX key seconds value 。
PXmilliseconds :将键的过期时间设置为 milliseconds 毫秒。执行 SET key value PX milliseconds 的效果等同于执行 PSETEX key milliseconds value 。
NX :只在键不存在时,才对键进行设置操作。执行 SET key value NX 的效果等同于执行 SETNX key value 。
XX:只在键已经存在时,才对键进行设置操作。
127.0.0.1:6379> set lock true EX 10 NX # 设置 10s生效
OK
127.0.0.1:6379> set lock true EX 10 NX # 10s内再次设值失败
(nil)
127.0.0.1:6379> set lock true EX 10 NX # 10s后设置成功
OK
复制代码
如上这个操作就成功的解决了 Redis 分布式锁的原子操作问题。
2.2.4 解锁
Redis 分布式锁加锁在上面讲述了,而 Redis 分布式锁的解锁过程其实就是将 key 删除,key 的删除有客户端调用 del 指令删除,也有设置 key 的过期时间自动删除。但是这个删除不能乱删除,不能说客户端 A 请求的锁被客户端 B 给删除了……,那这把锁就是一把烂锁了。
为了防止客户端 A 请求的锁被客户端 B 给删除了这种情况,我们通过匹配客户端传入的锁的值与当前锁的值是否相等来做判断(这个值是随机且保证不会重复的),如果相等就删除,解锁成功。
但是 Redis 并未提供这样的功能,我们只能通过 Lua 脚本来处理,因为 Lua 脚本可以保证多个指令的原子性执行。
示例:
首先设置一个 key,这个 key 的值是 123456789,通过客户端传入的 value 值是否相等来校验是否允许删除这个 key
127.0.0.1:6379> get lock
(nil)
127.0.0.1:6379> set lock 123456789 # 设置一个key 值为123456789
OK
127.0.0.1:6379> get lock
"123456789"
复制代码
在客户机上编写 lua 脚本,lock.lua 文件,文件内容如下
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
复制代码
测试通过错误的 value 值去执行 lua 脚本,这个时候删除 key 失败,返回 0
通过正确的 value 值执行则返回 1,说明 key 被删除了。
2.2.5 代码实现
一下演示一个 springboot 项目来实现 Redis 分布式锁,为了方便大家使用,我贴出的代码比较全面,篇幅稍多。
pom 依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.3.4</version>
</dependency>
</dependencies>
复制代码
Redis 配置文件
package com.lizba.config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
* <p>
* Redis简单配置文件
* </p>
*
* @Author: Liziba
* @Date: 2021/7/11 11:17
*/
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
protected static final Logger logger = LoggerFactory.getLogger(RedisConfig.class);
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
@Value("${spring.redis.jedis.pool.max-active}")
private int maxTotal;
@Value("${spring.redis.jedis.pool.max-idle}")
private int maxIdle;
@Value("${spring.redis.jedis.pool.min-idle}")
private int minIdle;
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.timeout}")
private int timeout;
@Bean
public JedisPool redisPoolFactory() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(maxTotal);
jedisPoolConfig.setMaxIdle(maxIdle);
jedisPoolConfig.setMinIdle(minIdle);
JedisPool jedisPool = new JedisPool(jedisPoolConfig, host, port, timeout, null);
logger.info("JedisPool注入成功!!");
logger.info("redis地址:" + host + ":" + port);
return jedisPool;
}
}
复制代码
application.yml 配置文件
server:
port: 18080
spring:
redis:
database: 0
host: 127.0.0.1
port: 6379
timeout: 10000
password:
jedis:
pool:
max-active: 20
max-idle: 20
min-idle: 0
复制代码
获取锁与释放锁代码
package com.lizba.utill;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.params.SetParams;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* <p>
* Redis分布式锁简单工具类
* </p>
*
* @Author: Liziba
* @Date: 2021/7/11 11:42
*/
@Service
public class RedisLockUtil {
private static Logger logger = LoggerFactory.getLogger(RedisLockUtil.class);
/**
* 锁键 -> key
*/
private final String LOCK_KEY = "lock_key";
/**
* 锁过期时间 -> TTL
*/
private Long millisecondsToExpire = 10000L;
/**
* 获取锁超时时间 -> get lock timeout for return
*/
private Long timeout = 300L;
/**
* LUA脚本 -> 分布式锁解锁原子操作脚本
*/
private static final String LUA_SCRIPT =
"if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 " +
"end";
/**
* set命令参数
*/
private SetParams params = SetParams.setParams().nx().px(millisecondsToExpire);
@Autowired
private JedisPool jedisPool;
/**
* 加锁 -> 超时锁
*
* @param lockId 一个随机的不重复id -> 区分不同客户端
* @return
*/
public boolean timeLock(String lockId) {
Jedis client = jedisPool.getResource();
long start = System.currentTimeMillis();
try {
for(;;) {
String lock = client.set(LOCK_KEY, lockId, params);
if ("OK".equalsIgnoreCase(lock)) {
return Boolean.TRUE;
}
// sleep -> 获取失败暂时让出CPU资源
TimeUnit.MILLISECONDS.sleep(100);
long time = System.currentTimeMillis() - start;
if (time >= timeout) {
return Boolean.FALSE;
}
}
} catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
} finally {
client.close();
}
return Boolean.FALSE;
}
/**
* 解锁
*
* @param lockId 一个随机的不重复id -> 区分不同客户端
* @return
*/
public boolean unlock(String lockId) {
Jedis client = jedisPool.getResource();
try {
Object result = client.eval(LUA_SCRIPT, Arrays.asList(LOCK_KEY), Arrays.asList(lockId));
if (result != null && "1".equalsIgnoreCase(result.toString())) {
return Boolean.TRUE;
}
return Boolean.FALSE;
} catch (Exception e) {
e.printStackTrace();
logger.error(e.getMessage());
}
return Boolean.FALSE;
}
}
复制代码
测试类
package com.lizba.controller;
import cn.hutool.core.util.IdUtil;
import com.lizba.utill.RedisLockUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
* <p>
* 测试
* </p>
*
* @Author: Liziba
* @Date: 2021/7/11 12:27
*/
@RestController
@RequestMapping("/redis")
public class TestController {
@Autowired
private RedisLockUtil redisLockUtil;
private AtomicInteger count ;
@GetMapping("/index/{num}")
public String index(@PathVariable int num) throws InterruptedException {
count = new AtomicInteger(0);
CountDownLatch countDownLatch = new CountDownLatch(num);
ExecutorService executorService = Executors.newFixedThreadPool(num);
Set<String> failSet = new HashSet<>();
long start = System.currentTimeMillis();
for (int i = 0; i < num; i++) {
executorService.execute(() -> {
long lockId = IdUtil.getSnowflake(1, 1).nextId();
try {
boolean isSuccess = redisLockUtil.timeLock(String.valueOf(lockId));
if (isSuccess) {
count.addAndGet(1);
System.out.println(Thread.currentThread().getName() + " lock success" );
} else {
failSet.add(Thread.currentThread().getName());
}
} finally {
boolean unlock = redisLockUtil.unlock(String.valueOf(lockId));
if (unlock) {
System.out.println(Thread.currentThread().getName() + " unlock success" );
}
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdownNow();
failSet.forEach(t -> System.out.println(t + " lock fail" ));
long time = System.currentTimeMillis() - start;
return String.format("Thread sum: %d, Time sum: %d, Success sum:%d", num, time, count.get());
}
}
复制代码
测试结果
2.3 Redis 的超时问题
Redis 分布式锁有一个问题是锁的超时问题,也就是说如果客户端 A 获取到锁之后去执行任务,任务没跑完锁的超时时间到了,锁就会自动释放,这个时候客户端 B 就能乘虚而入了,锁就会出现问题!
关于这个问题其实并没有完全的解决办法,但是能通过如下手段去优化:
1. 尽可能不要在 Redis 分布式锁中执行较长的任务,尽可能的缩小锁区间内执行代码,就像单 JVM 锁中的 synchronized 优化一样,我们可以考虑优化锁的区间
2. 多做压力测试和线上真实场景的模拟测试,估算一个合适的锁超时时间
3. 做好 Redis 分布式锁超时任务未执行完的问题发生后,数据恢复手段的准备
三、集群中的分布式锁
3.1 集群分布式锁存在的问题
上述的分布式锁,针对单节点实例的 Redis 是可行的;但是我们在公司根本不会用单节点的 Redis 实例,往往采用最简单的都是是 Redis 一主二从+Sentinel 监控配置;在 sentinel 集群中,虽然主节点挂掉时,从节点会取而代之,客户端无感知,但是上述的分布式锁就可能存在节点之间数据同步异常导致分布式锁失效的问题。
正常情况下客户端向 sentinel 监控的 Redis 集群申请分布式锁:
比如,客户端 A 在主节点(机器 1)上申请了一把锁,此时主节点(机器 1)挂掉了且锁没来得及同步到从节点(机器 2 和机器 3),此时从节点(机器 2)成为了新的主节点,但是锁在新的主节点(机器 2)上并不存在,所以客户端 B 申请锁成功,锁的定义在这种场景中就出现了问题!
主节点宕机锁同步失败情况,其他客户端申请锁成功:
上述这种情况虽然之后发生在主从发生 failover 的情况才产生,但显然是不安全的,普通的业务系统或许能接受,但大金额的业务场景是不允许出现的。
3.2 RedLock
3.2.1 简介
解决这个问题的办法就是使用 RedLock 算法,也称红锁。RedLock 通过使用多个 Redis 实例,各个实例之间没有主从关系,相互独立;加锁的时候,客户端向所有的节点发送加锁指令,如果过半的节点 set 成功,就加锁成功。释放锁时,需要向所有的节点发送 del 指令来释放锁。RedLock 的实现思路比较简单,但是实际算法比较复杂,需要考虑非常多的细节问题,如出错重试,时钟漂移等。此外 RedLock 需要新增较多的实例来申请分布式锁,不仅消耗服务器资源,也会有一定的性能下降。
其架构图如下,客户端向多个独立的 Redis 服务发送加锁指令(为了追求高吞吐量和低延时,客户端需要使用多路传输来对 N 个 RedisServer 服务器进行通信),过半反馈成功则加锁成功,Redis Server 的个数最后为奇数。
Redis 中文版网站介绍
http://redis.cn/topics/distlock.html
在上述的架构图中,存在 5 台 Redis 服务器用于获取锁,那么此时一个客户端获取锁需要做哪些操作呢?
1. 获取系统当前时间(ms)
2. 使用相同的 key 和随机值在 5 个节点上请求锁,请求锁的过程中包含多个时间值的定义,包括请求单个锁超时时间,请求锁的总耗时时间,锁自动释放时间。单个锁请求的超时时间不宜过大,防止请求宕机的 Redis 服务阻塞时间过长。
3. 客户端计算获取锁的总时长和获取锁成功的个数,当所得个数大于等于 3 且获取锁的时间小于锁的自动释放时间才算成功
4. 锁获取成功,则锁自动释放时间等于 TTL 减去获取所得消耗的时间(这个锁消耗的时间计算比较复杂)
5. 锁获取失败,向所有的 Redis 服务器发送删除指令,一定是所有的 Redis 服务器都要发送
6. 失败重试,锁获取失败后进行重试,重试的时间应该是一个随机值,避免与其他客户端同时请求锁而加大失败的可能,且这个时间应该大于获取锁消耗的时间
3.2.2 锁的最小有效时长
由于上面说到存在时钟漂移的问题,并且客户端向不同的 Redis 服务器请求锁的时间也会有细微的差异,所以有必要认真的研究一下客户端获取到的锁的最小有效时长计算:
假设客户端申请锁成功,第一个 key 设置成功的时间为 TF,最后一个 key 设置成功的时间为 TL,锁的超时时间为 TTL,不同进程之间的时钟差异为 CLOCK_DIFF,则锁的最小有效时长是:
TIME= TTL - (TF- TL) - CLOCK_DIFF
3.2.3 故障恢复
采用 Redis 来实现分布式锁,离不开服务器宕机等不可用问题,这里 RedLock 红锁也一样,即使是多台服务器申请锁,我们也要考虑服务器宕机后的处理,官方建议采用 AOF 持久化处理。
但是 AOF 持久化只对正常 SHUTDOWN 这种指令能做到重启恢复,但是如果是断电的情况,可能导致最后一次持久化到断电期间的锁数据丢失,当服务器重启后,可能会出现分布式锁语义错误的情况。所以为了规避这种情况,官方建议 Redis 服务重启后,一个最大客户端 TTL 时间内该 Redis 服务不可用(不提供申请锁的服务),这确实可以解决问题,但是显而易见这肯定影响 Redis 服务器的性能,并且在多数节点都出现这种情况的时候,系统将出现全局不可用的状态。
3.3 Redisson 实现分布式锁
3.3.1 Redission 简介
Redisson 是架设在 Redis 基础上的一个 Java 驻内存数据网格(In-MemoryData Grid)。【Redis 官方推荐】
Redisson 在基于 NIO 的 Netty 框架上,充分的利用了 Redis 键值数据库提供的一系列优势,在 Java 实用工具包中常用接口的基础上,为使用者提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间的协作。
Redissiongithub 地址
https://github.com/redisson/redisson
Redission 分布式锁和同步器 Wiki
https://github.com/redisson/redisson/wiki/8.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%E5%92%8C%E5%90%8C%E6%AD%A5%E5%99%A8
总而言之——Redisson 非常强大
3.3.2 Reddison 之 RedLock 使用
pom 依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.3.2</version>
</dependency>
复制代码
测试类
package com.liziba.util;
import org.redisson.Redisson;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;
/**
* <p>
* 测试Redisson 之 RedLock
* </p>
*
* @Author: Liziba
* @Date: 2021/7/11 20:55
*/
public class LockTest {
private static final String resourceName = "REDLOCK_KEY";
private static RedissonClient cli_79;
private static RedissonClient cli_89;
private static RedissonClient cli_99;
static {
Config config_79 = new Config();
config_79.useSingleServer()
.setAddress("127.0.0.1:6379") // 注意这里我的Redis测试实例没密码
.setDatabase(0);
cli_79 = Redisson.create(config_79);
Config config_89 = new Config();
config_89.useSingleServer()
.setAddress("127.0.0.1:6389")
.setDatabase(0);
cli_89 = Redisson.create(config_89);
Config config_99 = new Config();
config_99.useSingleServer()
.setAddress("127.0.0.1:6399")
.setDatabase(0);
cli_99 = Redisson.create(config_99);
}
/**
* 加锁操作
*/
private static void lock () {
// 向3个Redis实例尝试加锁
RLock lock_79 = cli_79.getLock(resourceName);
RLock lock_89 = cli_89.getLock(resourceName);
RLock lock_99 = cli_99.getLock(resourceName);
RedissonRedLock redLock = new RedissonRedLock(lock_79, lock_89, lock_99);
try {
boolean isLock = redLock.tryLock(100, 10000, TimeUnit.MILLISECONDS);
if (isLock) {
// do something ...
System.out.println(Thread.currentThread().getName() + "Get Lock Success!");
TimeUnit.MILLISECONDS.sleep(10000);
} else {
System.out.println(Thread.currentThread().getName() + "Get Lock fail!");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 无论如何一定要释放锁 -> 这里会像所有的Redis服务释放锁
redLock.unlock();
}
}
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(() -> lock()).start();
}
}
}
复制代码
点击关注,第一时间了解华为云新鲜技术~
评论