写点什么

redis 分布式锁实现

用户头像
Sakura
关注
发布于: 2021 年 03 月 13 日

随着业务越来越复杂,应用服务都会朝着分布式、集群方向部署,而分布式CAP原则告诉我们,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分区容错性),三者不可得兼。


很多场景中,需要使用分布式事务、分布式锁等技术来保证数据最终一致性。有的时候,我们需要保证某一方法同一时刻只能被一个线程执行。

在单机(单进程)环境中,JAVA 提供了很多并发相关 API,但在多机(多进程)环境中就无能为力了。


对于分布式锁,最好能够满足以下几点


可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行

这把锁要是一把可重入锁(避免死锁)

这把锁最好是一把阻塞锁

有高可用的获取锁和释放锁功能

获取锁和释放锁的性能要好


实现思路


锁的实现主要基于 redis 的SETNX命令(SETNX 详细解释参考这里),我们来看SETNX的解释


SETNX key value

将 key 的值设为 value ,当且仅当 key 不存在。

若给定的 key 已经存在,则 SETNX 不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

返回值:

设置成功,返回 1 。

设置失败,返回 0 。



使用SETNX完成同步锁的流程及事项如下:


  1. 使用SETNX命令获取锁,若返回 0(key 已存在,锁已存在)则获取失败,反之获取成功

  2. 为了防止获取锁后程序出现异常,导致其他线程/进程调用SETNX命令总是返回 0 而进入死锁状态,需要为该 key 设置一个“合理”的过期时间

  3. 释放锁,使用DEL命令将锁数据删除


实现过程


创建同步锁实现类


/** * 同步锁 * * @property key Redis key * @property stringRedisTemplate RedisTemplate * @property expire Redis TTL/秒 * @property safetyTime 安全时间/秒 */class SyncLock(        private val key: String,        private val stringRedisTemplate: StringRedisTemplate,        private val expire: Long,        private val safetyTime: Long)
复制代码


key reids 中的 key,对应 java api synchronized 的对象

expire reids 中 key 的过期时间

safetyTime 下文介绍其作用


实现锁的获取功能


private val value: String get() = Thread.currentThread().name
/** * 尝试获取锁(立即返回) * * @return 是否获取成功 */fun tryLock(): Boolean { val locked = stringRedisTemplate.opsForValue().setIfAbsent(key, value) ?: false if (locked) { stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS) } return locked}
复制代码


这里使用setIfAbsent函数(对应SETNX命令)尝试设置 key 的值为 value(当前线程 id+线程名),若成功则同时设置 key 的过期时间并返回 true,否则返回 false


实现带超时时间的锁获取功能


private val waitMillisPer: Long = 10
/** * 尝试获取锁,并至多等待timeout时长 * * @param timeout 超时时长 * @param unit 时间单位 * * @return 是否获取成功 */fun tryLock(timeout: Long, unit: TimeUnit): Boolean { val waitMax = unit.toMillis(timeout) var waitAlready: Long = 0
while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) { Thread.sleep(waitMillisPer) waitAlready += waitMillisPer }
if (waitAlready < waitMax) { stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS) return true } return false}
复制代码


这里使用 while 循环不断尝试锁的获取,并至多尝试 timeout 时长,在 timeout 时间内若成功则同时设置 key 的过期时间并返回 true,否则返回 false


其实以上两种tryLock函数还是有一种可能便是,在调用setIfAbsent后、调用expire之前若服务出现异常,也将导致该锁(key)无法释放(过期或删除),使得其他线程/进程再无法获取锁而进入死循环,为了避免此问题的产生,我们引入了safetyTime

该参数的作用为,从获取锁开始直到 safetyTime 时长,若仍未获取成功则认为某一线程/进程出现异常导致数据不正确,此时强制获取,其实现如下


实现带保护功能的锁获取功能


/** * 获取锁 */fun lock() {    val waitMax = TimeUnit.SECONDS.toMillis(safetyTime)    var waitAlready: Long = 0
while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) { Thread.sleep(waitMillisPer) waitAlready += waitMillisPer }
// stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS) stringRedisTemplate.opsForValue().set(key, value, expire, TimeUnit.SECONDS)}
复制代码


这里同样使用 while 循环不断尝试锁的获取,但至多等待 safetyTime 时长,最终不论是否成功,均使用SETEX 命令将 key 设置为当前先线程对应的 value,并同时设置该 key 的过期时间


实现锁的释放功能


/** * 释放锁 */fun unLock() {    stringRedisTemplate.opsForValue()[key]?.let {        if (it == value) {            stringRedisTemplate.delete(key)        }    }}
复制代码


锁的释放使用DEL命令删除 key,但需要注意的是,释放锁时只能释放本线程持有的锁

若 expire 设置不合理,如 expire 设置为 10 秒,结果在获取锁后线程运行了 20 秒,该锁有可能已经被其他线程强制获取,即该 key 代表的锁已经不是当前线程所持有的锁,此时便不能冒然删除该 key,而只能释放本线程持有的锁。




集成 spring boot


为了更好的与 spring 集成,我们创建一个工厂类来辅助创建同步锁实例


/** * SyncLock同步锁工厂类 */@Componentclass SyncLockFactory {    @Autowired    private lateinit var stringRedisTemplate: StringRedisTemplate
private val syncLockMap = mutableMapOf<String, SyncLock>()
/** * 创建SyncLock * * @param key Redis key * @param expire Redis TTL/秒,默认10秒 * @param safetyTime 安全时间/秒,为了防止程序异常导致死锁,在此时间后强制拿锁,默认 expire * 5 秒 */ @Synchronized fun build(key: String, expire: Long = 10 /* seconds */, safetyTime: Long = expire * 5/* seconds */): SyncLock { if (!syncLockMap.containsKey(key)) { syncLockMap[key] = SyncLock(key, stringRedisTemplate, expire, safetyTime) } return syncLockMap[key]!! }}
复制代码


在 spring 框架下可以更方便的使用


@Componentclass SomeLogic: InitializingBean {  @Autowired  lateinit var syncLockFactory: SyncLockFactory    lateinit var syncLock
override fun afterPropertiesSet() { syncLock = syncLockFactory.build("lock:some:name", 10) }
fun someFun() { syncLock.lock() try { // some logic } finally { syncLock.unlock() } }}
复制代码




注解的实现


借助 spring aop 框架,我们可以将 SyncLock 的使用进一步简化


创建注解类


/** * 同步锁注解 * * @property key Redis key * @property expire Redis TTL/秒,默认10秒 */@Target(AnnotationTarget.FUNCTION)@Retention(AnnotationRetention.RUNTIME)annotation class SyncLockable(        val key: String,        val expire: Long = 10)
复制代码


实现 AOP


/** * 同步锁注解处理 */@Aspect@Componentclass SyncLockHandle {    @Autowired    private lateinit var syncLockFactory: SyncLockFactory
/** * 在方法上执行同步锁 */ @Around("@annotation(syncLockable)") fun syncLock(jp: ProceedingJoinPoint, syncLockable: SyncLockable): Any? { val lock = syncLockFactory.build(syncLockable.key, syncLockable.expire)
try { lock.lock() return jp.proceed() } finally { lock.unLock() } }}
复制代码


如此一来,我们便可以按照如下方式使用 SyncLock


@Componentclass SomeLogic {    @SyncLockable("lock:some:name", 10)    fun someFun() {        // some logic    }}
复制代码


用户头像

Sakura

关注

还未添加个人签名 2020.09.22 加入

还未添加个人简介

评论

发布
暂无评论
redis分布式锁实现