写点什么

R2M 分布式锁原理及实践

  • 2023-02-07
    北京
  • 本文字数:4434 字

    阅读完需:约 15 分钟

R2M分布式锁原理及实践

作者:京东科技 张石磊


1 案例引入


•2 锁说明和分布式锁选择


•3 r2m 分布式锁原理


•1 加锁核心流程:


•2 释放锁核心逻辑:


•3 锁的健壮性思考


R2M 分布式锁原理及实践

1 案例引入

名词简介:


资源:可以理解为一条内容,或者图+文字+链接的载体。


档位 ID: 资源的分类组,资源必须归属于档位。


问题描述:当同一个档位下 2 条资源同时审批通过时,收到擎天审批系统 2 条消息,消费者应用部署了 2 台机器,此时正好由 2 台机器分别消费,在并发消费时,先更新资源状态,然后写缓存,每次取前 100 条资源,类似 select * from resource where gear_id=xxx limit 100 order by id desc;


在写档位缓存,此时事务未提交,并发查询时根据档位 Id 查询时查询不到对方的数据,全量写缓存时导致后写的缓存覆盖了先写的缓存,即缓存被覆盖,导致投放资源缺失。


方案思考 :


方案 1:一台机器消费 mq–单点问题


方案 2:将同档位 ID 的资源路由到同一个 queue,需要审批系统配合根据档位 Id 做路由,审批系统发的消息不只是 cms 审批数据,此方案不适用。


方案 3:在档位级别加分布式锁。


经比较,最终采用方案 3 是合适的方案.

2 锁说明和分布式锁选择

synchronized 锁的粒度是 JVM 进程维度,集群模式下,不能对共享资源加锁,此时需要跨 JVM 进程的分布式锁。


分布式锁方式核心实现方式优点缺点分析


1 数据库:


悲观锁,lock


乐观锁,通过版本号实现 version


实现简单,不依赖中间件


数据库 IO 瓶颈,性能差


单实例存在单点问题,主从架构存在数据不一致,主从切换时其他客户端可重复加锁。


2 zookeeper


创建临时节点


CP 模型,可靠性高,不存在主从切换不一致问题


频繁创建和销毁临时节点,且


集群模式下,leader 数据需要同步到 follower 才算加锁成功,性能不如 redis


主从切换服务不可用


3 redis 集群


setnx+expire


性能高


有封装好的框架 redission


支持超时自动删除锁


集群支持高可用,AP 模型


主从切换时其他客户端可重复加锁。


R2M 是基于开源的 Redis cluster(Redis 3.0 以上版本)构建的高性能分布式缓存系统,我们系统一直在使用,3.2.5 版本开始支持分布式锁。

3 r2m 分布式锁原理

示例代码:


String lockKey = CacheKeyHelper.getGearLockKey(EnvEnum.getEnvFlagEnum(envCode),resource.getGearId());


R2mLock lock = (R2mLock) r2mClusterClient.getLock(lockKey);


//获取锁,锁的默认有效期30s,获取不到锁就阻塞


lock.lock();


try {


//业务代码


resourceService.afterApprovedHandle(resource);


} finally {


//释放锁


lock.unlock();


}

1 加锁核心流程:

加锁流程图:



1):尝试获取锁,通过执行加锁 Lua 脚本来做;


2):若第一步未获取到锁,则去 redis 订阅解锁消息


3):一旦持有锁的线程释放了锁,就会广播解锁消息,其他线程自旋重新尝试获取锁。


核心加锁原理:使用 lua 脚本封装了 hset 和 pexpire 命令,保证是一个原子操作, KEYS[1]是加锁的 key,argv[2]是加锁的客户端 ID(UUID+线程 ID),ARGV[1]是锁的有效期,默认 30s.


private Object acquireInternal(List<String> args) {


if (!this.setLocked() && this.getHolderId() != Thread.currentThread().getId()) {


return -1L;


} else {


try {


//hash结构,hash的key是加锁的key,键值对的key为客户端的UUID+线程id,value为锁的重入计数器值。


return this.lockSha() != null ? this.executor.evalR2mLockSha(this.lockSha(),


"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;


if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;


return -2;", Collections.singletonList(this.lockName), args) : this.executor. == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return -2;", Collections.singletonList(this.lockName), args);


} catch (Exception var3) {


this.setUnlocked();


throw new R2mLockException("Failed to acquire lock " + this.lockName + ".", var3);


}


}


}


args 参数


private List<String> acquireArgs(long leaseTime) {


List<String> args = new ArrayList();


if (leaseTime != -1L) {


args.add(String.valueOf(leaseTime));


} else {


//默认30s


args.add(String.valueOf(this.internalLockLeaseTime()));


}


//UUID+当前线程id


args.add(this.currentThreadLockId(Thread.currentThread().getId()));


return args;


}


获取锁失败订阅锁的 channel


//获取锁失败,订阅释放锁的消息


private boolean failedAcquire() {


this.subLock();


return false;


}


private void subLock() {


CompletableFuture<Void> cf = R2mLock.this.executor.lockSubscribe(R2mLock.this.lockPubSub(), R2mLock.this.getLockChannelName(), R2mLock.this);


if (cf != null) {


cf.handleAsync(this::reSubIfEx);


}


}


锁释放后,订阅者通过自旋尝试获取锁。


//tryAcquire获取锁,!tryAcquire就是获取锁失败,锁释放后,通知线程唤醒后返回false,然后通过自旋,尝试获取锁,


public final void acquire(long arg) {


if (!tryAcquire(arg) &&


acquireQueued(addWaiter(Node.EXCLUSIVE), arg))


selfInterrupt();


}


final boolean acquireQueued(final Node node, long arg) {


boolean failed = true;


try {


boolean interrupted = false;


//内部自旋获取锁


for (;;) {


final Node p = node.predecessor();


if (p == head && tryAcquire(arg)) {


setHead(node);


p.next = null; // help GC


failed = false;


return interrupted;


}


if (shouldParkAfterFailedAcquire(p, node) &&


parkAndCheckInterrupt())


interrupted = true;


}


} finally {


if (failed)


cancelAcquire(node);


}


}

2 释放锁核心逻辑:

1)删除分布式锁 key(如果可重入锁计数为 0)


  1. 发释放锁的广播消息


3)取消 watchdog


private Object unlockInternal(List<String> args) {


logger.debug("{} trying to unlock.", Thread.currentThread().getId());


Object var2;


try {


//判断锁 key 是否存在,如果存在,然后递减hash的value值,当value值为0时再删除锁key,并且广播释放锁的消息


if (this.unlockSha() == null) {


var2 = this.executor. == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);


return var2;


}


var2 = this.executor.evalR2mLockSha(this.unlockSha(), "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.lockName, this.getLockChannelName()), args);


} catch (Exception var6) {


throw new R2mLockException("Failed to unlock " + this.lockName + ".", var6);


} finally {


this.finalizeRelease();


}


return var2;


}


//取消当前线程的watchdog


private void finalizeRelease() {


long threadId = Thread.currentThread().getId();


R2mLock.ExpirableEntry entry = (R2mLock.ExpirableEntry)this.entryCache.get(threadId);


if (entry != null) {


entry.release(threadId);


if (entry.isReleased()) {


//取消这个线程watchdog定时任务


entry.getExtTask().cancel();


this.expEntry.compareAndSet(entry, (Object)null);


//从缓存watchdog线程的map中删除该线程


this.entryCache.remove(threadId);


}


}


}

3 锁的健壮性思考

1 业务没执行完,锁超时过期怎么办?


客户端加锁默认有效期 30s,超过有效期后如果业务没执行完,还需要持有这把锁,r2m 客户端提供了续期机制,也就是 watchdog 机制。


watchdog 原理:客户端线程维度(UUID+线程 ID,客户端维护一个 MAP,key 就是 UUID+线程 ID)的后台定时线程,获取锁成功后,如果客户端还持有当前锁,每隔 10s(this.internalLockLeaseTime() / 3L),去延长锁的有效期(internalLockLeaseTime)


//watchdog核心机制 ,internalLockLeaseTime默认30s


private void extendLock(long holderId) {


if (this.expEntry.get() != null) {


R2mLock.ExpirableEntry holderEntry = (R2mLock.ExpirableEntry)this.entryCache.get(holderId);


if (holderEntry != null) {


//每隔10s,如果当前线程持有锁,则续期30s


if (this.expEntry.compareAndSet(holderEntry, holderEntry)) {


Timeout task = this.timer().newTimeout((timeout) -> {


if (this.extendLockInternal(holderId)) {


this.extendLock(holderId);


}


}, this.internalLockLeaseTime() / 3L, TimeUnit.MILLISECONDS);


if (this.expEntry.get() != null) {


((R2mLock.ExpirableEntry)this.expEntry.get()).setExtTask(task);


}


}


}


}


}


//执行续期lua脚本


private boolean extendLockInternal(long threadId) {


Object result;


try {


//只续期


if (this.extendLockSha() != null) {


result = this.executor.evalR2mLockSha(this.extendLockSha(), "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));


} else {


result = this.executor. == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;", Collections.singletonList(this.lockName), this.extendLockArgs(threadId));


}


} catch (Exception var5) {


return false;


}


return Long.parseLong(result.toString()) == 1L;


}


2 客户端宕机,锁如何释放?


分布式锁是有效期的,客户端宕机后,watchdog 机制失效,锁过期自动失效。


3 redis 分布式锁集群模式下缺陷


r2m 集群模式,极端情况,master 加锁成功,宕机,还未来得及同步到 slave,主从切换,slave 切换成 master,可以继续加锁,对于非及其严格加锁场景,该方案可满足,属于 AP;对于严格场景下的分布式锁,可采用基于 zookeeper 的分布式锁,属于 CP,leader 宕机,folllower 选举时不可用。性能上 redis 更优。


4 锁的释放问题


注意锁的释放在 finally 中释放,必须由锁的持有者释放,不能由其他线程释放别人的锁,示例代码中 lock 放到 try 的外面。


5 为什么写这个分享?


这是 r2m 接入文档【客户端使用帮助】R2M Client分布式锁,关于 r2m 分布式锁只有这一个文档,各种原理和机制都没有说明,了解其背后的原理才能更好地使用它,希望对使用 r2m 分布式锁的同学有所帮助。

发布于: 刚刚阅读数: 6
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
R2M分布式锁原理及实践_redis_京东科技开发者_InfoQ写作社区