写点什么

在代码世界游走,没几把“锁”防身可不行 | 京东云技术团队

  • 2023-08-21
    北京
  • 本文字数:10930 字

    阅读完需:约 36 分钟

在代码世界游走,没几把“锁”防身可不行 | 京东云技术团队

一、开篇背景

“锁”代表安全。在程序中(这里指 java)尤其多线程环境下,有了锁的帮助,会给数据安全带来保障,帮助线程更好的运作,避免竞争和互斥。


锁共有 15 种算法:乐观锁、悲观锁、自旋锁、重入锁、读写锁、公平锁、非公平锁、共享锁、独占锁、重量级锁、轻量级锁、偏向锁、分段锁、互斥锁、同步锁....一口气输出真的累,谁记这个啊。我们要吃现成的。ok,上面的一大堆在咱 java 里就是:


ReentrantLock,Synchronieed,ReentrantReadWriteLock,Atomic 全家桶,Concurrent 全家桶


已上在并发场景中都是被常常用到,想必大家都已炉火纯青般.....巴特!我们还有后浪同学们可能不熟悉,那我在这里聊下锁的用法和使用场景。

ReentrantLock:

ReentrantLock 是一个互斥的可重入锁。互斥的意思就是排他,独占,只能一个线程获取到锁。可重入的意思就是单个线程可以多次重复获取锁。它实现了悲观锁、重入锁、独占锁、非公平锁和互斥锁的算法。


常用方法



场景


递归嵌套的业务场景中,例如一棵树型的业务逻辑,方法有嵌套和调用,这时候我从外层加锁后,在递归遍历多次,每次都要是同一把锁,并且递归到其他层级时锁还不能失效。这个时候就可以使用重入锁了。江湖上还有个花名,叫递归锁。

Synchronized

Synchronized 是悲观锁,默认是偏向锁,解锁失败后会升级为轻量级锁,当竞争继续加剧,进入 CAS 自旋 10 次后会升级为重量级锁。它实现了悲观锁、重入锁(用关键字修饰方法或代码段时)、独占锁,非公平锁、轻量级锁、重量级锁、偏向锁和同步锁算法。


使用方法



场景


多个线程访问共享变量,为了保证期原子性就可以使用 synchronized 。一个典型的业务场景是银行转账操作。假设有多个用户在同时进行转账操作,需要确保转账过程的原子性和数据的一致性,避免出现重复转账或者转账金额错误的情况。在这种情况下,可以使用 synchronized 关键字来实现同步访问。

ReentrantReadWriteLock

ReentrantReadWriteLock 是 Java 提供的读写锁,它支持多个线程同时读取共享数据,但只允许一个线程进行写操作。 他实现了读写锁和共享锁算法。


常用方法



场景


读写锁应用在读多写少的情况下。读取时不涉及数据修改,写入时需要互斥操作。现在基本所有的号称效率高的准实时数据库都有实现读写锁的算法。

Atomic 全家桶

Atomic 全家桶 我们已 AtomicInteger 为例。他的特点是实现了 CAS 算法,同时解决了 ABA 问题保证原子性。还实现了自旋锁的 CLHLock 算法,用于 CAS 比较失败后自旋等待。它实现了乐观锁、自旋锁和轻量级锁的算法。


常用方法



场景


用来做计数器非常合适,再有就是线程通讯,数据共享。

Concurrent 全家桶

Concurrent 全家桶我们已 ConcurrentHashMap 为代表。它实现了分段锁算法(Segmented Locking)的策略,将整个数据结构划分成多个 Segments,每个段都拥有独立的锁。


常用方法



场景


在 java 中 ConcurrentHashMap,就是将数据分为 16 段,每一段都有单独的锁,并且处于不同锁段的数据互不干扰,以此来提升锁的性能。


比如在秒杀扣库存的场景中,现在的库存中有 2000 个商品,用户可以秒杀。为了防止出现超卖的情况,通常情况下,可以对库存加锁。如果有 1W 的用户竞争同一把锁,显然系统吞吐量会非常低。为了提升系统性能,我们可以将库存分段,比如:分为 100 段,这样每段就有 20 个商品可以参与秒杀。在秒杀的过程中,先把用户 id 获取 hash 值,然后除以 100 取模。模为 1 的用户访问第 1 段库存,模为 2 的用户访问第 2 段库存,模为 3 的用户访问第 3 段库存,后面以此类推,到最后模为 100 的用户访问第 100 段库存。如此一来,在多线程环境中,可以大大的减少锁的冲突。

二、重点分布式场景 redisson 和 zk 的锁的介绍

Redisson

我们日常开发中用用的最多的场景还是分布式锁。提到分布式锁就不可回避 Redisson。WHY? 他就是权威好用。使用场景最多没有之一。Redisson 官方一共提供了 8 把锁。我们逐一看一下。

1 可重入锁(Reentrant Lock)

基于 Redis 的 Redisson 分布式可重入锁**RLock** Java 对象实现了java.util.concurrent.locks.Lock接口。同时还提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。


   public static void main(String[] args) throws InterruptedException {        // connects to 127.0.0.1:6379 by default        RedissonClient redisson = Redisson.create();                RLock lock = redisson.getLock("lock");        lock.lock(2, TimeUnit.SECONDS);
Thread t = new Thread() { public void run() { RLock lock1 = redisson.getLock("lock"); lock1.lock(); lock1.unlock(); }; };
t.start(); t.join();
redisson.shutdown(); }
复制代码


大家都知道,如果负责储存这个分布式锁的 Redisson 节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson 内部提供了一个监控锁的看门狗,它的作用是在 Redisson 实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是 30 秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定。


另外 Redisson 还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。


// 加锁以后10秒钟自动解锁// 无需调用unlock方法手动解锁lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);if (res) { try { ... } finally { lock.unlock(); }}
复制代码


Redisson 同时还为分布式锁提供了异步执行的相关方法:


RLock lock = redisson.getLock("anyLock");lock.lockAsync();lock.lockAsync(10, TimeUnit.SECONDS);Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
复制代码


RLock对象完全符合 Java 的 Lock 规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。但是如果遇到需要其他进程也能解锁的情况,请使用分布式信号量Semaphore 对象.

2. 公平锁(Fair Lock)

基于 Redis 的 Redisson 分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。它保证了当多个 Redisson 客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson 会等待 5 秒后继续下一个线程,也就是说如果前面有 5 个线程都处于等待状态,那么后面的线程会等待至少 25 秒。


public static void main(String[] args) throws InterruptedException {        // connects to 127.0.0.1:6379 by default        RedissonClient redisson = Redisson.create();                RLock lock = redisson.getFairLock("test");
int size = 10; List<Thread> threads = new ArrayList<Thread>(); for (int i = 0; i < size; i++) { final int j = i; Thread t = new Thread() { public void run() { lock.lock(); lock.unlock(); }; }; threads.add(t); } for (Thread thread : threads) { thread.start(); thread.join(5); } for (Thread thread : threads) { thread.join(); } }
复制代码


同样也有看门狗机制来防止死锁。另外 Redisson 还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。


// 10秒钟以后自动解锁// 无需调用unlock方法手动解锁fairLock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);...fairLock.unlock();
复制代码


Redisson 同时还为分布式可重入公平锁提供了异步执行的相关方法:


RLock fairLock = redisson.getFairLock("anyLock");fairLock.lockAsync();fairLock.lockAsync(10, TimeUnit.SECONDS);Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
复制代码

3. 联锁(MultiLock)

基于 Redis 的 Redisson 分布式联锁**RedissonMultiLock**对象可以将多个RLock对象关联为一个联锁,每个RLock对象实例可以来自于不同的 Redisson 实例。




public static void main(String[] args) throws InterruptedException { // connects to 127.0.0.1:6379 by default RedissonClient client = Redisson.create(); // 同时加锁:lock1 lock2 lock3 所有的锁都上锁成功才算成功。 RLock lock1 = client.getLock("lock1"); RLock lock2 = client.getLock("lock2"); RLock lock3 = client.getLock("lock3"); Thread t = new Thread() { public void run() { RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); lock.lock(); try { Thread.sleep(3000); } catch (InterruptedException e) { } lock.unlock(); }; }; t.start(); t.join(1000);
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); lock.lock(); lock.unlock(); }
复制代码


同样也有看门狗机制来防止死锁。另外 Redisson 还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。


RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);// 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开lock.lock(10, TimeUnit.SECONDS);
// 为加锁等待100秒时间,并在加锁成功10秒钟后自动解开boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);...lock.unlock();
复制代码

4. 红锁(RedLock)

基于 Redis 的 Redisson 红锁RedissonRedLock对象实现了 Redlock 介绍的加锁算法。该对象也可以用来将多个RLock对象关联为一个红锁,每个RLock对象实例可以来自于不同的 Redisson 实例。


RLock lock1 = redissonInstance1.getLock("lock1");RLock lock2 = redissonInstance2.getLock("lock2");RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);// 同时加锁:lock1 lock2 lock3// 红锁在大部分节点上加锁成功就算成功。lock.lock();...lock.unlock();
复制代码



public static void main(String[] args) throws InterruptedException { // connects to 127.0.0.1:6379 by default RedissonClient client1 = Redisson.create(); RedissonClient client2 = Redisson.create(); RLock lock1 = client1.getLock("lock1"); RLock lock2 = client1.getLock("lock2"); RLock lock3 = client2.getLock("lock3"); Thread t1 = new Thread() { public void run() { lock3.lock(); }; }; t1.start(); t1.join(); Thread t = new Thread() { public void run() { RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); lock.lock(); try { Thread.sleep(3000); } catch (InterruptedException e) { } lock.unlock(); }; }; t.start(); t.join(1000);
lock3.forceUnlock(); RedissonMultiLock lock = new RedissonRedLock(lock1, lock2, lock3); // 给lock1,lock2,lock3加锁,如果没有手动解开的话,10秒钟后将会自动解开 lock.lock(10, TimeUnit.SECONDS); lock.unlock();
client1.shutdown(); client2.shutdown(); }
复制代码

5. 读写锁(ReadWriteLock)

基于 Redis 的 Redisson 分布式可重入读写锁RReadWriteLock Java 对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了 RLock 接口。


分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。


RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");// 最常见的使用方法rwlock.readLock().lock();// 或rwlock.writeLock().lock();
复制代码



public static void main(String[] args) throws InterruptedException { // connects to 127.0.0.1:6379 by default RedissonClient redisson = Redisson.create();
final RReadWriteLock lock = redisson.getReadWriteLock("lock");
lock.writeLock().tryLock();
Thread t = new Thread() { public void run() { RLock r = lock.readLock(); // 10秒钟以后自动解锁,无需调用unlock方法手动解锁 //lock.readLock().lock(10, TimeUnit.SECONDS); r.lock();
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } r.unlock(); }; };
t.start(); t.join();
lock.writeLock().unlock();
t.join(); redisson.shutdown(); }
复制代码

6. 信号量(Semaphore)

基于 Redis 的 Redisson 的分布式信号量(Semaphore)Java 对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。



public static void main(String[] args) throws InterruptedException { // connects to 127.0.0.1:6379 by default RedissonClient redisson = Redisson.create();
RSemaphore s = redisson.getSemaphore("test"); s.trySetPermits(5); s.acquire(3);
Thread t = new Thread() { @Override public void run() { RSemaphore s = redisson.getSemaphore("test"); s.release(); s.release(); } };
t.start(); //或 s.acquire(4); redisson.shutdown(); }
复制代码

7. 可过期性信号量(PermitExpirableSemaphore)

基于 Redis 的 Redisson 可过期性信号量(PermitExpirableSemaphore)是在RSemaphore对象的基础上,为每个信号增加了一个过期时间。每个信号可以通过独立的 ID 来辨识,释放时只能通过提交这个 ID 才能释放。它提供了异步(Async)、反射式(Reactive)和 RxJava2 标准的接口。



public static void main(String[] args) throws InterruptedException { // connects to 127.0.0.1:6379 by default RedissonClient redisson = Redisson.create();
RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test"); s.trySetPermits(1); // 获取一个信号,有效期只有2秒钟。 String permitId = s.tryAcquire(100, 2, TimeUnit.SECONDS);
Thread t = new Thread() { public void run() { RPermitExpirableSemaphore s = redisson.getPermitExpirableSemaphore("test"); try { String permitId = s.acquire(); s.release(permitId); } catch (InterruptedException e) { e.printStackTrace(); } }; };
t.start(); t.join(); s.tryRelease(permitId); }
复制代码

8. 闭锁(CountDownLatch)

基于 Redisson 的 Redisson 分布式闭锁(CountDownLatch)Java 对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。


 public static void main(String[] args) throws InterruptedException {        // connects to 127.0.0.1:6379 by default        RedissonClient redisson = Redisson.create();
ExecutorService executor = Executors.newFixedThreadPool(2);
final RCountDownLatch latch = redisson.getCountDownLatch("latch1"); latch.trySetCount(1); // 在其他线程或其他JVM里 executor.execute(new Runnable() { @Override public void run() { latch.countDown(); } });
executor.execute(new Runnable() {
@Override public void run() { try { latch.await(550, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } });
executor.shutdown(); executor.awaitTermination(10, TimeUnit.SECONDS);
}
复制代码

Zookeeper

接着我们再看 zookeeper 的锁。zk 不是为高可用性设计的,但它使用**ZAB**协议达到了极高的一致性。所以它经常被选作注册中心、配置中心、分布式锁等场景。它的性能是非常有限的,而且 API 并不是那么好用。xjjdog 倾向于使用基于**Raft**协议的**Etcd**或者**Consul**,它们更加轻量级一些。我们看一下 zk 的加锁时序图。



Curator 是 netflix 公司开源的一套 zookeeper 客户端,目前是 Apache 的顶级项目。与 Zookeeper 提供的原生客户端相比,Curator 的抽象层次更高,简化了 Zookeeper 客户端的开发量。Curator 解决了很多 zookeeper 客户端非常底层的细节开发工作,包括连接重连、反复注册 wathcer 和 NodeExistsException 异常等。Curator 由一系列的模块构成,对于一般开发者而言,常用的是 curator-framework 和 curator-recipes,我们跳过他的其他能力,直接看分布式锁。

1. (可重入锁 Shared Reentrant Lock)

Shared 意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant 和 JDK 的 ReentrantLock 类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。 它是由类 InterProcessMutex 来实现。 通过 acquire 获得锁,并提供超时机制。通过 release()方法释放锁。 InterProcessMutex 实例可以重用。 Revoking ZooKeeper recipes wiki 定义了可协商的撤销机制。 为了撤销 mutex, 调用 makeRevocable 方法。我们来看示例:


public class ExampleClientThatLocks {    private final InterProcessMutex lock;    private final FakeLimitedResource resource;    private final String clientName;
public ExampleClientThatLocks( CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) { this.resource = resource; this.clientName = clientName; lock = new InterProcessMutex(client, lockPath); }
public void doWork(long time, TimeUnit unit) throws Exception { if (!lock.acquire(time, unit)) { throw new IllegalStateException(clientName + " could not acquire the lock"); } try { System.out.println(clientName + " has the lock"); resource.use(); } finally { System.out.println(clientName + " releasing the lock"); //释放锁 lock.release(); } }}
复制代码

2. 不可重入锁(Shared Lock)

使用 InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入。

3. 可重入读写锁(Shared Reentrant Read Write Lock)

类似 JDK 的 ReentrantReadWriteLock. 一个读写锁管理一对相关的锁。 一个负责读操作,另外一个负责写操作。 读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。 此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。 这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。 从读锁升级成写锁是不成的。 主要由两个类实现:


InterProcessReadWriteLockInterProcessLock
复制代码

4. 信号量(Shared Semaphore)

一个计数的信号量类似 JDK 的 Semaphore。 JDK 中 Semaphore 维护的一组许可(permits),而 Cubator 中称之为租约(Lease)。注意,所有的实例必须使用相同的 numberOfLeases 值。 调用 acquire 会返回一个租约对象。 客户端必须在 finally 中 close 这些租约对象,否则这些租约会丢失掉。 但是, 但是,如果客户端 session 由于某种原因比如 crash 丢掉, 那么这些客户端持有的租约会自动 close, 这样其它客户端可以继续使用这些租约。 租约还可以通过下面的方式返还:


public void returnAll(Collection<Lease> leases)public void returnLease(Lease lease)
复制代码


注意一次你可以请求多个租约,如果 Semaphore 当前的租约不够,则请求线程会被阻塞。 同时还提供了超时的重载方法:


public Lease acquire()public Collection<Lease> acquire(int qty)public Lease acquire(long time, TimeUnit unit)public Collection<Lease> acquire(int qty, long time, TimeUnit unit)
复制代码


主要类有:


InterProcessSemaphoreV2LeaseSharedCountReader
复制代码

5. 多锁对象(Multi Shared Lock)

Multi Shared Lock 是一个锁的容器。 当调用 acquire, 所有的锁都会被 acquire,如果请求失败,所有的锁都会被 release。 同样调用 release 时所有的锁都被 release(失败被忽略)。 基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。 主要涉及两个类:


InterProcessMultiLockInterProcessLock
复制代码


它的构造函数需要包含的锁的集合,或者一组 ZooKeeper 的 path。


public InterProcessMultiLock(List<InterProcessLock> locks)public InterProcessMultiLock(CuratorFramework client, List<String> paths)
复制代码

6. 完整锁示例

我们再看一个官方完整锁示例:


public class LockingExample {    private static final int QTY = 5;    private static final int REPETITIONS = QTY * 10;
private static final String PATH = "/examples/locks";
public static void main(String[] args) throws Exception { // all of the useful sample code is in ExampleClientThatLocks.java
// FakeLimitedResource simulates some external resource that can only be access by one process at a time final FakeLimitedResource resource = new FakeLimitedResource();
ExecutorService service = Executors.newFixedThreadPool(QTY); final TestingServer server = new TestingServer(); try { for (int i = 0; i < QTY; ++i) { final int index = i; Callable<Void> task = new Callable<Void>() { @Override public Void call() throws Exception { CuratorFramework client = CuratorFrameworkFactory.newClient( server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); try { client.start();
ExampleClientThatLocks example = new ExampleClientThatLocks(client, PATH, resource, "Client " + index); for (int j = 0; j < REPETITIONS; ++j) { example.doWork(10, TimeUnit.SECONDS); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { e.printStackTrace(); // log or do something } finally { CloseableUtils.closeQuietly(client); } return null; } }; service.submit(task); }
service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); } finally { CloseableUtils.closeQuietly(server); } }}
复制代码

三、总结

分布式环境中,我们始终绕不开 CAP 理论,这也是 Redisson 锁和 ZK 锁的本质区别。CAP 指的是在一个分布式系统中:一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。


这三个要素最多只能同时实现两点,不可能三者兼顾。如果你的实际业务场景,更需要的是保证数据一致性。那么请使用 CP 类型的分布式锁,比如:zookeeper,它是基于磁盘的,性能可能没那么好,但数据一般不会丢。


如果你的实际业务场景,更需要的是保证数据高可用性。那么请使用 AP 类型的分布式锁,比如:Redisson,它是基于内存的,性能比较好,但有丢失数据的风险。


其实,在我们绝大多数分布式业务场景中,使用 Redisson 分布式锁就够了,真的别太较真。因为数据不一致问题,可以通过最终一致性方案解决。但如果系统不可用了,对用户来说是暴击一万点伤害。

四、思考思考

以上就是我对锁的总结和。分布式锁不是完美的,总会有问题;如:在 redis 中,lockName 和已有的 key 不能重名 !unlock 的前提是 lock 成功!必须要设计自己的兜底方案......


回顾整个锁界,兄弟们都掌握了哪些锁呢?在分布式锁的场景中都遇到哪些疑难杂症?我们评论区继续

五、备注

redisson:


官方实例


官方代码示例


curator:


curator的ZK客户端示例


作者:京东保险 管顺利

来源:京东云开发者社区 转载请注明来源

发布于: 刚刚阅读数: 3
用户头像

拥抱技术,与开发者携手创造未来! 2018-11-20 加入

我们将持续为人工智能、大数据、云计算、物联网等相关领域的开发者,提供技术干货、行业技术内容、技术落地实践等文章内容。京东云开发者社区官方网站【https://developer.jdcloud.com/】,欢迎大家来玩

评论

发布
暂无评论
在代码世界游走,没几把“锁”防身可不行 | 京东云技术团队_分布式锁_京东科技开发者_InfoQ写作社区