写点什么

ZooKeeper 分布式配置——看这篇就够了

  • 2021 年 11 月 11 日
  • 本文字数:4128 字

    阅读完需:约 14 分钟

如果 ZooKeeper 判断当前客户端需要进行 Watcher 注册,会将当前的 ServerCnxn 对象和数据路径传入 getData 方法中去。ServerCnxn 是 ZooKeeper 客户端和服务端之间的连接接口,代表了一个客户端和服务端的连接,可以将 ServerCnxn 当做一个 Watcher 对象,因为它实现了 Watcher 的 process 接口。


  1. WatcherManager


WatcherManager 是 ZK 服务端 Watcher 的管理器,分为 WatchTable 和 Watch2Paths 两个存储结构,这两个是不同的存储结构


1)WatchTable: 从数据节点路径的颗粒度管理 Watcher


2)Watch2Paths:从 Watcher 的颗粒度来控制时间出发的数据节点


在服务端,DataTree 中会托管两个 WatchManager, 分别是 dataWatches (数据变更 Watch) 和 childWatches(子节点变更 Watch)。


  1. Watcher 触发逻辑


1)封装 WatchedEven:将(KeeperState(通知状态),EventType(事件类型),Path(节点路径))封装成一个 WatchedEvent 对象


2)查询 Watcher:根据路径取出对应的 Watcher,如果存在,取出数据同时从 WatcherManager(WatchTable/Watch2Paths) 中删除


3)调用 Process 方法触发 Watcher


4.客户端回调 Watcher


1)反序列化:字节流转换成 WatcherEvent 对象


2)处理 chrootPath:如果客户端设置了 chrootPath 属性,那么需要对服务器传过来的完整节点路径进行 chrootPath 处理,生成客户端的一个相对节点路径。比如(/mxn/app/love,经过 chrootPath 处理,会变成 /love)


3)还原 WatchedEvent:WatcherEvent 转换成 WatchedEvent


4)回调 Watcher:将 WatcherEvent 对象交给 EventThread 线程,在下一个轮询周期中进行 Watcher 回调


  1. EventThread 处理时间通知


1) SendThread 接收到服务端的通知事件后,会通过调用 EventThread.queueEvent 方法将事件传给 EventThread 线程


2)queueEvent 方法首先会根据该通知事件,从 ZKWatchManager 中取出所有相关的 Watcher 客户端识别出 事件类型 EventType 后,会从相应的 Watcher 存储 (即 3 个注册方法( dataWatches、existWatcher 或 childWatcher)中去除对应的 Watcher


3) 获取到相关的所有 Watcher 后,会将其放入 waitingEvents 这个队列去


代码实现




下面我们就来演示如何使用代码来实现 ZooKeeper 的配置


首先我们需要引入 ZK 的 jar


<dependency>


<groupId>org.apache.zookeeper</groupId>


<artifactId>zookeeper</artifactId>


<version>3.6.3</version>


</dependency>

配置类

既然我们要做的是分布式配置,首先我们需要模拟一个配置,这个配置用来同步服务的地址


/**


  • @program: mxnzookeeper

  • @ClassName MyConf

  • @description: 配置类

  • @author: muxiaonong

  • @create: 2021-10-19 22:18

  • @Version 1.0


**/


public class MyConfig {


private String conf ;


public String getConf() {


return conf;


}


public void setConf(String conf) {


this.conf = conf;


}


}

Watcher

创建 ZooKeeper 的时候,我们需要一个 Watcher 进行监听,后续对 Znode 节点操作的时候,我们也需要使用到 Watcher,但是这两类的功能不一样,所以我们需要定义一个自己的 watcher 类,如下所示:


import org.apache.zookeeper.WatchedEvent;


import org.apache.zookeeper.Watcher;


import java.util.concurrent.CountDownLatch;


/**


  • @program: mxnzookeeper

  • @ClassName DefaultWatch

  • @description:

  • @author: muxiaonong

  • @create: 2021-10-19 22:02

  • @Version 1.0


**/


public class DefaultWatch implements Watcher {


CountDownLatch cc;


public void setCc(CountDownLatch cc) {


this.cc = cc;


}


@Override


public void process(WatchedEvent event) {


System.out.println(event.toString());


switch (event.getState()) {


case Unknown:


break;


case Disconnected:


break;


case NoSyncConnected:


break;


case SyncConnected:


System.out.println("连接成功。。。。。");


//连接成功后,执行 countDown,此时便可以拿 zk 对象使用了


cc.countDown();


break;


case AuthFailed:


break;


case ConnectedReadOnly:


break;


case SaslAuthenticated:


break;


case Expired:


break;


case Closed:


break;


}


}


}


由于是异步进行操作的,我们创建一个 ZooKeeper 对象之后,如果不进行阻塞操作的话,有可能还没有连接完成就执行后续的操作,所以这里我们用 CountDownLatch进行阻塞操作,当监测连接成功后,进行 countDown放行,执行后续的 ZK 的动作。


当我们连接成功 ZooKeeper 之后,我们需要通过 exists判断是否存在节点,存在就进行 getData 操作。这里我们创建一个 WatchCallBack因为exists和getData都需要一个callback,所以除了实现 Watcher 以外还需要实现节点状态:AsyncCallback.StatCallback 数据监听:AsyncCallback.DataCallback


import org.apache.zookeeper.AsyncCallback;


import org.apache.zookeeper.WatchedEvent;


import org.apache.zookeeper.Watcher;


import org.apache.zookeeper.ZooKeeper;


import org.apache.zookeeper.data.Stat;


import java.util.concurrent.CountDownLatch;


/**


  • @program: mxnzookeeper

  • @ClassName WatchCallBack

  • @description:

  • @author: muxiaonong

  • @create: 2021-10-19 22:13

  • @Version 1.0


**/


public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback {


ZooKeeper zk ;


MyConfig conf ;


CountDownLatch cc = new CountDownLatch(1);


public MyConfig getConf() {


return conf;


}


public void setConf(MyConfig conf) {


this.conf = conf;


}


public ZooKeeper getZk() {


return zk;


}


public void setZk(ZooKeeper zk) {


this.zk = zk;


}


public void aWait(){


//exists 的异步实现版本


zk.exists(ZKConstants.ZK_NODE,this,this ,"exists watch");


try {


cc.await();


} catch (InterruptedException e) {


e.printStackTrace();


}


}


/** @Author mxn


  • @Description //TODO 此回调用于检索节点的 stat

  • @Date 21:24 2021/10/20

  • @param rc 调用返回的 code 或结果

  • @param path 传递给异步调用的路径

  • @param ctx 传递给异步调用的上下文对象

  • @param stat 指定路径上节点的 Stat 对象

  • @return


**/


@Override


public void processResult(int rc, String path, Object ctx, Stat stat) {


if(stat != null){


//getData 的异步实现版本


zk.getData(ZKConstants.ZK_NODE,this,this,"status");


}


}


/** @Author mxn


  • @Description //TODO 此回调用于检索节点的数据和 stat

  • @Date 21:23 2021/10/20

  • @param rc 调用返回的 code 或结果

  • @param path 传递给异步调用的路径

  • @param ctx 传递给异步调用的上下文对象

  • @param data 节点的数据

  • @param stat 指定节点的 Stat 对象

  • @return


**/


@Override


public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {


if(data != null ){


String s = new String(data);


conf.setConf(s);


cc.countDown();


}


}


/** @Author mxn


  • @Description //TODO Watcher 接口的实现。

  • @Date 21:24 2021/10/20

  • @Param watchedEvent WatchedEvent 表示监视者能够响应的 ZooKeeper 上的更改。

  • @return


**/


@Override


public void process(WatchedEvent event) {


switch (event.getType()) {


case None:


break;


case NodeCreated:


//当一个 node 被创建后,获取 node


//getData 中又会触发 StatCallback 的回调 processResult


zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs");


break;


case NodeDeleted:


//节点删除


conf.setConf("");


//重新开启 CountDownLatch


cc = new CountDownLatch(1);


break;


case NodeDataChanged:


//节点数据被改变了


//触发 DataCallback 的回调


zk.getData(ZKConstants.ZK_NODE,this,this,"sdfs");


break;


//子节点发生变化的时候


case NodeChildrenChanged:


break;


}


}


}


当前面准备好了之后,我们可以编写测试用例了:


ZKUtils 工具类


import org.apache.zookeeper.ZooKeeper;


import java.util.concurrent.CountDownLatch;


/**


  • @program: mxnzookeeper

  • @ClassName ZKUtils

  • @description:

  • @author: muxiaonong

  • @create: 2021-10-19 21:59

  • @Version 1.0


**/


public class ZKUtils {


private static ZooKeeper zk;


//192.168.5.130:2181/mxn 这个后面/mxn,表示客户端如果成功建立了到 zk 集群的连接,


// 那么默认该客户端工作的根 path 就是/mxn,如果不带/mxn,默认根 path 是/


//当然我们要保证/mxn 这个节点在 ZK 上是存在的


private static String address ="192.18.5.129:2181,192.168.5.130:2181,192.168.5.130:2181/mxn";


private static DefaultWatch watch = new DefaultWatch();


private static CountDownLatch init = new CountDownLatch(1);


public static ZooKeeper getZK(){


try {


//因为是异步的,所以要 await,等到连接上 zk 集群之后再进行后续操作


zk = new ZooKeeper(address,1000,watch);


watch.setCc(init);


init.await();


} catch (Exception e) {


e.printStackTrace();


}


return zk;


}


}


测试类:


import org.apache.zookeeper.ZooKeeper;


import org.junit.Before;


imp


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


ort org.junit.Test;


/**


  • @program: mxnzookeeper

  • @ClassName TestConfig

  • @description:

  • @author: muxiaonong

  • @create: 2021-10-19 22:04

  • @Version 1.0


**/


public class TestConfig {


ZooKeeper zk;


@Before


public void conn(){


zk = ZKUtils.getZK();


}


/** @Author mxn


  • @Description //TODO 关闭 ZK

  • @Date 21:16 2021/10/20

  • @Param

  • @return


**/


public void close(){


try {


zk.close();


}catch (Exception e){


e.printStackTrace();


}


}


@Test


public void getConf(){


WatchCallBack watchCallBack = new WatchCallBack();


watchCallBack.setZk(zk);


MyConfig myConfig = new MyConfig();


watchCallBack.setConf(myConfig);


//阻塞等待


watchCallBack.aWait();


while(true){


if(myConfig.getConf().equals("")){


System.out.println("zk node 节点丢失了 ......");


watchCallBack.aWait();


}else{


System.out.println(myConfig.getConf());


}


//


try {


//每隔 500 毫秒打印一次


Thread.sleep(500);


} catch (InterruptedException e) {


e.printStackTrace();


}


}


}


运行测试




首先我们要知道,因为我们连接 IP 的时候加上了 /mxn这个目录结构,所以我们在服务器初始状态就必须要有这个节点:


集群初始状态:


[zk: localhost:2181(CONNECTED) 7] ls /


[mxn, zookeeper]


我们启动程序看看

评论

发布
暂无评论
ZooKeeper分布式配置——看这篇就够了