之前的文章写了并发下的锁控制,但是这仅能针对一个应用内部,如果是在分布式环境下,这样的锁机制是达不到效果的,所以这一篇写写如何实现分布式锁。
分布式锁有几种常见的实现方式,比如借助数据库,使用 redis,还有就是 zookeeper 一类。数据库的方式性能通常要低不少,Redis 有一致性问题,就暂时不研究了,本文介绍如何利用 zookeeper 实现分布式锁,现在一些库已经有完整的实现,可以直接使用,而本文希望能说明其实现的基本原理,通过简易的代码来展示。
先假设一个需求,在文件服务器中有一个顺序文件,现在有多个服务涉及使用该服务读写文件的需求。接下来就对这个问题进行拆解,一步步的来实现一下。
利用 zookeeper 可以有两种实现方式。至于 zookeeper 的安装可以查阅相关文档,在本机实验的话,使用 docker 会很方便。
一种是利用插入重复节点会抛出异常这个特性。实现步骤如下:
1、当要获取锁时,尝试创建同名节点
2_1、如果创建成功则获得锁
2_2、如果创建不成功(即收到异常),则创建对该节点的监听器,监听该节点的删除事件,当该节点删除事件发生时,则重复步骤 1
3、锁使用后删除节点
另一种是利用插入同名有序节点时会生成有序号的子节点。实现步骤如下:
1、创建根节点
2、当切获取锁时,创建当前线程下的有序节点,获得节点路径
3_1、如果当前节点路径与根节点下的第一个有序节点同名,则获得锁
3_2、如果当前节点路径与根节点下的第一个有序节点不同名,则判断当前该节点前一个节点是否存存,存在则监听其删除事件,删除事件发生时重复 3_1,如果前一节点不存在,重复 3_1
4、锁使用后删除当前线程节点
上述两段可用流程表示如下
可以看出第一种实现要简单的多,但是监听都发生在同一个节点上,又是使用异常,性能必定不会太好,本机测试后者性能起码是前者的两倍。
现在开始来设计这个实现,因为要实现两个不同的锁,所以在结构上要设计得相对灵活些,容易扩展多种实现,而对外提供服务上,和一般的锁差不多,只需要对外提供 lock()和 unlock()方法即可。我们采用模板模式来设计一下锁的框架。如下图:
接口实现:
public interface ZkLock { public void lock(); public void unLock();}
复制代码
模板类实现:
public abstract class ZkAbstrackLock implements ZkLock { private static final String ZK_HOST = "127.0.0.1:2181"; protected ZkClient zkClient = new ZkClient(ZK_HOST);
@Override public void lock() { if (tryLock()) { System.out.println("got lock"); } else { await(); lock(); } }
protected abstract boolean tryLock(); protected abstract void await();}
复制代码
模板类中定义了 lock 的骨架,不管哪种锁实现,lock 的形式统一,这也就是采用模板的意义所在。
接下来是异常方式的实现:
public class ZkLockWithException extends ZkAbstrackLock {
private static final String LOCK_PATH = "/lockWithException";
private CountDownLatch latch = null;
@Override protected boolean tryLock() { try { zkClient.createEphemeral(LOCK_PATH); return true; } catch (Exception e) { System.out.println("exception thrown and wait"); return false; } }
@Override protected void await() { IZkDataListener dataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception {
}
@Override public void handleDataDeleted(String s) throws Exception { if (latch != null) { System.out.printf("%s removed\n", s); latch.countDown(); } } }; zkClient.subscribeDataChanges(LOCK_PATH, dataListener); if (zkClient.exists(LOCK_PATH)) { latch = new CountDownLatch(1); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(LOCK_PATH, dataListener); }
@Override public void unLock() { zkClient.delete(LOCK_PATH); zkClient.close(); System.out.println("lock released"); }}
复制代码
这里注意监听器中有两个事件,因为我们最终都会采用将节点删除的方式,所以这里使用删除事件来进行通知,至于 CountDownLatch 的作用之前的文章有介绍,作用就是当计数为零时自动唤起等待的线程。
接下来是使用有序节点的实现方式:
public class ZkLockWithSort extends ZkAbstrackLock { private static final String LOCK_PATH = "/lockWithSort"; private String currentPath; private String prePath; private CountDownLatch latch;
public ZkLockWithSort() { try { zkClient.createPersistent(LOCK_PATH); } catch (ZkNodeExistsException e) { System.out.println(LOCK_PATH + " is already existed"); } }
@Override protected boolean tryLock() { if (currentPath == null || currentPath.length() <= 0) { currentPath = zkClient.createEphemeralSequential(LOCK_PATH + "/", null); } List<String> children = zkClient.getChildren(LOCK_PATH); Collections.sort(children); if (currentPath.equals(LOCK_PATH + "/" + children.get(0))) { System.out.printf("path:%s got lock\n", currentPath); return true; } else { int sequence = Collections.binarySearch(children, currentPath.substring(LOCK_PATH.length() + 1)); prePath = LOCK_PATH + "/" + children.get(sequence - 1); } System.out.printf("path:%s wait pre:%s\n", currentPath, prePath); return false; }
@Override protected void await() { IZkDataListener dataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception {
}
@Override public void handleDataDeleted(String s) throws Exception { if (latch != null) { System.out.printf("path:%s deleted\n", s); latch.countDown(); } } }; zkClient.subscribeDataChanges(prePath, dataListener); if (zkClient.exists(prePath)) { latch = new CountDownLatch(1); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(prePath, dataListener); }
@Override public void unLock() { zkClient.delete(currentPath); zkClient.close(); System.out.printf("path:%s lock released\n", currentPath); }}
复制代码
回过头来我们实现之前提出的需求,读写文件,这里为体现出锁的作用,在文件中每行记录的当前的行号,最后我们启多个应用来同时执行,最后来看文件的内容是否是正确的。
文件顺序写入实现:
public class Sequence { private static final String FILE_PATH = "/xxx/sequence.txt"; private static int sequence = 0; private ZkLock lock;
public Sequence(ZkLock lock) { this.lock = lock; }
public void writeSequenceToFile() { try { lock.lock();
try (LineNumberReader lnr = new LineNumberReader(new FileReader(FILE_PATH)); FileWriter fw = new FileWriter(FILE_PATH, true)) { int lineNumber = 0; while (lnr.readLine() != null) { lineNumber++; } fw.write(lineNumber + 1 + "\r\n"); fw.flush(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } finally { lock.unLock(); } }}
复制代码
最后来测试一下:
public class Main1 { public static void main(String[] args) throws InterruptedException { int count = 100; CountDownLatch latch = new CountDownLatch(count); long start = System.currentTimeMillis(); for (int i = 0; i < count; i++) { new Thread(() -> { //指定不同的锁进行测试 new Sequence(new ZkLockWithException()).writeSequenceToFile(); latch.countDown(); }).start(); } latch.await(); System.out.printf("time elapsed:%d", System.currentTimeMillis() - start); }}
复制代码
为了模拟分布式访问,可以同时多启动几个测试程序。最后来看下文件写入的结果:
1234567891011121314151617181920……
复制代码
到这里,在 zookeeper 下实现分布式锁的两种方式就完成了。
本系列其他文章:
Java并发编程系列——分布式锁
Java并发编程系列——锁顺序
Java并发编程系列——锁
Java并发编程系列——常用并发工具类
Java并发编程系列——Fork-Join
Java并发编程系列——线程的等待与唤醒
Java并发编程系列插曲——对象的内存结构
Java并发编程系列——线程
评论