1. 背景
在传统的单体项目中,即部署到单个 IIS 上,针对并发问题,比如进销存中的出库和入库问题,多个人同时操作,属于一个 IIS 进程中多个线程并发操作的问题,这个时候可以引入线程锁 lock/Monitor 等,轻松解决这类问题。但是随着业务量的逐渐增大,比如"秒杀业务", 肯定是集群部署,这个时候线程锁已经没用了, 必须引入分布式锁。
常见的分布式锁有:数据库、zookeeper、redis. 本节重点介绍 redis 的分布式锁.
如下图:
2. 分布式锁需要满足的条件
(1).在分布式系统环境下,一个锁在同一时间只能被一个服务器获取;(这是所有分布式锁的基础)
(2).高性能的获取锁和释放锁;(锁用完了,要及时释放,以供别人继续使用)
(3).具备锁失效机制,防止死锁;(防止因为某些意外,锁没有得到释放,那别人将永远无法使用)
(4).具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。(满足等待锁的同时,也要满足非阻塞锁特性,便于多样性的业务场景使用)
3. 分布式锁种类/原理
(1).阻塞锁
尝试在 redis 中创建一个字符串结构缓存,方法传入 key 和过期时间(AcquireLock), 其中 key 对应的 value 为锁的过期时间 timeout 的时间戳。
若 redis 中没有这个 key,则创建成功(即抢到锁),然后立即返回;若已经有这个 key,则先 watch,然后校验 value 中的时间戳是否已经超过当前时间。
若已超过,则尝试使用提交事务的方式覆盖新的时间戳,事务提交成功(即抢到锁),然后立即返回;若未超过当前时间或事务提交失败(即被别人抢到锁),则进入一个内部优化过的微循环,不断重试。
传入的 timeout 还有一个作用,就是控制重试时间,重试超时后则抛异常,using 完成方法调用或者显式调用 dispose,都会直接清除 key。
总结:
timeout 有两个意思:一是如果成功加锁后锁的过期时间, 二是未成功加锁后阻塞等待的时间。数据锁服务通过检查 value 中时间戳来判断是否过期,并不是利用 redis 在 key 上设置 expire time 来通过 key 的过期实现的。
(2).非阻塞锁
尝试在 redis 中创建一个字符串结构缓存项,方法传入 key、value、timeout(Add),其中 value 无实际意义,过期时间为传入的 timeout。
若 redis 中没有这个 key,则创建成功(即抢到锁),然后立即返回 true.若已经有这个 key,则立即返回 false。以上过程为全局单线程原子操作,整个过程为独占式操作。IsLock 可以检测 key 是否存在。
注意:
timeout 即成功加锁后锁的过期时间,利用 redis 在 key 上设置 expire time 来通过 key 的过期实现。不要先用 IsLock 判断是否有锁再用 Add 加锁,因为这两个操作非原子性操作,期间会被其他操作干扰。
(3).底层实现主要用到以下几个指令
A.setnx
setnx key val:当且仅当 key 不存在时,set 一个 key 为 val 的字符串,返回 1;若 key 存在,则什么都不做,返回 0
B.expire
expire key timeout:为 key 设置一个超时时间,单位为 second,超过这个时间锁会自动释放,避免死锁
C.delete
delete key:删除 key
二. 案例模拟实现
1.场景模拟分析
模拟多个用户进行秒杀业务,扣减库存→创建订单。 (PS:这里只是为了演示分布式锁而已,实际场景可以利用 redis 自减 Api 原子性实现扣减库存,从而干掉锁的问题)
总结:真正的秒杀是不会用分布式锁的, 因为用锁会存在等待的问题,会产生大量无响应的情况, 实际情况下可以利用 Lua 脚本结合 redis 原子性的特点,编写秒杀业务。详见:https://www.cnblogs.com/yaopengfei/p/13749772.html
下面分享 3 个不同的程序集实现分布式锁的业务.
2. ServiceStack.Redis
(1).阻塞锁
代码分享
/// <summary> /// 阻塞锁 /// </summary> public class BlockingLock {
public static void Show(int i, string key, TimeSpan timeout) { using var client = new RedisClient("119.45.174.249", 6379, "123456"); using (var myLock = client.AcquireLock(key, timeout)) //获取锁 (此处阻塞,其它线程等待) { var goodNum = client.Get<int>("goodNum"); if (goodNum > 0) { client.Set<int>("goodNum", goodNum - 1); //扣减库存 var orderNum = client.Incr("orderNum"); Console.WriteLine($"{i}抢购成功,此时的库存为{goodNum - 1},订单数量为:{orderNum}");
} else { Console.WriteLine($"商品已经卖光了"); } } } }
复制代码
调用
static void Main(string[] args) {
Console.WriteLine("请输入开始抢购的时间:"); int minute = int.Parse(Console.ReadLine()); using var client = new RedisClient("119.45.174.249", 6379, "123456"); //商品数量(设置为10) client.Set<int>("goodNum", 10); //订单数(默认为0) client.Set<int>("orderNum", 0);
//开启30个线程去抢购 Console.WriteLine($"在{minute}分0秒正式开启秒杀!"); var flag = true; while (flag) { if (DateTime.Now.Minute == minute) //if (true) { flag = false; Parallel.For(0, 30, (i) => { int temp = i; Task.Run(() => { BlockingLock.Show(i, "akey", TimeSpan.FromSeconds(100)); //阻塞锁
//NoBlockingLock.Show(i, "akey", TimeSpan.FromSeconds(100)); //非阻塞锁 }); }); } } Console.ReadKey(); }
复制代码
copy 两套程序同时运行
(2).非阻塞锁
代码分享:
/// <summary> /// 非阻塞锁 /// </summary> public class NoBlockingLock { public static void Show(int i, string key, TimeSpan timeout) { using var client = new RedisClient("119.45.174.249", 6379, "123456"); bool isLocked = client.Add<string>(key, "xxxx", timeout); if (isLocked) { try { var goodNum = client.Get<int>("goodNum"); if (goodNum > 0) { client.Set<int>("goodNum", goodNum - 1); //扣减库存 var orderNum = client.Incr("orderNum"); //订单数量自增1 Console.WriteLine($"{i}抢购成功,此时的库存为{goodNum - 1},订单数量为:{orderNum}"); } else { Console.WriteLine($"{i}商品已经卖光了"); } } catch (Exception ex) { Console.WriteLine($"{i}报错了{ex.Message}"); } finally { client.Remove(key); } } else { Console.WriteLine($"{i}抢购失败:原因:没有拿到锁"); } } }
复制代码
调用
static void Main(string[] args) {
Console.WriteLine("请输入开始抢购的时间:"); int minute = int.Parse(Console.ReadLine()); using var client = new RedisClient("119.45.174.249", 6379, "123456"); //商品数量(设置为10) client.Set<int>("goodNum", 10); //订单数(默认为0) client.Set<int>("orderNum", 0);
//开启30个线程去抢购 Console.WriteLine($"在{minute}分0秒正式开启秒杀!"); var flag = true; while (flag) { if (DateTime.Now.Minute == minute) //if (true) { flag = false; Parallel.For(0, 30, (i) => { int temp = i; Task.Run(() => { //BlockingLock.Show(i, "akey", TimeSpan.FromSeconds(100)); //阻塞锁
NoBlockingLock.Show(i, "akey", TimeSpan.FromSeconds(100)); //非阻塞锁 }); }); } } Console.ReadKey(); }
复制代码
copy 两套程序同时运行
3. StackExchange.Redis
代码分享
public class MyLock1 { public static void Show(int i, string key, TimeSpan timeout) {
RedisHelp redis = new RedisHelp("119.45.174.249:6379,password=123456"); var client = redis.GetDatabase();
bool isLocked = client.LockTake(key, Environment.MachineName, timeout); //timeout秒后自动释放 if (isLocked) { try { var goodNum = int.Parse(client.StringGet("goodNum")); if (goodNum > 0) { client.StringSet("goodNum", goodNum - 1); //扣减库存 var orderNum = client.StringIncrement("orderNum"); //订单数量自增1 Console.WriteLine($"{i}抢购成功,此时的库存为{goodNum - 1},订单数量为:{orderNum}"); } else { Console.WriteLine($"{i}商品已经卖光了"); } } catch (Exception ex) { Console.WriteLine($"{i}报错了{ex.Message}"); } finally { client.LockRelease(key, Environment.MachineName); } } else { Console.WriteLine($"{i}抢购失败:原因:没有拿到锁"); } } }
复制代码
调用:
static void Main(string[] args){
Console.WriteLine("请输入开始抢购的时间:"); int minute = int.Parse(Console.ReadLine()); RedisHelp redis = new RedisHelp("119.45.174.249:6379,password=123456"); var client = redis.GetDatabase(); //商品数量(设置为10) client.StringSet("goodNum", 10); //订单数(默认为0) client.StringSet("orderNum", 0);
//开启30个线程去抢购 Console.WriteLine($"在{minute}分0秒正式开启秒杀!"); var flag = true; while (flag) { if (DateTime.Now.Minute == minute) //if (true) { flag = false; Parallel.For(0, 30, (i) => { int temp = i; Task.Run(() => { MyLock1.Show(i, "akey", TimeSpan.FromSeconds(2)); }); }); } } Console.ReadKey();}
复制代码
4. CSRedisCore
代码分享
public class MyLock1 { public static void Show(int i, string key, int timeout) {
RedisHelper.Initialization(new CSRedis.CSRedisClient("119.45.174.249:6379,password=123456,defaultDatabase=0")); var isLocked = RedisHelper.Lock(key, timeout, true); //timeout秒后自动释放 if (isLocked != null) //获取超时则返回null { try { var goodNum = int.Parse(RedisHelper.Get("goodNum")); if (goodNum > 0) { RedisHelper.Set("goodNum", goodNum - 1); //扣减库存 var orderNum = RedisHelper.IncrBy("orderNum"); //订单数量自增1 Console.WriteLine($"{i}抢购成功,此时的库存为{goodNum - 1},订单数量为:{orderNum}"); } else { Console.WriteLine($"商品已经卖光了"); } } catch (Exception ex) { Console.WriteLine($"报错了{ex.Message}"); } finally { RedisHelper.Del(key); //上面可以自动删除,还需要手动删除吗? } } else { Console.WriteLine($"{i}抢购失败:原因:没有拿到锁"); } } }
复制代码
调用
static void Main(string[] args) {
Console.WriteLine("请输入开始抢购的时间:"); int minute = int.Parse(Console.ReadLine()); var client = new CSRedis.CSRedisClient("119.45.174.249:6379,password=123456,defaultDatabase=0"); //商品数量(设置为10) client.Set("goodNum", 10); //订单数(默认为0) client.Set("orderNum", 0);
//开启30个线程去抢购 Console.WriteLine($"在{minute}分0秒正式开启秒杀!"); var flag = true; while (flag) { if (DateTime.Now.Minute == minute) //if (true) { flag = false; Parallel.For(0, 30, (i) => { int temp = i; Task.Run(() => {
MyLock1.Show(i, "akey", 2); }); }); } } Console.ReadKey(); }
复制代码
原文链接:第十五节:Redis 分布式锁剖析和几种客户端的实现 - Yaopengfei - 博客园
评论