写点什么

分布式锁实战:基于 Zookeeper 的实现

作者:小小怪下士
  • 2022-11-03
    湖南
  • 本文字数:5465 字

    阅读完需:约 18 分钟

1. Zookeeper 概述

Zookeeper(后续简称 ZK)是一个分布式的,开放源码的分布式应用程序协调服务,通常以集群模式运转,其协调能力可以理解为是基于观察者设计模式来实现的;ZK 服务会使用 Znode 存储使用者的数据,并将这些数据以树形目录的形式来组织管理,支持使用者以观察者的角色指定自己关注哪些节点\数据的变更,当这些变更发生时,ZK 会通知其观察者;为满足本篇目标所需,着重介绍以下几个关键特性:


  • 数据组织:数据节点以树形目录(类似文件系统)组织管理,每一个节点中都会保存数据信息和节点信息。



  • 集群模式:通常是由 3、5 个基数实例组成集群,当超过半数服务实例正常工作就能对外提供服务,既能避免单点故障,又尽量高可用,每个服务实例都有一个数据备份,以实现数据全局一致



  • 顺序更新:更新请求都会转由 leader 执行,来自同一客户端的更新将按照发送的顺序被写入到 ZK,处理写请求创建 Znode 时,Znode 名称后会被分配一个全局唯一的递增编号,可以通过顺序号推断请求的顺序,利用这个特性可以实现高级协调服务



  • 监听机制:给某个节点注册监听器,该节点一旦发生变更(例如更新或者删除),监听者就会收到一个 Watch Event,可以感知到节点\数据的变更



  • 临时节点:session 链接断开临时节点就没了,不能创建子节点(很关键)


ZK 的分布式锁正是基于以上特性来实现的,简单来说是:


  • 临时节点:用于支撑异常情况下的锁自动释放能力

  • 顺序节点:用于支撑公平锁获取锁和排队等待的能力

  • 监听机制:用于支撑抢锁能力

  • 集群模式:用于支撑锁服务的高可用

2. 加解锁的流程描述


  1. 创建一个永久节点作为锁节点(/lock2)

  2. 试图加锁的客户端在指定锁名称节点(/lock2)下,创建临时顺序子节点

  3. 获取锁节点(/lock2)下所有子节点

  4. 对所获取的子节点按节点自增序号从小到大排序

  5. 判断自己是不是第一个子节点,若是,则获取锁

  6. 若不是,则监听比该节点小的那个节点的删除事件(这种只监听前一个节点的方式避免了惊群效应)

  7. 若是阻塞申请锁,则申请锁的操作可增加阻塞等待

  8. 若监听事件生效(说明前节点释放了,可以尝试去获取锁),则回到第 3 步重新进行判断,直到获取到锁

  9. 解锁时,将第一个子节点删除释放

3. ZK 分布式锁的能力

可能读者是单篇阅读,这里引入上一篇《分布式锁上-初探》中的一些内容,一个分布式锁应具备这样一些功能特点:


  • 互斥性:在同一时刻,只有一个客户端能持有锁

  • 安全性:避免死锁,如果某个客户端获得锁之后处理时间超过最大约定时间,或者持锁期间发生了故障导致无法主动释放锁,其持有的锁也能够被其他机制正确释放,并保证后续其它客户端也能加锁,整个处理流程继续正常执行

  • 可用性:也被称作容错性,分布式锁需要有高可用能力,避免单点故障,当提供锁的服务节点故障(宕机)时不影响服务运行,这里有两种模式:一种是分布式锁服务自身具备集群模式,遇到故障能自动切换恢复工作;另一种是客户端向多个独立的锁服务发起请求,当某个锁服务故障时仍然可以从其他锁服务读取到锁信息(Redlock)

  • 可重入性:对同一个锁,加锁和解锁必须是同一个线程,即不能把其他线程程持有的锁给释放了

  • 高效灵活:加锁、解锁的速度要快;支持阻塞和非阻塞;支持公平锁和非公平锁


基于上文的内容,这里简单总结一下 ZK 的能力矩阵(其它分布式锁的情况会在后续文章中补充):


关于性能不太高的一种说法

因为每次在创建锁和释放锁的过程中,都要动态创建、销毁临时节点来实现锁功能。ZK 中创建和删除节点只能通过 Leader 服务器来执行,然后 Leader 服务器还需要将数据同步到所有的 Follower 机器上,这样频繁的网络通信,性能的短板是非常突出的。在高性能,高并发的场景下,不建议使用 ZooKeeper 的分布式锁。

由于 ZooKeeper 的高可用特性,在并发量不是太高的场景,也推荐使用 ZK 的分布式锁。

4. InterProcessMutex 使用示例

Zookeeper 客户端框架 Curator 提供的 InterProcessMutex 是分布式锁的一种实现,acquire 方法阻塞|非阻塞获取锁,release 方法释放锁,另外还提供了可撤销、可重入功能。
4.1 接口介绍
// 获取互斥锁public void acquire() throws Exception;// 在给定的时间内获取互斥锁public boolean acquire(long time, TimeUnit unit) throws Exception;// 释放锁处理public void release() throws Exception;// 如果当前线程获取了互斥锁,则返回trueboolean isAcquiredInThisProcess();
复制代码
4.2 pom 依赖
<dependency>  <groupId>org.apache.logging.log4j</groupId>  <artifactId>log4j-core</artifactId>  <version>2.8.2</version></dependency><dependency>  <groupId>org.apache.zookeeper</groupId>  <artifactId>zookeeper</artifactId>  <version>3.5.7</version></dependency><dependency>  <groupId>org.apache.curator</groupId>  <artifactId>curator-framework</artifactId>  <version>4.3.0</version></dependency><dependency>  <groupId>org.apache.curator</groupId>  <artifactId>curator-recipes</artifactId>  <version>4.3.0</version></dependency><dependency>  <groupId>org.apache.curator</groupId>  <artifactId>curator-client</artifactId>  <version>4.3.0</version></dependency>
复制代码
4.3 示例
package com.atguigu.case3;
import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.ExponentialBackoffRetry;
public class CuratorLockTest {
public static void main(String[] args) {
// 创建分布式锁1 InterProcessMutex lock1 = new InterProcessMutex(getCuratorFramework(), "/locks");
// 创建分布式锁2 InterProcessMutex lock2 = new InterProcessMutex(getCuratorFramework(), "/locks");
new Thread(new Runnable() { @Override public void run() { try { lock1.acquire(); System.out.println("线程1 获取到锁");
lock1.acquire(); System.out.println("线程1 再次获取到锁");
Thread.sleep(5 * 1000);
lock1.release(); System.out.println("线程1 释放锁");
lock1.release(); System.out.println("线程1 再次释放锁");
} catch (Exception e) { e.printStackTrace(); } } }).start();
new Thread(new Runnable() { @Override public void run() { try { lock2.acquire(); System.out.println("线程2 获取到锁");
lock2.acquire(); System.out.println("线程2 再次获取到锁");
Thread.sleep(5 * 1000);
lock2.release(); System.out.println("线程2 释放锁");
lock2.release(); System.out.println("线程2 再次释放锁");
} catch (Exception e) { e.printStackTrace(); } } }).start(); }
private static CuratorFramework getCuratorFramework() {
ExponentialBackoffRetry policy = new ExponentialBackoffRetry(3000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("xxx:2181,xxx:2181,xxx:2181") .connectionTimeoutMs(2000) .sessionTimeoutMs(2000) .retryPolicy(policy).build();
// 启动客户端 client.start();
System.out.println("zookeeper 启动成功"); return client; }}
复制代码

5. DIY 一个阉割版的分布式锁

通过这个实例对照第 2 节内容来理解加解锁的流程,以及如何避免惊群效应。


package com.rock.case2;
import org.apache.zookeeper.*;import org.apache.zookeeper.data.Stat;
import java.io.IOException;import java.util.List;import java.util.concurrent.CountDownLatch;
/** * zk 分布式锁 v1版本: * 完成功能 : * 1. 避免了惊群效应 * 缺失功能: * 1. 超时控制 * 2. 读写锁 * 3. 重入控制 */public class DistributedLock {
private String connectString; private int sessionTimeout; private ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1);
private String waitPath; private String currentNode; private String LOCK_ROOT_PATH;
private static String NODE_PREFIX = "w";
public DistributedLock(String connectString, int sessionTimeout, String lockName) { //TODO:数据校验 this.connectString = connectString; this.sessionTimeout = sessionTimeout; this.LOCK_ROOT_PATH = lockName; }

public void init() throws IOException, KeeperException, InterruptedException { // 建联 zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> { // connectLatch 连接上zk后 释放 if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) { connectLatch.countDown(); } });
connectLatch.await();// 等待zk正常连接后
// 判断锁名称节点是否存在 Stat stat = zk.exists(LOCK_ROOT_PATH, false); if (stat == null) { // 创建一下锁名称节点 try { zk.create(LOCK_ROOT_PATH, LOCK_ROOT_PATH.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } catch (KeeperException e) { //并发创建冲突忽略。 if (!e.code().name().equals("NODEEXISTS")) { throw e; } } } }
/** * 待补充功能: * 1. 超时设置 * 2. 读写区分 * 3. 重入控制 */ public void zklock() throws KeeperException, InterruptedException { if (!tryLock()) { waitLock(); zklock(); } }
/** * */ private void waitLock() throws KeeperException, InterruptedException { try { zk.getData(waitPath, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { // waitLatch 需要释放 if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) { waitLatch.countDown(); } } }, new Stat()); // 等待监听 waitLatch.await(); } catch (KeeperException.NoNodeException e) { //如果等待的节点已经被清除了,不等了,再尝试去抢锁 return; }
}
private boolean tryLock() throws KeeperException, InterruptedException {
currentNode = zk.create(LOCK_ROOT_PATH + "/" + NODE_PREFIX, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 判断创建的节点是否是最小的序号节点,如果是获取到锁;如果不是,监听他序号前一个节点 List<String> children = zk.getChildren(LOCK_ROOT_PATH, false); // 如果children 只有一个值,那就直接获取锁; 如果有多个节点,需要判断,谁最小 if (children.size() == 1) { return true; } else { String thisNode = currentNode.substring(LOCK_ROOT_PATH.length() + 1); // 通过w00000000获取该节点在children集合的位置 int index = children.indexOf(thisNode); if (index == 0) { //自己就是第一个节点 return true; } // 需要监听 他前一个节点变化 waitPath = LOCK_ROOT_PATH + "/" + children.get(index - 1); } return false; }

// 解锁 public void unZkLock() { // 删除节点 try { zk.delete(this.currentNode, -1); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } }
}
复制代码


用户头像

还未添加个人签名 2022-09-04 加入

热衷于分享java技术,一起交流学习,探讨技术。 需要Java相关资料的可以+v:xiaoyanya_1

评论

发布
暂无评论
分布式锁实战:基于Zookeeper的实现_Java_小小怪下士_InfoQ写作社区