写点什么

zk 基础—zk 实现分布式功能

  • 2025-04-08
    福建
  • 本文字数:15060 字

    阅读完需:约 49 分钟

1.zk 实现数据发布订阅


(1)发布订阅系统一般有推模式和拉模式


推模式:服务端主动将更新的数据发送给所有订阅的客户端。

 

拉模式:客户端主动发起请求来获取最新数据(定时轮询拉取)。

 

(2)zk 采用了推拉相结合来实现发布订阅


首先客户端需要向服务端注册自己关注的节点(添加 Watcher 事件)。一旦该节点发生变更,服务端就会向客户端发送 Watcher 事件通知。客户端接收到消息通知后,需要主动到服务端获取最新的数据。所以,zk 的 Watcher 机制有一个缺点就是:客户端不能定制服务端回调,需要客户端收到 Watcher 通知后再次向服务端发起请求获取数据,多进行一次网络交互。

 

如果将配置信息放到 zk 上进行集中管理,那么:

一.应用启动时需主动到 zk 服务端获取配置信息,然后在指定节点上注册一个 Watcher 监听。

二.只要配置信息发生变更,zk 服务端就会实时通知所有订阅的应用,让应用能实时获取到订阅的配置信息节点已发生变更的消息。

 

注意:原生 zk 客户端可以通过 getData()、exists()、getChildren()三个方法,向 zk 服务端注册 Watcher 监听,而且注册的 Watcher 监听具有一次性,所以 zk 客户端获得服务端的节点变更通知后需要再次注册 Watcher。

 

(3)使用 zk 来实现数据发布订阅总结


步骤一:将配置信息存储到 zk 的节点上。

步骤二:应用启动时首先从 zk 节点上获取配置信息,然后再向该 zk 节点注册一个数据变更的 Watcher 监听。一旦该 zk 节点数据发生变更,所有订阅的客户端就能收到数据变更通知。

步骤三:应用收到 zk 服务端发过来的数据变更通知后重新获取最新数据。

 

(4)zk 原生实现分布式配置(也就是实现注册发现或者数据发布订阅)


配置可以使用数据库、Redis、或者任何一种可以共享的存储位置。使用 zk 的目的,主要就是利用它的回调机制。任何 zk 的使用方不需要去轮询 zk,Redis 或者数据库可能就需要主动轮询去看看数据是否发生改变。使用 zk 最大的优势是只要对数据添加 Watcher,数据发生修改时 zk 就会回调指定的方法。注意:new 一个 zk 实例和向 zk 获取数据都是异步的。

 

如下的做法其实是一种 Reactor 响应式编程:使用 CoundownLatch 阻塞及通过调用一次数据来触发回调更新本地的 conf。我们并没有每个场景都线性写一个方法堆砌起来,而是用相应的回调和 Watcher 事件来粘连起来。其实就是把所有事件发生前后要做的事情粘连起来,等着回调来触发。

 

一.先定义一个工具类可以获取 zk 实例


public class ZKUtils {    private static ZooKeeper zk;    private static String address = "192.168.150.11:2181,192.168.150.12:2181,192.168.150.13:2181,192.168.150.14:2181/test";    private static DefaultWatcher defaultWatcher = new DefaultWatcher();    private static CountDownLatch countDownLatch = new CountDownLatch(1);        public static ZooKeeper getZK() {        try {            zk = new ZooKeeper(address, 1000, defaultWatcher);            defaultWatcher.setCountDownLatch(countDownLatch);            //阻塞直到建立好连接拿到可用的zk            countDownLatch.await();        } catch (Exception e) {            e.printStackTrace();        }        return zk;    }}
复制代码


二.定义和 zk 建立连接时的 Watcher


public class DefaultWatcher implements Watcher {    CountDownLatch countDownLatch ;        public void setCountDownLatch(CountDownLatch countDownLatch) {        this.countDownLatch = countDownLatch;    }
@Override public void process(WatchedEvent event) { System.out.println(event.toString()); switch (event.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: countDownLatch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; } }}
复制代码


三.定义分布式配置的核心类 WatcherCallBack


这个 WatcherCallBack 类不仅实现了 Watcher,还实现了两个异步回调。

 

首先通过 zk.exists()方法判断配置的 znode 是否存在并添加监听(自己) + 回调(自己),然后通过 countDownLatch.await()方法进行阻塞。

 

在回调中如果发现存在配置的 znode,则设置配置并执行 countDown()方法不再进行阻塞。

 

在监听中如果发现数据变化,则会调用 zk.getData()方法获取配置的数据,并且获取配置的数据时也会继续监听(自己) + 回调(自己)。


public class WatcherCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {    ZooKeeper zk;    MyConf conf;    CountDownLatch countDownLatch = new CountDownLatch(1);
public MyConf getConf() { return conf; } public void setConf(MyConf conf) { this.conf = conf; } public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } //判断配置是否存在并监听配置的znode public void aWait(){ zk.exists("/AppConf", this, this ,"ABC"); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //回调自己,这是执行完zk.exists()方法或者zk.getData()方法的回调 //在回调中如果发现存在配置的znode,则设置配置并执行countDown()方法不再进行阻塞。 @Override public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (data != null) { String s = new String(data); conf.setConf(s); countDownLatch.countDown(); } } //监听自己,这是执行zk.exists()方法或者zk.getData()方法时添加的Watcher监听 //在监听中如果发现数据变化,则会继续调用zk.getData()方法获取配置的数据,并且获取配置的数据时也会继续监听(自己) + 回调(自己) @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if (stat != null) {//stat不为空, 代表节点已经存在 zk.getData("/AppConf", this, this, "sdfs"); } } @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: //调用一次数据, 这会触发回调更新本地的conf zk.getData("/AppConf", this, this, "sdfs"); break; case NodeDeleted: //容忍性, 节点被删除, 把本地conf清空, 并且恢复阻塞 conf.setConf(""); countDownLatch = new CountDownLatch(1); break; case NodeDataChanged: //数据发生变更, 需要重新获取调用一次数据, 这会触发回调更新本地的conf zk.getData("/AppConf", this, this, "sdfs"); break; case NodeChildrenChanged: break; } }}
复制代码


分布式配置的核心配置类:


//MyConf是配置中心的配置public class MyConf {    private  String conf ;    public String getConf() {        return conf;    }        public void setConf(String conf) {        this.conf = conf;    }}
复制代码


四.通过 WatcherCallBack 的方法判断配置是否存在并尝试获取数据


public class TestConfig {    ZooKeeper zk;        @Before    public void conn () {        zk = ZKUtils.getZK();    }        @After    public void close () {        try {            zk.close();        } catch (InterruptedException e) {            e.printStackTrace();        }    }        @Test    public void getConf() {        WatchCallBack watchCallBack = new WatchCallBack();        //传入zk和配置conf        watchCallBack.setZk(zk);        MyConf myConf = new MyConf();        watchCallBack.setConf(myConf);
//节点不存在和节点存在, 都尝试去取数据, 取到了才往下走 watchCallBack.aWait(); while(true) { if (myConf.getConf().equals("")) { System.out.println("conf diu le ......"); watchCallBack.aWait(); } else { System.out.println(myConf.getConf()); } try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } }}
复制代码


2.zk 实现负载均衡


(1)负载均衡算法


常用的负载均衡算法有:轮询法、随机法、原地址哈希法、加权轮询法、加权随机法、最小连接数法。

 

一.轮询法


轮询法是最为简单的负载均衡算法。当接收到客户端请求后,负载均衡服务器会按顺序逐个分配给后端服务。比如集群中有 3 台服务器,分别是 server1、server2、server3,轮询法会按照 sever1、server2、server3 顺序依次分发请求给每个服务器。当第一次轮询结束后,会重新开始下一轮的循环。

 

二.随机法


随机法是指负载均衡服务器在接收到来自客户端请求后,根据随机算法选中后台集群中的一台服务器来处理这次请求。由于当集群中的机器变得越来越多时,每台机器被抽中的概率基本相等,因此随机法的实际效果越来越趋近轮询法。

 

三.原地址哈希法


原地址哈希法是根据客户端的 IP 地址进行哈希计算,对计算结果进行取模,然后根据最终结果选择服务器地址列表中的一台机器来处理请求。这种算法每次都会分配同一台服务器来处理同一 IP 的客户端请求。

 

四.加权轮询法


由于一个分布式系统中的机器可能部署在不同的网络环境中,每台机器的配置性能各不相同,因此其处理和响应请求的能力也各不相同。

 

如果采用上面几种负载均衡算法,都不太合适。这会造成能力强的服务器在处理完业务后过早进入空闲状态,而性能差或网络环境不好的服务器一直忙于处理请求造成任务积压。

 

为了解决这个问题,可以采用加权轮询法。加权轮询法的方式与轮询法的方式很相似,唯一的不同在于选择机器时,不只是单纯按照顺序的方式去选择,还要根据机器的配置和性能高低有所侧重,让配置性能好的机器优先分配。

 

五.加权随机法


加权随机法和上面提到的随机法一样,在采用随机法选举服务器时,会考虑系统性能作为权值条件。

 

六.最小连接数法


最小连接数法是指:根据后台处理客户端的请求数,计算应该把新请求分配给哪一台服务器。一般认为请求数最少的机器,会作为最优先分配的对象。

 

(2)使用 zk 来实现负载均衡


实现负载均衡服务器的关键是:探测和发现业务服务器的运行状态 + 分配请求给最合适的业务服务器。

 

一.状态收集之实现 zk 的业务服务器列表


首先利用 zk 的临时子节点来标记业务服务器的状态。在业务服务器上线时:通过向 zk 服务器创建临时子节点来实现服务注册,表示业务服务器已上线。在业务服务器下线时:通过删除临时节点或者与 zk 服务器断开连接来进行服务剔除。最后通过统计临时节点的数量,来了解业务服务器的运行情况。

 

在代码层面的实现中,首先定义一个 BlanceSever 接口类。该类用于业务服务器启动或关闭后:第一.向 zk 服务器地址列表注册或注销服务,第二.根据接收到的请求动态更新负载均衡情况。


public class BlanceSever {    //向zk服务器地址列表进行注册服务    public void register()    //向zk服务器地址列表进行注销服务    public void unregister()    //根据接收到的请求动态更新负载均衡情况    public void addBlanceCount()    public void takeBlanceCount() }
复制代码


之后创建 BlanceSever 接口的实现类 BlanceSeverImpl,在 BlanceSeverImpl 类中首先定义:业务服务器运行的 Session 超时时间、会话连接超时时间、zk 客户端地址、服务器地址列表节点 SERVER_PATH 等基本参数。并通过构造函数,在类被引用时进行初始化 zk 客户端对象实例。


public class BlanceSeverImpl implements BlanceSever {    private static final Integer SESSION_TIME_OUT;    private static final Integer CONNECTION_TIME_OUT;    private final ZkClient zkclient;    private static final SERVER_PATH = "/Severs";        public BlanceSeverImpl() {        init...    } }
复制代码


接下来定义,业务服务器启动时,向 zk 注册服务的 register 方法。

 

在如下代码中,会通过在 SERVER_PATH 路径下创建临时子节点的方式来注册服务。首先获取业务服务器的 IP 地址,然后利用 IP 地址作为临时节点的 path 来创建临时节点。


public register() throws Exception {    //首先获取业务服务器的IP地址    InetAddress address = InetAddress.getLocalHost();    String serverIp = address.getHostAddress();    //然后利用IP地址作为临时节点的path来创建临时节点    zkclient.createEphemeral(SERVER_PATH + serverIp);}
复制代码


接下来定义,业务服务器关机或不对外提供服务时的 unregister()方法。通过调用 unregister()方法,注销该台业务服务器在 zk 服务器列表中的信息。注销后的机器不会被负载均衡服务器分发处理会话。在如下代码中,会通过删除 SERVER_PATH 路径下临时节点的方式来注销业务服务器。


public unregister() throws Exception {    zkclient.delete(SERVER_PATH + serverIp);}
复制代码


二.请求分配之如何选择业务服务器


以最小连接数法为例,来确定如何均衡地分配请求给业务服务器,整个实现步骤如下:

 

步骤一:首先负载均衡服务器在接收到客户端的请求后,通过 getData()方法获取已成功注册的业务服务器列表,也就是"/Servers"节点下的各个临时节点,这些临时节点都存储了当前服务器的连接数。

 

步骤二:然后选取连接数最少的业务服务器作为处理当前请求的业务服务器,并通过 setData()方法将该业务服务器对应的节点值(连接数)加 1。

 

步骤三:当该业务服务器处理完请求后,调用 setData()方法将该节点值(连接数)减 1。

 

下面定义,当业务服务器接收到请求后,增加连接数的 addBlance()方法。在如下代码中,首先通过 readData()方法获取服务器最新的连接数,然后将该连接数加 1。接着通过 writeData()方法将最新的连接数写入到该业务服务器对应的临时节点。


public void addBlance() throws Exception {    InetAddress address = InetAddress.getLocalHost();    String serverIp = address.getHostAddress();    Integer con_count = zkClient.readData(SERVER_PATH + serverIp);    ++con_count;    zkClient.writeData(SERVER_PATH + serverIp, con_count);}
复制代码


3.zk 实现分布式命名服务

 

命名服务是分布式系统最基本的公共服务之一。在分布式系统中,被命名的实体可以是集群中的机器、提供的服务地址等。例如,Java 中的 JNDI 便是一种典型的命名服务。

 

(1)ID 编码的特性


分布式 ID 生成器就是通过分布式的方式,自动生成 ID 编码的程序或服务。生成的 ID 编码一般具有唯一性、递增性、安全性、扩展性这几个特性。

 

(2)通过 UUID 方式生成分布式 ID


UUID 能非常简便地保证分布式环境中 ID 的唯一性。它由 32 位字符和 4 个短线字符组成,比如 e70f1357-f260-46ff-a32d-53a086c57ade。

 

由于 UUID 在本地应用中生成,所以生成速度比较快,不依赖其他服务和网络。但缺点是:长度过长、含义不明、不满足递增性。

 

(3)通过 TDDL 生成分布式 ID


MySQL 的自增主键是一种有序的 ID 生成方式,还有一种性能更好的数据库序列生成方式:TDDL 中的 ID 生成方式。TDDL 是 Taobao Distributed Data Layer 的缩写,是一种数据库中间件,主要应用于数据库分库分表的应用场景中。

 

TDDL 生成 ID 编码的大致过程如下:首先数据库中有一张 Sequence 序列化表,记录当前已被占用的 ID 最大值。然后每个需要 ID 编码的客户端在请求 TDDL 的 ID 编码生成器后,TDDL 都会返回给该客户端一段 ID 编码,并更新 Sequence 表中的信息。

 

客户端接收到一段 ID 编码后,会将该段编码存储在内存中。在本机需要使用 ID 编码时,会首先使用内存中的 ID 编码。如果内存中的 ID 编码已经完全被占用,则再重新向编码服务器获取。

 

TDDL 通过分批获取 ID 编码的方式,减少了客户端访问服务器的频率,避免了网络波动所造成的影响,并减轻了服务器的内存压力。不过 TDDL 高度依赖数据库,不能作为独立的分布式 ID 生成器对外提供服务。

 

(4)通过 zk 生成分布式 ID


每个需要 ID 编码的业务服务器可以看作是 zk 的客户端,ID 编码生成器可以看作是 zk 的服务端,可以利用 zk 数据模型中的顺序节点作为 ID 编码。

 

客户端通过 create()方法来创建一个顺序子节点。服务端成功创建节点后会响应客户端请求,把创建好的节点发送给客户端。客户端以顺序节点名称为基础进行 ID 编码,生成 ID 后就可以进行业务操作。

 

(5)SnowFlake 算法


SnowFlake 算法是 Twitter 开源的一种用来生成分布式 ID 的算法,通过 SnowFlake 算法生成的编码会是一个 64 位的二进制数。

 

第一个 bit 不用,接下来的 41 个 bit 用来存储毫秒时间戳,再接下来的 10 个 bit 用来存储机器 ID,剩余的 12 个 bit 用来存储流水号和 0。



SnowFlake 算法主要的实现手段就是对二进制数位的操作,SnowFlake 算法理论上每秒可以生成 400 多万个 ID 编码,SnowFlake 是业界普遍采用的分布式 ID 生成算法。

 

4.zk 实现分布式协调(Master-Worker 协同)


(1)Master-Worker 架构


Master-Work 是一个广泛使用的分布式架构,系统中有一个 Master 负责监控 Worker 的状态,并为 Worker 分配任务。

 

说明一:在任何时刻,系统中最多只能有一个 Master。不可以出现两个 Master,多个 Master 共存会导致脑裂。

 

说明二:系统中除了 Active 状态的 Master 还有一个 Bakcup 的 Master。如果 Active 失败,Backup 可以很快进入 Active 状态。

 

说明三:Master 实时监控 Worker 的状态,能够及时收到 Worker 成员变化的通知。Master 在收到 Worker 成员变化通知时,会重新进行任务分配。



(2)Master-Worker 架构示例—HBase


HBase 采用的就是 Master-Worker 的架构。HMBase 是系统中的 Master,HRegionServer 是系统中的 Worker。HMBase 会监控 HBase Cluster 中 Worker 的成员变化,HMBase 会把 region 分配给各个 HRegionServer。系统中有一个 HMaster 处于 Active 状态,其他 HMaster 处于备用状态。



(3)Master-Worker 架构示例—Kafka


一个 Kafka 集群由多个 Broker 组成,这些 Borker 是系统中的 Worker,Kafka 会从这些 Worker 选举出一个 Controller。这个 Controlle 是系统中的 Master,负责把 Topic Partition 分配给各个 Broker。



(4)Master-Worker 架构示例—HDFS


HDFS 采用的也是一个 Master-Worker 的架构。NameNode 是系统中的 Master,DataNode 是系统中的 Worker。NameNode 用来保存整个分布式文件系统的 MetaData,并把数据块分配给 Cluster 中的 DataNode 进行保存。



(5)如何使用 zk 实现 Master-Worker


步骤 1:使用一个临时节点"/master"表示 Master。Master 在行使 Master 的职能之前,首先要创建这个 znode。如果创建成功,进入 Active 状态,开始行使 Master 职能。如果创建失败,则进入 Backup 状态,并使用 Watcher 机制监听"/master"。假设系统中有一个 Active Master 和一个 Backup Master。如果 Active Master 故障了,那么它创建的"/master"就会被 zk 自动删除。这时 Backup Master 会收到通知,再次创建"/master"成为新 Active Master。

 

步骤 2:使用一个持久节点"/workers"下的临时子节点来表示 Worker,Worker 通过在"/workers"节点下创建临时节点来加入集群。

 

步骤 3:处于 Active 状态的 Master 会通过 Watcher 机制,监控"/workers"下的子节点列表来实时获取 Worker 成员的变化。

 

5.zk 实现分布式通信


在大部分的分布式系统中,机器间的通信有三种类型:心跳检测、工作进度汇报、系统调度。

 

(1)心跳检测


机器间的心跳检测是指:在分布式环境中,不同机器间需要检测彼此是否还在正常运行。其中,心跳检测有如下三种方法。

 

方法一:通常会通过机器间是否可以相互 PING 通来判断对方是否正常运行。

 

方法二:在机器间建立长连接,通过 TCP 连接固有的心跳检测机制来实现上层机器的心跳检测。

 

方法三:基于 zk 的临时子节点来实现心跳检测,让不同的机器都在 zk 的一个指定节点下创建临时子节点,不同机器间可以根据这个临时子节点来判断对应的客户端是否存活。基于 zk 的临时节点来实现的心跳检测,可以大大减少系统的耦合。因为检测系统和被检测系统之间不需要直接关联,只需要通过 zk 临时节点间接关联。

 

(2)工作进度汇报


在一个任务分发系统中,任务被分发到不同的机器上执行后,需要实时地将自己的任务执行进度汇报给分发系统。

 

通过 zk 的临时子节点来实现工作进度汇报:可以在 zk 上选择一个节点,每个任务机器都在该节点下创建临时子节点。然后通过判断临时子节点是否存在来确定任务机器是否存活,各个任务机器会实时地将自己的任务执行进度写到其对应的临时节点上,以便中心系统能够实时获取到任务的执行进度。

 

(3)系统调度


一个分布式系统由控制台和一些客户端系统组成,控制台的职责是将一些指令信息发送给所有客户端。

 

使用 zk 实现系统调度时:先让控制台的一些操作指令对应到 zk 的某些节点数据,然后让客户端系统注册对这些节点数据的监听。当控制台进行一些操作时,便会触发修改这些节点的数据,而 zk 会将这些节点数据的变更以事件通知的形式发送给监听的客户端。

 

这样就能省去大量底层网络通信和协议设计上的重复工作了,也大大降低了系统间的耦合,方便实现异构系统的灵活通信。

 

6.zk 实现 Master 选举


Master 选举的需求是:在集群的所有机器中选举出一台机器作为 Master。

 

(1)通过创建临时节点实现


集群的所有机器都往 zk 上创建一个临时节点如"/master"。在这个过程中只会有一个机器能成功创建该节点,则该机器便成为 Master。同时其他没有成功创建节点的机器会在"/master"节点上注册 Watcher 监听,一旦当前 Master 机器挂了,那么其他机器就会重新往 zk 上创建临时节点。

 

(2)通过临时顺序子节点来实现


使用临时顺序子节点来表示集群中的机器发起的选举请求,然后让创建最小后缀数字节点的机器成为 Master。

 

7.zk 实现分布式锁


可以利用 zk 的临时节点来解决死锁问题,可以利用 zk 的 Watcher 监听机制实现锁释放后重新竞争锁,可以利用 zk 数据节点的版本来实现乐观锁。

 

(1)死锁的解决方案


在单机环境下,多线程之间会产生死锁问题。同样,在分布式系统环境下,也会产生分布式死锁的问题。常用的解决死锁问题的方法有超时方法和死锁检测。

 

一.超时方法


在解决死锁问题时,超时方法可能是最简单的处理方式了。超时方式是在创建分布式线程时,对每个线程都设置一个超时时间。当该线程的超时时间到期后,无论该线程是否执行完毕,都要关闭该线程并释放该线程所占用的系统资源,之后其他线程就可以访问该线程释放的资源,这样就不会造成死锁问题。

 

但这种设置超时时间的方法最大的缺点是很难设置一个合适的超时时间。如果时间设置过短,可能造成线程未执行完相关的处理逻辑,就因为超时时间到期就被迫关闭,最终导致程序执行出错。

 

二.死锁检测


死锁检测是处理死锁问题的另一种方法,它解决了超时方法的缺陷。死锁检测方法会主动检测发现线程死锁,在控制死锁问题上更加灵活准确。

 

可以把死锁检测理解为一个运行在各服务器系统上的线程或方法,该方法专门用来发现应用服务上的线程是否发生死锁。如果发生死锁,那么就会触发相应的预设处理方案。

 

(2)zk 如何实现排他锁


一.获取锁


获取排他锁时,所有的客户端都会试图通过调用 create()方法,在"/exclusive_lock"节点下创建临时子节点"/exclusive_lock/lock"。zk 会保证所有的客户端中只有一个客户端能创建临时节点成功,从而获得锁。没有创建临时节点成功的客户端也就没能获得锁,需要到"/exclusive_lock"节点上,注册一个子节点变更的 Watcher 监听,以便可以实时监听 lock 节点的变更情况。

 

二.释放锁


如果获取锁的客户端宕机,那么 zk 上的这个临时节点(lock 节点)就会被移除。如果获取锁的客户端执行完,也会主动删除自己创建的临时节点(lock 节点)。

 

(3)zk 如何实现共享锁(读写锁)


一.获取锁


获取共享锁时,所有客户端会到"/shared_lock"下创建一个临时顺序节点。如果是读请求,那么就创建"/shared_lock/read001"的临时顺序节点。如果是写请求,那么就创建"/shared_lock/write002"的临时顺序节点。

 

二.判断读写顺序


步骤一:客户端在创建完节点后,会获取"/shared_lock"节点下的所有子节点,并对"/shared_lock"节点注册子节点变更的 Watcher 监听。

 

步骤二:然后确定自己的节点序号在所有子节点中的顺序(包括读节点和写节点)。

 

步骤三:对于读请求:如果没有比自己序号小的写请求子节点,或所有比自己小的子节点都是读请求,那么表明可以成功获取共享锁。如果有比自己序号小的子节点是写请求,那么就需要进入等待。对于写请求:如果自己不是序号最小的子节点,那么就需要进入等待。

 

步骤四:如果客户端在等待过程中接收到 Watcher 通知,则重复步骤一。

 

三.释放锁


如果获取锁的客户端宕机,那么 zk 上的对应的临时顺序节点就会被移除。如果获取锁的客户端执行完,也会主动删除自己创建的临时顺序节点。

 

(4)羊群效应


一.排他锁的羊群效应


如果有大量的客户端在等待锁的释放,那么就会出现大量的 Watcher 通知。然后这些客户端又会发起创建请求,但最后只有一个客户端能创建成功。这个 Watcher 事件通知其实对绝大部分客户端都不起作用,极端情况可能会出现 zk 短时间向其余客户端发送大量的事件通知,这就是羊群效应。出现羊群效应的根源在于:没有找准客户端真正的关注点。

 

二.共享锁的羊群效应


如果有大量的客户端在等待锁的释放,那么不仅会出现大量的 Watcher 通知,还会出现大量的获取"/shared_lock"的子节点列表的请求,但最后大部分客户端都会判断出自己并非是序号最小的节点。所以客户端会接收过多和自己无关的通知和发起过多查询节点列表的请求,这就是羊群效应。出现羊群效应的根源在于:没有找准客户端真正的关注点。

 

(5)改进后的排他锁


使用临时顺序节点来表示获取锁的请求,让创建出后缀数字最小的节点的客户端成功拿到锁。

 

步骤一:首先客户端调用 create()方法在"/exclusive_lock"下创建一个临时顺序节点。

 

步骤二:然后客户端调用 getChildren()方法返回"/exclusive_lock"下的所有子节点,接着对这些子节点进行排序。

 

步骤三:排序后,看看是否有后缀比自己小的节点。如果没有,则当前客户端便成功获取到排他锁。如果有,则调用 exist()方法对排在自己前面的那个节点注册 Watcher 监听。

 

步骤四:当客户端收到 Watcher 通知前面的节点不存在,则重复步骤二。

 

(6)改进后的共享锁


步骤一:客户端调用 create()方法在"/shared_lock"节点下创建临时顺序节点。如果是读请求,那么就创建"/shared_lock/read001"的临时顺序节点。如果是写请求,那么就创建"/shared_lock/write002"的临时顺序节点。

 

步骤二:然后调用 getChildren()方法返回"/shared_lock"下的所有子节点,接着对这些子节点进行排序。

 

步骤三:对于读请求:如果排序后发现有比自己序号小的写请求子节点,则需要等待,且需要向比自己序号小的最后一个写请求子节点注册 Watcher 监听。对于写请求:如果排序后发现自己不是序号最小的子节点,则需要等待,并且需要向比自己序号小的最后一个请求子节点注册 Watcher 监听。注意:这里注册 Watcher 监听也是调用 exist()方法。此外,不满足上述条件则表示成功获取共享锁。

 

步骤四:如果客户端在等待过程中接收到 Watcher 通知,则重复步骤二。

 

(7)zk 原生实现分布式锁的示例


一.分布式锁的实现步骤


步骤一:每个线程都通过"临时顺序节点 + zk.create()方法 + 添加回调"去创建节点。

 

步骤二:线程执行完创建临时顺序节点后,先通过 CountDownLatch.await()方法进行阻塞。然后在创建成功的回调中,通过 zk.getChildren()方法获取根目录并继续回调。

 

步骤三:某线程在获取根目录成功后的回调中,会对目录排序。排序后如果发现其创建的节点排第一,那么就执行 countDown()方法不再阻塞,表示获取锁成功。排序后如果发现其创建的节点不是第一,则通过 zk.exists()方法监听前一节点。

 

步骤四:获取到锁的线程会通过 zk.delete()方法来删除其对应的节点实现释放锁。在等候获取锁的线程掉线时其对应的节点也会被删除。而一旦节点被删除,那些监听根目录的线程就会重新 zk.getChildren()方法,获取成功后其回调又会进行排序以及通过 zk.exists()方法监听前一节点。

 

二 WatchCallBack 对分布式锁的具体实现


public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {    ZooKeeper zk ;    String threadName;    CountDownLatch countDownLatch = new CountDownLatch(1);    String pathName;        public String getPathName() {        return pathName;    }        public void setPathName(String pathName) {        this.pathName = pathName;    }        public String getThreadName() {        return threadName;    }        public void setThreadName(String threadName) {        this.threadName = threadName;    }        public ZooKeeper getZk() {        return zk;    }        public void setZk(ZooKeeper zk) {        this.zk = zk;    }        public void tryLock() {        try {            System.out.println(threadName + " create....");            //创建一个临时的有序的节点            zk.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc");            countDownLatch.await();        } catch (InterruptedException e) {            e.printStackTrace();        }    }        //当前线程释放锁, 删除节点    public void unLock() {        try {            zk.delete(pathName, -1);            System.out.println(threadName + " over work....");        } catch (InterruptedException e) {            e.printStackTrace();        } catch (KeeperException e) {            e.printStackTrace();        }    }        //上面zk.create()方法的回调    //创建临时顺序节点后的回调, 10个线程都能同时创建节点    //创建完后获取根目录下的子节点, 也就是这10个线程创建的节点列表, 这个不用watch了, 但获取成功后要执行回调    //这个回调就是每个线程用来执行节点排序, 看谁是第一就认为谁获得了锁    @Override    public void processResult(int rc, String path, Object ctx, String name) {        if (name != null ) {            System.out.println(threadName  + "  create node : " +  name );            setPathName(name);            //一定能看到自己前边的, 所以这里的watch要是false            zk.getChildren("/", false, this ,"sdf");        }    }        //核心方法: 各个线程获取根目录下的节点时, 上面zk.getChildren("/", false, this ,"sdf")的回调    @Override    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {        //一定能看到自己前边的节点        System.out.println(threadName + "look locks...");        for (String child : children) {            System.out.println(child);        }        //根目录下的节点排序        Collections.sort(children);        //获取当前线程创建的节点在根目录中排第几        int i = children.indexOf(pathName.substring(1));        //是不是第一个, 如果是则说明抢锁成功; 如果不是, 则watch当前线程创建节点的前一个节点是否被删除(删除);        if (i == 0) {            System.out.println(threadName + " i am first...");            try {                //这里的作用就是不让第一个线程获得锁释放锁跑得太快, 导致后面的线程还没建立完监听第一个节点就被删了                zk.setData("/", threadName.getBytes(), -1);                countDownLatch.countDown();            } catch (KeeperException e) {                e.printStackTrace();            } catch (InterruptedException e) {                e.printStackTrace();            }        } else {            //9个没有获取到锁的线程都去调用zk.exists, 去监控各自自己前面的节点, 而没有去监听父节点            //如果各自前面的节点发生删除事件的时候才回调自己, 并关注被删除的事件(所以会执行process回调)            zk.exists("/" + children.get(i-1), this, this, "sdf");        }    }        //上面zk.exists()的监听    //监听的节点发生变化的Watcher事件监听    @Override    public void process(WatchedEvent event) {        //如果第一个获得锁的线程释放锁了, 那么其实只有第二个线程会收到回调事件        //如果不是第一个哥们某一个挂了, 也能造成他后边的收到这个通知, 从而让他后边那个去watch挂掉这个哥们前边的, 保持顺序        switch (event.getType()) {            case None:                break;            case NodeCreated:                break;            case NodeDeleted:                zk.getChildren("/", false, this ,"sdf");                break;            case NodeDataChanged:                break;            case NodeChildrenChanged:                break;        }    }        @Override    public void processResult(int rc, String path, Object ctx, Stat stat) {        //TODO    }}
复制代码


三.分布式锁的测试类


public class TestLock {    ZooKeeper zk;        @Before    public void conn () {        zk  = ZKUtils.getZK();    }        @After    public void close () {        try {            zk.close();        } catch (InterruptedException e) {            e.printStackTrace();        }    }        @Test    public void lock() {        //10个线程都去抢锁        for (int i = 0; i < 10; i++) {            new Thread() {                @Override                public void run() {                    WatchCallBack watchCallBack = new WatchCallBack();                    watchCallBack.setZk(zk);                    String threadName = Thread.currentThread().getName();                    watchCallBack.setThreadName(threadName);                    //每一个线程去抢锁                    watchCallBack.tryLock();                    //抢到锁之后才能干活                    System.out.println(threadName + " working...");                    try {                        Thread.sleep(1000);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    //干完活释放锁                    watchCallBack.unLock();                }            }.start();        }        while(true) {        }    }}
复制代码


8.zk 实现分布式队列和分布式屏障

 

(1)分布式队列的实现


步骤一:所有客户端都到"/queue"节点下创建一个临时顺序节点。

 

步骤二:通过调用 getChildren()方法来获取"/queue"节点下的所有子节点。

 

步骤三:客户端确定自己的节点序号在所有子节点中的顺序。

 

步骤四:如果自己不是序号最小的子节点,那么就需要进入等待,同时调用 exists()方法向比自己序号小的最后一个节点注册 Watcher 监听。

 

步骤五:如果客户端收到 Watcher 事件通知,重复步骤二。

 

(2)分布式屏障的实现


"/barrier"节点是一个已存在的默认节点,"/barrier"节点的值是数字 n,表示 Barrier 值,比如 10。

 

步骤一:首先,所有客户端都需要到"/barrier"节点下创建一个临时节点。

 

步骤二:然后,客户端通过 getData()方法获取"/barrier"节点的数据内容,比如 10。

 

步骤三:接着,客户端通过 getChildren()方法获取"/barrier"节点下的所有子节点,同时注册对子节点列表变更的 Watcher 监听。

 

步骤四:如果客户端发现"/barrier"节点的子节点个数不足 10 个,那么就需要进入等待。

 

步骤五:如果客户端接收到了 Watcher 事件通知,那么就重复步骤三。


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18808532

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
zk基础—zk实现分布式功能_分布式_不在线第一只蜗牛_InfoQ写作社区