之前的文章写了并发下的锁控制,但是这仅能针对一个应用内部,如果是在分布式环境下,这样的锁机制是达不到效果的,所以这一篇写写如何实现分布式锁。
分布式锁有几种常见的实现方式,比如借助数据库,使用 redis,还有就是 zookeeper 一类。数据库的方式性能通常要低不少,Redis 有一致性问题,就暂时不研究了,本文介绍如何利用 zookeeper 实现分布式锁,现在一些库已经有完整的实现,可以直接使用,而本文希望能说明其实现的基本原理,通过简易的代码来展示。
先假设一个需求,在文件服务器中有一个顺序文件,现在有多个服务涉及使用该服务读写文件的需求。接下来就对这个问题进行拆解,一步步的来实现一下。
利用 zookeeper 可以有两种实现方式。至于 zookeeper 的安装可以查阅相关文档,在本机实验的话,使用 docker 会很方便。
一种是利用插入重复节点会抛出异常这个特性。实现步骤如下:
1、当要获取锁时,尝试创建同名节点
2_1、如果创建成功则获得锁
2_2、如果创建不成功(即收到异常),则创建对该节点的监听器,监听该节点的删除事件,当该节点删除事件发生时,则重复步骤 1
3、锁使用后删除节点
另一种是利用插入同名有序节点时会生成有序号的子节点。实现步骤如下:
1、创建根节点
2、当切获取锁时,创建当前线程下的有序节点,获得节点路径
3_1、如果当前节点路径与根节点下的第一个有序节点同名,则获得锁
3_2、如果当前节点路径与根节点下的第一个有序节点不同名,则判断当前该节点前一个节点是否存存,存在则监听其删除事件,删除事件发生时重复 3_1,如果前一节点不存在,重复 3_1
4、锁使用后删除当前线程节点
上述两段可用流程表示如下
可以看出第一种实现要简单的多,但是监听都发生在同一个节点上,又是使用异常,性能必定不会太好,本机测试后者性能起码是前者的两倍。
现在开始来设计这个实现,因为要实现两个不同的锁,所以在结构上要设计得相对灵活些,容易扩展多种实现,而对外提供服务上,和一般的锁差不多,只需要对外提供 lock()和 unlock()方法即可。我们采用模板模式来设计一下锁的框架。如下图:
接口实现:
public interface ZkLock {
public void lock();
public void unLock();
}
复制代码
模板类实现:
public abstract class ZkAbstrackLock implements ZkLock {
private static final String ZK_HOST = "127.0.0.1:2181";
protected ZkClient zkClient = new ZkClient(ZK_HOST);
@Override
public void lock() {
if (tryLock()) {
System.out.println("got lock");
} else {
await();
lock();
}
}
protected abstract boolean tryLock();
protected abstract void await();
}
复制代码
模板类中定义了 lock 的骨架,不管哪种锁实现,lock 的形式统一,这也就是采用模板的意义所在。
接下来是异常方式的实现:
public class ZkLockWithException extends ZkAbstrackLock {
private static final String LOCK_PATH = "/lockWithException";
private CountDownLatch latch = null;
@Override
protected boolean tryLock() {
try {
zkClient.createEphemeral(LOCK_PATH);
return true;
} catch (Exception e) {
System.out.println("exception thrown and wait");
return false;
}
}
@Override
protected void await() {
IZkDataListener dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
if (latch != null) {
System.out.printf("%s removed\n", s);
latch.countDown();
}
}
};
zkClient.subscribeDataChanges(LOCK_PATH, dataListener);
if (zkClient.exists(LOCK_PATH)) {
latch = new CountDownLatch(1);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(LOCK_PATH, dataListener);
}
@Override
public void unLock() {
zkClient.delete(LOCK_PATH);
zkClient.close();
System.out.println("lock released");
}
}
复制代码
这里注意监听器中有两个事件,因为我们最终都会采用将节点删除的方式,所以这里使用删除事件来进行通知,至于 CountDownLatch 的作用之前的文章有介绍,作用就是当计数为零时自动唤起等待的线程。
接下来是使用有序节点的实现方式:
public class ZkLockWithSort extends ZkAbstrackLock {
private static final String LOCK_PATH = "/lockWithSort";
private String currentPath;
private String prePath;
private CountDownLatch latch;
public ZkLockWithSort() {
try {
zkClient.createPersistent(LOCK_PATH);
} catch (ZkNodeExistsException e) {
System.out.println(LOCK_PATH + " is already existed");
}
}
@Override
protected boolean tryLock() {
if (currentPath == null || currentPath.length() <= 0) {
currentPath = zkClient.createEphemeralSequential(LOCK_PATH + "/", null);
}
List<String> children = zkClient.getChildren(LOCK_PATH);
Collections.sort(children);
if (currentPath.equals(LOCK_PATH + "/" + children.get(0))) {
System.out.printf("path:%s got lock\n", currentPath);
return true;
} else {
int sequence = Collections.binarySearch(children, currentPath.substring(LOCK_PATH.length() + 1));
prePath = LOCK_PATH + "/" + children.get(sequence - 1);
}
System.out.printf("path:%s wait pre:%s\n", currentPath, prePath);
return false;
}
@Override
protected void await() {
IZkDataListener dataListener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {
}
@Override
public void handleDataDeleted(String s) throws Exception {
if (latch != null) {
System.out.printf("path:%s deleted\n", s);
latch.countDown();
}
}
};
zkClient.subscribeDataChanges(prePath, dataListener);
if (zkClient.exists(prePath)) {
latch = new CountDownLatch(1);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
zkClient.unsubscribeDataChanges(prePath, dataListener);
}
@Override
public void unLock() {
zkClient.delete(currentPath);
zkClient.close();
System.out.printf("path:%s lock released\n", currentPath);
}
}
复制代码
回过头来我们实现之前提出的需求,读写文件,这里为体现出锁的作用,在文件中每行记录的当前的行号,最后我们启多个应用来同时执行,最后来看文件的内容是否是正确的。
文件顺序写入实现:
public class Sequence {
private static final String FILE_PATH = "/xxx/sequence.txt";
private static int sequence = 0;
private ZkLock lock;
public Sequence(ZkLock lock) {
this.lock = lock;
}
public void writeSequenceToFile() {
try {
lock.lock();
try (LineNumberReader lnr = new LineNumberReader(new FileReader(FILE_PATH));
FileWriter fw = new FileWriter(FILE_PATH, true)) {
int lineNumber = 0;
while (lnr.readLine() != null) {
lineNumber++;
}
fw.write(lineNumber + 1 + "\r\n");
fw.flush();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
} finally {
lock.unLock();
}
}
}
复制代码
最后来测试一下:
public class Main1 {
public static void main(String[] args) throws InterruptedException {
int count = 100;
CountDownLatch latch = new CountDownLatch(count);
long start = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
new Thread(() -> {
//指定不同的锁进行测试
new Sequence(new ZkLockWithException()).writeSequenceToFile();
latch.countDown();
}).start();
}
latch.await();
System.out.printf("time elapsed:%d", System.currentTimeMillis() - start);
}
}
复制代码
为了模拟分布式访问,可以同时多启动几个测试程序。最后来看下文件写入的结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
……
复制代码
到这里,在 zookeeper 下实现分布式锁的两种方式就完成了。
本系列其他文章:
Java并发编程系列——分布式锁
Java并发编程系列——锁顺序
Java并发编程系列——锁
Java并发编程系列——常用并发工具类
Java并发编程系列——Fork-Join
Java并发编程系列——线程的等待与唤醒
Java并发编程系列插曲——对象的内存结构
Java并发编程系列——线程
评论