写点什么

Java 并发编程系列——分布式锁

用户头像
孙苏勇
关注
发布于: 2020 年 05 月 05 日
Java并发编程系列——分布式锁

之前的文章写了并发下的锁控制,但是这仅能针对一个应用内部,如果是在分布式环境下,这样的锁机制是达不到效果的,所以这一篇写写如何实现分布式锁。


分布式锁有几种常见的实现方式,比如借助数据库,使用 redis,还有就是 zookeeper 一类。数据库的方式性能通常要低不少,Redis 有一致性问题,就暂时不研究了,本文介绍如何利用 zookeeper 实现分布式锁,现在一些库已经有完整的实现,可以直接使用,而本文希望能说明其实现的基本原理,通过简易的代码来展示。


先假设一个需求,在文件服务器中有一个顺序文件,现在有多个服务涉及使用该服务读写文件的需求。接下来就对这个问题进行拆解,一步步的来实现一下。


利用 zookeeper 可以有两种实现方式。至于 zookeeper 的安装可以查阅相关文档,在本机实验的话,使用 docker 会很方便。


一种是利用插入重复节点会抛出异常这个特性。实现步骤如下:

1、当要获取锁时,尝试创建同名节点

2_1、如果创建成功则获得锁

2_2、如果创建不成功(即收到异常),则创建对该节点的监听器,监听该节点的删除事件,当该节点删除事件发生时,则重复步骤 1

3、锁使用后删除节点


另一种是利用插入同名有序节点时会生成有序号的子节点。实现步骤如下:

1、创建根节点

2、当切获取锁时,创建当前线程下的有序节点,获得节点路径

3_1、如果当前节点路径与根节点下的第一个有序节点同名,则获得锁

3_2、如果当前节点路径与根节点下的第一个有序节点不同名,则判断当前该节点前一个节点是否存存,存在则监听其删除事件,删除事件发生时重复 3_1,如果前一节点不存在,重复 3_1

4、锁使用后删除当前线程节点


上述两段可用流程表示如下


可以看出第一种实现要简单的多,但是监听都发生在同一个节点上,又是使用异常,性能必定不会太好,本机测试后者性能起码是前者的两倍。


现在开始来设计这个实现,因为要实现两个不同的锁,所以在结构上要设计得相对灵活些,容易扩展多种实现,而对外提供服务上,和一般的锁差不多,只需要对外提供 lock()和 unlock()方法即可。我们采用模板模式来设计一下锁的框架。如下图:


接口实现:

public interface ZkLock {    public void lock();    public void unLock();}
复制代码


模板类实现:

public abstract class ZkAbstrackLock implements ZkLock {    private static final String ZK_HOST = "127.0.0.1:2181";    protected ZkClient zkClient = new ZkClient(ZK_HOST);
@Override public void lock() { if (tryLock()) { System.out.println("got lock"); } else { await(); lock(); } }
protected abstract boolean tryLock(); protected abstract void await();}
复制代码


模板类中定义了 lock 的骨架,不管哪种锁实现,lock 的形式统一,这也就是采用模板的意义所在。


接下来是异常方式的实现:

public class ZkLockWithException extends ZkAbstrackLock {
private static final String LOCK_PATH = "/lockWithException";
private CountDownLatch latch = null;
@Override protected boolean tryLock() { try { zkClient.createEphemeral(LOCK_PATH); return true; } catch (Exception e) { System.out.println("exception thrown and wait"); return false; } }
@Override protected void await() { IZkDataListener dataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception {
}
@Override public void handleDataDeleted(String s) throws Exception { if (latch != null) { System.out.printf("%s removed\n", s); latch.countDown(); } } }; zkClient.subscribeDataChanges(LOCK_PATH, dataListener); if (zkClient.exists(LOCK_PATH)) { latch = new CountDownLatch(1); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(LOCK_PATH, dataListener); }
@Override public void unLock() { zkClient.delete(LOCK_PATH); zkClient.close(); System.out.println("lock released"); }}
复制代码


这里注意监听器中有两个事件,因为我们最终都会采用将节点删除的方式,所以这里使用删除事件来进行通知,至于 CountDownLatch 的作用之前的文章有介绍,作用就是当计数为零时自动唤起等待的线程。


接下来是使用有序节点的实现方式:

public class ZkLockWithSort extends ZkAbstrackLock {    private static final String LOCK_PATH = "/lockWithSort";    private String currentPath;    private String prePath;    private CountDownLatch latch;
public ZkLockWithSort() { try { zkClient.createPersistent(LOCK_PATH); } catch (ZkNodeExistsException e) { System.out.println(LOCK_PATH + " is already existed"); } }
@Override protected boolean tryLock() { if (currentPath == null || currentPath.length() <= 0) { currentPath = zkClient.createEphemeralSequential(LOCK_PATH + "/", null); } List<String> children = zkClient.getChildren(LOCK_PATH); Collections.sort(children); if (currentPath.equals(LOCK_PATH + "/" + children.get(0))) { System.out.printf("path:%s got lock\n", currentPath); return true; } else { int sequence = Collections.binarySearch(children, currentPath.substring(LOCK_PATH.length() + 1)); prePath = LOCK_PATH + "/" + children.get(sequence - 1); } System.out.printf("path:%s wait pre:%s\n", currentPath, prePath); return false; }
@Override protected void await() { IZkDataListener dataListener = new IZkDataListener() { @Override public void handleDataChange(String s, Object o) throws Exception {
}
@Override public void handleDataDeleted(String s) throws Exception { if (latch != null) { System.out.printf("path:%s deleted\n", s); latch.countDown(); } } }; zkClient.subscribeDataChanges(prePath, dataListener); if (zkClient.exists(prePath)) { latch = new CountDownLatch(1); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } zkClient.unsubscribeDataChanges(prePath, dataListener); }
@Override public void unLock() { zkClient.delete(currentPath); zkClient.close(); System.out.printf("path:%s lock released\n", currentPath); }}
复制代码


回过头来我们实现之前提出的需求,读写文件,这里为体现出锁的作用,在文件中每行记录的当前的行号,最后我们启多个应用来同时执行,最后来看文件的内容是否是正确的。


文件顺序写入实现:

public class Sequence {    private static final String FILE_PATH = "/xxx/sequence.txt";    private static int sequence = 0;    private ZkLock lock;
public Sequence(ZkLock lock) { this.lock = lock; }
public void writeSequenceToFile() { try { lock.lock();
try (LineNumberReader lnr = new LineNumberReader(new FileReader(FILE_PATH)); FileWriter fw = new FileWriter(FILE_PATH, true)) { int lineNumber = 0; while (lnr.readLine() != null) { lineNumber++; } fw.write(lineNumber + 1 + "\r\n"); fw.flush(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } finally { lock.unLock(); } }}
复制代码


最后来测试一下:

public class Main1 {    public static void main(String[] args) throws InterruptedException {        int count = 100;        CountDownLatch latch = new CountDownLatch(count);        long start = System.currentTimeMillis();        for (int i = 0; i < count; i++) {            new Thread(() -> {                //指定不同的锁进行测试                new Sequence(new ZkLockWithException()).writeSequenceToFile();                latch.countDown();            }).start();        }        latch.await();        System.out.printf("time elapsed:%d", System.currentTimeMillis() - start);    }}
复制代码


为了模拟分布式访问,可以同时多启动几个测试程序。最后来看下文件写入的结果:

1234567891011121314151617181920……
复制代码


到这里,在 zookeeper 下实现分布式锁的两种方式就完成了。

本系列其他文章:

Java并发编程系列——分布式锁

Java并发编程系列——锁顺序

Java并发编程系列——锁

Java并发编程系列——常用并发工具类

Java并发编程系列——Fork-Join

Java并发编程系列——线程的等待与唤醒

Java并发编程系列插曲——对象的内存结构

Java并发编程系列——线程

发布于: 2020 年 05 月 05 日阅读数: 289
用户头像

孙苏勇

关注

不读书,思想就会停止。 2018.04.05 加入

公众号“像什么",记录想记录的。

评论

发布
暂无评论
Java并发编程系列——分布式锁