分布式锁—Curator 的分布式锁
- 2025-03-10 福建
本文字数:26679 字
阅读完需:约 88 分钟
1.Curator 的可重入锁的源码
(1)InterProcessMutex 获取分布式锁
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",
5000,
3000,
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端");
//获取分布式锁
InterProcessMutex lock = new InterProcessMutex(client, "/locks/myLock");
lock.acquire();
Thread.sleep(1000);
lock.release();
}
}
(2)InterProcessMutex 的初始化
设置锁的节点路径 basePath + 初始化一个 LockInternals 对象实例。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
private final LockInternals internals;
private final String basePath;
private static final String LOCK_NAME = "lock-";
...
public InterProcessMutex(CuratorFramework client, String path) {
this(client, path, new StandardLockInternalsDriver());
}
public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
this(client, path, LOCK_NAME, 1, driver);
}
//初始化InterProcessMutex
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
//1.设置锁的节点路径
basePath = PathUtils.validatePath(path);
//2.初始化一个LockInternals对象实例
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
}
public class LockInternals {
private final LockInternalsDriver driver;
private final String lockName;
private volatile int maxLeases;
private final WatcherRemoveCuratorFramework client;
private final String basePath;
private final String path;
...
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
this.driver = driver;
this.lockName = lockName;
this.maxLeases = maxLeases;
this.client = client.newWatcherRemoveCuratorFramework();
this.basePath = PathUtils.validatePath(path);
this.path = ZKPaths.makePath(path, lockName);
}
...
}
(3)InterProcessMutex.acquire()尝试获取锁
LockData 是 InterProcessMutex 的一个静态内部类。一个线程对应一个 LockData 实例对象,用来描述线程持有的锁的具体情况。多个线程对应的 LockData 存放在一个叫 threadData 的 ConcurrentMap 中。LockData 中有一个原子变量 lockCount,用于锁的重入次数计数。
在执行 InterProcessMutex 的 acquire()方法尝试获取锁时:首先会尝试取出当前线程对应的 LockData 数据,判断是否存在。如果存在,则说明锁正在被当前线程重入,重入次数自增后直接返回。如果不存在,则调用 LockInternals 的 attemptLock()方法尝试获取锁。默认情况下,attemptLock()方法传入的等待获取锁的时间 time = -1。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
private final LockInternals internals;
private final String basePath;
private static final String LOCK_NAME = "lock-";
//一个线程对应一个LockData数据对象
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
...
//初始化InterProcessMutex
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
//设置锁的路径
basePath = PathUtils.validatePath(path);
//初始化LockInternals
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
@Override
public void acquire() throws Exception {
//获取分布式锁,会一直阻塞等待直到获取成功
//相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用
if (!internalLock(-1, null)) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
//获取当前线程
Thread currentThread = Thread.currentThread();
//获取当前线程对应的LockData数据
LockData lockData = threadData.get(currentThread);
if (lockData != null) {
//可重入计算
lockData.lockCount.incrementAndGet();
return true;
}
//调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if (lockPath != null) {
//获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象
LockData newLockData = new LockData(currentThread, lockPath);
//然后把该LockData对象存放到InterProcessMutex.threadData这个Map中
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
//LockData是InterProcessMutex的一个静态内部类
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数
private LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
protected byte[] getLockNodeBytes() {
return null;
}
...
}
(4)LockInternals.attemptLock()尝试获取锁
先创建临时节点,再判断是否满足获取锁的条件。
步骤一:首先调用 LockInternalsDriver 的 createsTheLock()方法创建一个临时顺序节点。其中 creatingParentContainersIfNeeded()表示级联创建,forPath(path)表示创建的节点路径名称,withMode(CreateMode.EPHEMERAL_SEQUENTIAL)表示临时顺序节点。
步骤二:然后调用 LockInternals 的 internalLockLoop()方法检查是否获取到了锁。在 LockInternals 的 internalLockLoop()方法的 while 循环中,会先获取排好序的客户端线程尝试获取锁时创建的临时顺序节点名称列表。然后获取当前客户端线程尝试获取锁时创建的临时顺序节点的名称,再根据名称获取在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径,也就是获取一个封装好这些信息的 PredicateResults 对象。
具体会根据节点名称获取当前线程创建的临时顺序节点在节点列表的位置,然后会比较当前线程创建的节点的位置和 maxLeases 的大小。其中 maxLeases 代表了同时允许多少个客户端可以获取到锁,默认是 1。如果当前线程创建的节点的位置小,则表示可以获取锁。如果当前线程创建的节点的位置大,则表示获取锁失败。
获取锁成功,则会中断 LockInternals 的 internalLockLoop()方法的 while 循环,然后向外返回当前客户端线程创建的临时顺序节点路径。接着在 InterProcessMutex 的 internalLock()方法中,会将当前线程 + 其创建的临时顺序节点路径,封装成一个 LockData 对象,然后把该 LockData 对象存放到 InterProcessMutex.threadData 这个 Map 中。
获取锁失败,则通过 PredicateResults 对象先获取前一个节点路径名称。然后通过 getData()方法获取前一个节点路径在 zk 的信息,并添加 Watcher 监听。该 Watcher 监听主要是用来唤醒在 LockInternals 中被 wait()阻塞的线程。添加完 Watcher 监听后,便会调用 wait()方法将当前线程挂起。
所以前一个节点发生变化时,便会通知添加的 Watcher 监听。然后便会唤醒阻塞的线程,继续执行 internalLockLoop()方法的 while 循环。while 循环又会继续获取排序的节点列表 + 判断当前线程是否已获取锁。
public class LockInternals {
private final LockInternalsDriver driver;
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
this.driver = driver;
this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称
...
}
...
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
//获取当前时间
final long startMillis = System.currentTimeMillis();
//默认情况下millisToWait=null
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//默认情况下localLockNodeBytes也是null
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;//是否已经获取到锁
boolean isDone = false;//是否正在获取锁
while (!isDone) {
isDone = true;
//1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//2.检查是否获取到了锁
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
if (hasTheLock) {
return ourPath;
}
return null;
}
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//唤醒LockInternals中被wait()阻塞的线程
client.postSafeNotify(LockInternals.this);
}
};
//检查是否获取到了锁
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
...
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = getSortedChildren();
//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {//获取锁成功
//返回true
haveTheLock = true;
} else {//获取锁失败
//获取前一个节点路径名称
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
//通过getData()获取前一个节点路径在zk的信息,并添加watch监听
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//默认情况下,millisToWait = null
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDelete = true;//timed out - delete our node
break;
}
wait(millisToWait);//阻塞
} else {
wait();//阻塞
}
}
}
}
...
return haveTheLock;
}
List<String> getSortedChildren() throws Exception {
//获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
return getSortedChildren(client, basePath, lockName, driver);
}
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
//获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = client.getChildren().forPath(basePath);
//对节点名称进行排序
List<String> sortedList = Lists.newArrayList(children);
Collections.sort(
sortedList,
new Comparator<String>() {
@Override
public int compare(String lhs, String rhs) {
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
...
}
public class StandardLockInternalsDriver implements LockInternalsDriver {
...
//级联创建一个临时顺序节点
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
String ourPath;
//默认情况下传入的lockNodeBytes=null
if (lockNodeBytes != null) {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
} else {
//创建临时顺序节点
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
//maxLeases代表的是同时允许多少个客户端可以获取到锁
//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
boolean getsTheLock = ourIndex < maxLeases;
//获取当前节点需要watch的前一个节点路径
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}
(5)不同客户端线程获取锁时的互斥实现
maxLeases 代表了同时允许多少个客户端可以获取到锁,默认值是 1。能否获取锁的判断就是:线程创建的节点的位置 outIndex < maxLeases。当线程 1 创建的节点在节点列表中排第一时,满足 outIndex = 0 < maxLeases = 1,可以获取锁。当线程 2 创建的节点再节点列表中排第二时,不满足 outIndex = 1 < maxLeases = 1,所以不能获取锁。从而实现线程 1 和线程 2 获取锁时的互斥。
(6)同一客户端线程可重入加锁的实现
客户端线程重复获取锁时,会重复调用 InterProcessMutex 的 internalLock()方法。在 InterProcessMutex 的 internalLock()方法中:线程第一次获取锁成功会创建一个 LockData 对象,并存放在一个 Map 中。线程第二次获取锁时,便会从这个 Map 中取出这个 LockData 对象,并对 LockData 对象中的重入计数器 lockCount 进行递增,接着就返回 true。以此实现可重入加锁。
(7)客户端线程释放锁的实现
客户端线程释放锁时会调用 InterProcessMutex 的 release()方法。
首先对 LockData 里的重入计数器进行递减。当重入计数器大于 0 时,直接返回。当重入计数器为 0 时才执行下一步删除节点的操作。
然后删除客户端线程创建的临时顺序节点,client.delete().guaranteed().forPath(ourPath)。
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
private final LockInternals internals;
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
...
@Override
public void release() throws Exception {
//获取当前线程
Thread currentThread = Thread.currentThread();
//获取当前线程对应的LockData对象
LockData lockData = threadData.get(currentThread);
if (lockData == null) {
throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
}
//1.首先对LockData里的重入计数器lockCount进行递减
int newLockCount = lockData.lockCount.decrementAndGet();
if (newLockCount > 0) {
//当重入计数器大于0时,直接返回
return;
}
if (newLockCount < 0) {
throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
}
try {
//2.当重入计数器为0时执行删除节点的操作
internals.releaseLock(lockData.lockPath);
} finally {
threadData.remove(currentThread);
}
}
...
}
public class LockInternals {
...
final void releaseLock(String lockPath) throws Exception {
client.removeWatchers();
revocable.set(null);
deleteOurPath(lockPath);
}
private void deleteOurPath(String ourPath) throws Exception {
//删除节点
client.delete().guaranteed().forPath(ourPath);
}
...
}
(8)客户端线程释放锁后其他线程获取锁的实现
由于在节点列表里排第二的节点对应的线程会监听排第一的节点,而当持有锁的客户端线程释放锁后,排第一的节点会被删除掉。所以在节点列表里排第二的节点对应的客户端,便会收到 zk 的通知。于是会回调执行该线程添加的 Watcher 的 process()方法,也就是唤醒该线程,让其继续执行 while 循环获取锁。
public class LockInternals {
...
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//唤醒LockInternals中被wait()阻塞的线程
client.postSafeNotify(LockInternals.this);
}
};
//检查是否获取到了锁
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
...
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = getSortedChildren();
//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//5.获取当前线程创建的节点在节点列表中的位置+是否可以获取锁+前一个节点的路径名称
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {//获取锁成功
//返回true
haveTheLock = true;
} else {//获取锁失败
//获取前一个节点路径名称
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
//通过getData()获取前一个节点路径在zk的信息,并添加watch监听
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//默认情况下,millisToWait = null
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDelete = true;//timed out - delete our node
break;
}
wait(millisToWait);//阻塞
} else {
wait();//阻塞
}
}
}
}
...
return haveTheLock;
}
...
}
(9)InterProcessMutex 就是一个公平锁
因为所有客户端线程都会创建一个顺序节点,然后按申请锁的顺序进行排序。最后会依次按自己所在的排序来尝试获取锁,实现了所有客户端排队获取锁。

2.Curator 的非可重入锁的源码
(1)Curator 的非可重入锁 InterProcessSemaphoreMutex 的使用
非可重入锁:同一个时间只能有一个客户端线程获取到锁,其他线程都要排队,而且同一个客户端线程是不可重入加锁的。
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",//zk的地址
5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
3000,//连接zk时的超时时间
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端,完成zk的连接");
//非可重入锁
InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, "/locks");
lock.acquire();
Thread.sleep(3000);
lock.release();
}
}
(2)Curator 的非可重入锁 InterProcessSemaphoreMutex 的源码
Curator 的非可重入锁是基于 Semaphore 来实现的,也就是将 Semaphore 允许获取 Lease 的客户端线程数设置为 1,从而实现同一时间只能有一个客户端线程获取到 Lease。
public class InterProcessSemaphoreMutex implements InterProcessLock {
private final InterProcessSemaphoreV2 semaphore;
private final WatcherRemoveCuratorFramework watcherRemoveClient;
private volatile Lease lease;
public InterProcessSemaphoreMutex(CuratorFramework client, String path) {
watcherRemoveClient = client.newWatcherRemoveCuratorFramework();
this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);
}
@Override
public void acquire() throws Exception {
//获取非可重入锁就是获取Semaphore的Lease
lease = semaphore.acquire();
}
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception {
Lease acquiredLease = semaphore.acquire(time, unit);
if (acquiredLease == null) {
return false;
}
lease = acquiredLease;
return true;
}
@Override
public void release() throws Exception {
//释放非可重入锁就是释放Semaphore的Lease
Lease lease = this.lease;
Preconditions.checkState(lease != null, "Not acquired");
this.lease = null;
lease.close();
watcherRemoveClient.removeWatchers();
}
}
3.Curator 的可重入读写锁的源码
(1)Curator 的可重入读写锁 InterProcessReadWriteLock 的使用
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",//zk的地址
5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
3000,//连接zk时的超时时间
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端,完成zk的连接");
//读写锁
InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/locks");
lock.readLock().acquire();
lock.readLock().release();
lock.writeLock().acquire();
lock.writeLock().release();
}
}
(2)Curator 的可重入读写锁 InterProcessReadWriteLock 的初始化
读锁和写锁都是基于可重入锁 InterProcessMutex 的子类来实现的。读锁和写锁的获取锁和释放锁逻辑,就是使用 InterProcessMutex 的逻辑。
public class InterProcessReadWriteLock {
private final InterProcessMutex readMutex;//读锁
private final InterProcessMutex writeMutex;//写锁
//must be the same length. LockInternals depends on it
private static final String READ_LOCK_NAME = "__READ__";
private static final String WRITE_LOCK_NAME = "__WRIT__";
...
//InterProcessReadWriteLock的初始化
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
//写锁的初始化
writeMutex = new InternalInterProcessMutex(
client,
basePath,
WRITE_LOCK_NAME,//写锁的lockName='__WRIT__'
lockData,
1,//写锁的maxLeases
new SortingLockInternalsDriver() {
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
}
);
//读锁的初始化
readMutex = new InternalInterProcessMutex(
client,
basePath,
READ_LOCK_NAME,//读锁的lockName='__READ__'
lockData,
Integer.MAX_VALUE,//读锁的maxLeases
new SortingLockInternalsDriver() {
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
return readLockPredicate(children, sequenceNodeName);
}
}
);
}
private static class InternalInterProcessMutex extends InterProcessMutex {
private final String lockName;
private final byte[] lockData;
InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver) {
super(client, path, lockName, maxLeases, driver);
this.lockName = lockName;
this.lockData = lockData;
}
...
}
public InterProcessMutex readLock() {
return readMutex;
}
public InterProcessMutex writeLock() {
return writeMutex;
}
...
}
(3)InterProcessMutex 获取锁的源码
public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
private final LockInternals internals;
private final String basePath;
private static final String LOCK_NAME = "lock-";
//一个线程对应一个LockData数据对象
private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
...
//初始化InterProcessMutex
InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
//设置锁的路径
basePath = PathUtils.validatePath(path);
//初始化LockInternals
internals = new LockInternals(client, driver, path, lockName, maxLeases);
}
@Override
public void acquire() throws Exception {
//获取分布式锁,会一直阻塞等待直到获取成功
//相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用
if (!internalLock(-1, null)) {
throw new IOException("Lost connection while trying to acquire lock: " + basePath);
}
}
private boolean internalLock(long time, TimeUnit unit) throws Exception {
//获取当前线程
Thread currentThread = Thread.currentThread();
//获取当前线程对应的LockData数据
LockData lockData = threadData.get(currentThread);
if (lockData != null) {
//可重入计算
lockData.lockCount.incrementAndGet();
return true;
}
//调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间
String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
if (lockPath != null) {
//获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象
LockData newLockData = new LockData(currentThread, lockPath);
//然后把该LockData对象存放到InterProcessMutex.threadData这个Map中
threadData.put(currentThread, newLockData);
return true;
}
return false;
}
//LockData是InterProcessMutex的一个静态内部类
private static class LockData {
final Thread owningThread;
final String lockPath;
final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数
private LockData(Thread owningThread, String lockPath) {
this.owningThread = owningThread;
this.lockPath = lockPath;
}
}
protected byte[] getLockNodeBytes() {
return null;
}
...
}
public class LockInternals {
private final LockInternalsDriver driver;
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
this.driver = driver;
this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称
...
}
...
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
//获取当前时间
final long startMillis = System.currentTimeMillis();
//默认情况下millisToWait=null
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
//默认情况下localLockNodeBytes也是null
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;//是否已经获取到锁
boolean isDone = false;//是否正在获取锁
while (!isDone) {
isDone = true;
//1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
//2.检查是否获取到了锁
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
if (hasTheLock) {
return ourPath;
}
return null;
}
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//唤醒LockInternals中被wait()阻塞的线程
client.postSafeNotify(LockInternals.this);
}
};
//检查是否获取到了锁
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
boolean haveTheLock = false;
boolean doDelete = false;
...
while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
//3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = getSortedChildren();
//4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
//5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if (predicateResults.getsTheLock()) {//获取锁成功
//返回true
haveTheLock = true;
} else {//获取锁失败
//获取前一个节点路径名称
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this) {
//use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
//通过getData()获取前一个节点路径在zk的信息,并添加watch监听
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//默认情况下,millisToWait = null
if (millisToWait != null) {
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait <= 0) {
doDelete = true;//timed out - delete our node
break;
}
wait(millisToWait);//阻塞
} else {
wait();//阻塞
}
}
}
}
...
return haveTheLock;
}
List<String> getSortedChildren() throws Exception {
//获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
return getSortedChildren(client, basePath, lockName, driver);
}
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
//获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
List<String> children = client.getChildren().forPath(basePath);
//对节点名称进行排序
List<String> sortedList = Lists.newArrayList(children);
Collections.sort(
sortedList,
new Comparator<String>() {
@Override
public int compare(String lhs, String rhs) {
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
...
}
public class StandardLockInternalsDriver implements LockInternalsDriver {
...
//级联创建一个临时顺序节点
@Override
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
String ourPath;
//默认情况下传入的lockNodeBytes=null
if (lockNodeBytes != null) {
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
} else {
//创建临时顺序节点
ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
}
return ourPath;
}
//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
//maxLeases代表的是同时允许多少个客户端可以获取到锁
//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
boolean getsTheLock = ourIndex < maxLeases;
//获取当前节点需要watch的前一个节点路径
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}
(4)先获取读锁 + 后获取读锁的情形分析
当线程创建完临时顺序节点,并获取到排好序的节点列表 children 后,执行 LockInternalsDriver 的 getsTheLock()方法获取能否成功加锁的信息时,会执行到 InterProcessReadWriteLock 的 readLockPredicate()方法。
由于此时 firstWriteIndex = Integer.MAX_VALUE,所以无论多少线程尝试获取读锁,都能满足 ourIndex < firstWriteIndex,也就是 getsTheLock 的值会为 true,即表示可以获取读锁。
所以读读不互斥。
public class InterProcessReadWriteLock {
...
//sequenceNodeName是当前线程创建的临时顺序节点的路径名称
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
if (writeMutex.isOwnedByCurrentThread()) {
return new PredicateResults(null, true);
}
int index = 0;
int firstWriteIndex = Integer.MAX_VALUE;
int ourIndex = -1;
for (String node : children) {
if (node.contains(WRITE_LOCK_NAME)) {
firstWriteIndex = Math.min(index, firstWriteIndex);
} else if (node.startsWith(sequenceNodeName)) {
//找出当前线程创建的临时顺序节点在节点列表中的位置,用ourIndex表示
ourIndex = index;
break;
}
++index;
}
StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = (ourIndex < firstWriteIndex);
String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}
(5)先获取读锁 + 后获取写锁的情形分析
一.假设客户端线程 1 首先成功获取了读锁
那么在/locks 目录下,此时已经有了如下这个读锁的临时顺序节点。
/locks/43f3-4c2f-ba98-07a641d351f2-__READ__0000000004
二.然后另一个客户端线程 2 过来尝试获取写锁
于是该线程 2 会也会先在/locks 目录下创建出如下写锁的临时顺序节点:
/locks/9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005
接着该线程会获取/locks 目录的当前子节点列表并进行排序,结果如下:
[43f3-4c2f-ba98-07a641d351f2-__READ__0000000004,
9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005]
然后会执行 StandardLockInternalsDriver 的 getsTheLock()方法。由于初始化写锁时,设置了其 maxLeases 是 1,而在 StandardLockInternalsDriver 的 getsTheLock()方法中,判断线程能成功获取写锁的依据是:ourIndex < maxLeases。即如果要成功获取写锁,那么线程创建的节点在子节点列表里必须排第一。
而此时,由于之前已有线程获取过一个读锁,而后来又有其他线程往里面创建一个写锁的临时顺序节点。所以写锁的临时顺序节点在子节点列表 children 里排第二,ourIndex 是 1。所以 index = 1 < maxLeases = 1,条件不成立。
因此,此时客户端线程 2 获取写锁失败。于是该线程便会给前一个节点添加一个监听器,并调用 wait()方法把自己挂起。如果前面一个节点被删除释放了锁,那么该线程就会被唤醒,从而再次尝试判断自己创建的节点是否在当前子节点列表中排第一。如果是,那么就表示获取写锁成功。
public class StandardLockInternalsDriver implements LockInternalsDriver {
...
//获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
//根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
int ourIndex = children.indexOf(sequenceNodeName);
validateOurIndex(sequenceNodeName, ourIndex);
//maxLeases代表的是同时允许多少个客户端可以获取到锁
//getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
boolean getsTheLock = ourIndex < maxLeases;
//获取当前节点需要watch的前一个节点路径
String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}
6)先获取写锁 + 后获取读锁的情形分析
一.假设客户端线程 1 先获取了写锁
那么在/locks 目录下,此时已经有了如下这个写锁的临时顺序节点。
/locks/4383-466e-9b86-fda522ea061a-__WRIT__0000000006
二.然后另一个客户端线程 2 过来尝试获取读锁
于是该线程 2 会也会先在/locks 目录下创建出如下读锁的临时顺序节点:
/locks/5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007
接着该线程会获取/locks 目录的当前子节点列表并进行排序,结果如下:
[4383-466e-9b86-fda522ea061a-__WRIT__0000000006,
5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007]
然后会执行 LockInternalsDriver 的 getsTheLock()方法获取能否加锁的信息,也就是会执行 InterProcessReadWriteLock 的 readLockPredicate()方法。
public class InterProcessReadWriteLock {
...
//sequenceNodeName是当前线程创建的临时顺序节点的路径名称
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
//如果是同一个客户端线程,先加写锁,再加读锁,是可以成功的,不会互斥
if (writeMutex.isOwnedByCurrentThread()) {
return new PredicateResults(null, true);
}
int index = 0;
int firstWriteIndex = Integer.MAX_VALUE;
int ourIndex = -1;
for (String node : children) {
if (node.contains(WRITE_LOCK_NAME)) {
firstWriteIndex = Math.min(index, firstWriteIndex);
} else if (node.startsWith(sequenceNodeName)) {
//找出当前线程创建的临时顺序节点在节点列表中的位置,用ourIndex表示
ourIndex = index;
break;
}
++index;
}
StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = (ourIndex < firstWriteIndex);
String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
return new PredicateResults(pathToWatch, getsTheLock);
}
...
}
在 InterProcessReadWriteLock 的 readLockPredicate()方法中,如果是同一个客户端线程,先获取写锁,再获取读锁,是不会互斥的。如果是不同的客户端线程,线程 1 先获取写锁,线程 2 再获取读锁,则互斥。因为线程 2 执行 readLockPredicate()方法在遍历子节点列表(children)时,如果在子节点列表(children)中发现了一个写锁,会设置 firstWriteIndex=0。而此时线程 2 创建的临时顺序节点的 ourIndex=1,所以不满足 ourIndex(1) < firstWriteIndex(0),于是线程 2 获取读锁失败。
总结,获取读锁时,在当前线程创建的节点前面:如果还有写锁对应的节点,那么 firstWriteIndex 就会被重置为具体位置。如果没有写锁对应的节点,那么 firstWriteIndex 就是 MAX_VALUE。而只要 firstWriteIndex 为 MAX_VALUE,那么就可以不断允许获取读锁。
(7)先获取写锁 + 再获取写锁的情形分析
如果客户端线程 1 先获取了写锁,然后后面客户端线程 2 来获取这个写锁。此时线程 2 会发现自己创建的节点排在节点列表中的第二,不是第一。于是获取写锁失败,进行阻塞挂起。等线程 1 释放了写锁后,才会唤醒线程 2 继续尝试获取写锁。
4.Curator 的 MultiLock 源码
(1)Curator 的 MultiLock 的使用
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",//zk的地址
5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
3000,//连接zk时的超时时间
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端,完成zk的连接");
//MultiLock
InterProcessLock lock1 = new InterProcessMutex(client, "/locks/lock_01");
InterProcessLock lock2 = new InterProcessMutex(client, "/locks/lock_02");
InterProcessLock lock3 = new InterProcessMutex(client, "/locks/lock_03");
List<InterProcessLock> locks = new ArrayList<InterProcessLock>();
locks.add(lock1);
locks.add(lock2);
locks.add(lock3);
InterProcessMultiLock multiLock = new InterProcessMultiLock(locks);
}
}
(2)Curator 的 MultiLock 的源码
MultiLock 原理:依次遍历获取每个锁,阻塞直到获取每个锁为止,然后返回 true。如果过程中有报错,依次释放已经获取到的锁,然后返回 false。
public class InterProcessMultiLock implements InterProcessLock {
private final List<InterProcessLock> locks;
public InterProcessMultiLock(List<InterProcessLock> locks) {
this.locks = ImmutableList.copyOf(locks);
}
//获取锁
@Override
public void acquire() throws Exception {
acquire(-1, null);
}
@Override
public boolean acquire(long time, TimeUnit unit) throws Exception {
Exception exception = null;
List<InterProcessLock> acquired = Lists.newArrayList();
boolean success = true;
//依次遍历获取每个锁,阻塞直到获取每个锁为止
for (InterProcessLock lock : locks) {
try {
if (unit == null) {
lock.acquire();
acquired.add(lock);
} else {
if (lock.acquire(time, unit)) {
acquired.add(lock);
} else {
success = false;
break;
}
}
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
success = false;
exception = e;
}
}
if (!success) {
for (InterProcessLock lock : reverse(acquired)) {
try {
lock.release();
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
// ignore
}
}
}
if (exception != null) {
throw exception;
}
return success;
}
@Override
public synchronized void release() throws Exception {
Exception baseException = null;
for (InterProcessLock lock : reverse(locks)) {
try {
lock.release();
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
if (baseException == null) {
baseException = e;
} else {
baseException = new Exception(baseException);
}
}
}
if (baseException != null) {
throw baseException;
}
}
...
}
5.Curator 的 Semaphore 源码
Semaphore 信号量,就是指定同时可以有多个线程获取到锁。
(1)基于 InterProcessSemaphoreV2 使用 Semaphore
public class Demo {
public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
final CuratorFramework client = CuratorFrameworkFactory.newClient(
"127.0.0.1:2181",//zk的地址
5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
3000,//连接zk时的超时时间
retryPolicy
);
client.start();
System.out.println("已经启动Curator客户端,完成zk的连接");
//获取Semaphore
InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/semaphore", 3);
Lease lease = semaphore.acquire();//获取Semaphore的一个锁
Thread.sleep(3000);
semaphore.returnLease(lease);//向Semaphore返还一个锁
}
}
(2)InterProcessSemaphoreV2 的初始化
public class InterProcessSemaphoreV2 {
private final WatcherRemoveCuratorFramework client;
private final InterProcessMutex lock;
private final String leasesPath;
private volatile int maxLeases;
...
//maxLeases表示该实例可以允许获取的lease数量
public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) {
this(client, path, maxLeases, null);
}
//初始化InterProcessSemaphoreV2时,传入的参数path = "/semaphore",参数maxLeases = 3
private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {
this.client = client.newWatcherRemoveCuratorFramework();
path = PathUtils.validatePath(path);
//锁的path是ZKPaths.makePath(path, LOCK_PARENT) => '/semaphore/locks'
//初始化一个InterProcessMutex分布式锁
this.lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
this.maxLeases = (count != null) ? count.getCount() : maxLeases;
//lease的path是:'/semaphore/leases'
this.leasesPath = ZKPaths.makePath(path, LEASE_PARENT);
...
}
...
}
(3)InterProcessSemaphoreV2.acquire()方法获取 Semaphore 的 Lease
客户端线程尝试获取 Semaphore 的一个 Lease。
步骤一:首先会获取初始化时创建的锁 InterProcessMutex
锁的路径是:/semaphore/locks。当多个客户端线程同时执行 acquire()获取 Lease 时只会有一个线程成功,而其他线程会基于锁路径下的临时顺序节点来排队获取锁。
步骤二:获取锁成功后才会尝试获取 Semaphore 的 Lease
Lease 的路径是:/semaphore/leases。此时会先到'/semaphore/leases'目录下创建一个临时顺序节点,然后会调用 InterProcessSemaphoreV2 的 makeLease()方法创建一个 Lease。这个 Lease 对象就是客户端线程成功获取 Semaphore 的一个 Lease。
创建完 Lease 对象后,接着会进入一个 for 循环,会先获取/semaphore/leases 目录下的所有临时顺序节点,并添加监听。然后判断/semaphore/leases 目录下节点的数量是否大于 maxLeases。如果临时顺序节点的数量小于 maxLeases,那么说明当前客户端线程成功获取 Semaphore 的 Lease,于是退出循环。如果临时顺序节点的数量大于 maxLeases,那么当前客户端线程就要调用 wait()进行阻塞等待。
public class InterProcessSemaphoreV2 {
private final InterProcessMutex lock;
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//唤醒在InterProcessSemaphoreV2对象中执行wait()而被阻塞的线程
client.postSafeNotify(InterProcessSemaphoreV2.this);
}
};
...
public Lease acquire() throws Exception {
Collection<Lease> leases = acquire(1, 0, null);
return leases.iterator().next();
}
public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception {
long startMs = System.currentTimeMillis();
boolean hasWait = (unit != null);
long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
Preconditions.checkArgument(qty > 0, "qty cannot be 0");
ImmutableList.Builder<Lease> builder = ImmutableList.builder();
boolean success = false;
try {
while (qty-- > 0) {
int retryCount = 0;
long startMillis = System.currentTimeMillis();
boolean isDone = false;
while (!isDone) {
switch (internalAcquire1Lease(builder, startMs, hasWait, waitMs)) {
case CONTINUE: {
isDone = true;
break;
}
case RETURN_NULL: {
return null;
}
case RETRY_DUE_TO_MISSING_NODE: {
if (!client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
throw new KeeperException.NoNodeException("Sequential path not found - possible session loss");
}
//try again
break;
}
}
}
}
success = true;
} finally {
if (!success) {
returnAll(builder.build());
}
}
return builder.build();
}
private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception {
if (client.getState() != CuratorFrameworkState.STARTED) {
return InternalAcquireResult.RETURN_NULL;
}
if (hasWait) {
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if (!lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS)) {
return InternalAcquireResult.RETURN_NULL;
}
} else {
//1.首先获取一个分布式锁
lock.acquire();
}
Lease lease = null;
boolean success = false;
try {
//2.尝试获取Semaphore的Lease:创建一个临时顺序节点
PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
String nodeName = ZKPaths.getNodeFromPath(path);
lease = makeLease(path);
...
try {
synchronized(this) {
for(;;) {
List<String> children;
//3.获取./lease目录下的所有临时顺序节点,并添加watcher监听
children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
...
//4.判断临时顺序节点的数量是否大于maxLeases
//maxLeases表示最多允许多少个客户端线程获取Semaphore的Lease
if (children.size() <= maxLeases) {
//如果临时顺序节点的数量小于maxLeases
//那么说明当前客户端线程成功获取Semaphore的Lease,于是退出循环
break;
}
//如果临时顺序节点的数量大于maxLeases
//那么当前客户端线程就要调用wait()进行阻塞等待
if (hasWait) {
long thisWaitMs = getThisWaitMs(startMs, waitMs);
if (thisWaitMs <= 0) {
return InternalAcquireResult.RETURN_NULL;
}
...
wait(thisWaitMs);
} else {
...
wait();
}
}
success = true;
}
} finally {
if (!success) {
returnLease(lease);
}
client.removeWatchers();
}
} finally {
//释放掉之前获取的锁
lock.release();
}
builder.add(Preconditions.checkNotNull(lease));
return InternalAcquireResult.CONTINUE;
}
...
}
(4)InterProcessSemaphoreV2.returnLease()方法释放 Semaphore 的 Lease
执行 InterProcessSemaphoreV2 的 returnLease()方法时,最终会执行 makeLease()生成的 Lease 对象的 close()方法,而 close()方法会删除在/semaphore/leases 目录下创建的临时顺序节点。
当/semaphore/leases 目录下的节点发生变化时,那些对该目录进行 Watcher 监听的客户端就会收到通知,于是就会执行 Watcher 里的 process()方法,唤醒执行 wait()时被阻塞的线程,从而让这些没有成功获取 Semaphore 的 Lease 的线程继续尝试获取 Lease。
public class InterProcessSemaphoreV2 {
...
public void returnLease(Lease lease) {
//执行Lease的close()方法
CloseableUtils.closeQuietly(lease);
}
private Lease makeLease(final String path) {
return new Lease() {
@Override
public void close() throws IOException {
try {
client.delete().guaranteed().forPath(path);
} catch (KeeperException.NoNodeException e) {
log.warn("Lease already released", e);
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
throw new IOException(e);
}
}
@Override
public byte[] getData() throws Exception {
return client.getData().forPath(path);
}
@Override
public String getNodeName() {
return ZKPaths.getNodeFromPath(path);
}
};
}
...
}
文章转载自:东阳马生架构

不在线第一只蜗牛
还未添加个人签名 2023-06-19 加入
还未添加个人简介
评论