想搞清楚 ZooKeepe?这篇入门你必须了解!
EPEMERAL_SEQUENTIAL,临时性顺序编号 ZNode。和临时性节点一样,断开连接会被删除,并且 ZNode 的编号会自动增加。
四、监听通知机制
Watcher 是基于观察者模式实现的一种机制。如果我们需要实现当某个 ZNode 节点发生变化时收到通知,就可以使用 Watcher 监听器。
客户端通过设置监视点(watcher)向 ZooKeeper 注册需要接收通知的 znode,在 znode 发生变化时 ZooKeeper 就会向客户端发送消息。
这种通知机制是一次性的。一旦 watcher 被触发,ZooKeeper 就会从相应的存储中删除。如果需要不断监听 ZNode 的变化,可以在收到通知后再设置新的 watcher 注册到 ZooKeeper。
监视点的类型有很多,如监控 ZNode 数据变化、监控 ZNode 子节点变化、监控 ZNode 创建或删除。
五、选举机制
ZooKeeper 是一个高可用的应用框架,因为 ZooKeeper 是支持集群的。ZooKeeper 在集群状态下,配置文件是不会指定 Master 和 Slave,而是在 ZooKeeper 服务器初始化时就在内部进行选举,产生一台做为 Leader,多台做为 Follower,并且遵守半数可用原则。
由于遵守半数可用原则,所以 5 台服务器和 6 台服务器,实际上最大允许宕机数量都是 3 台,所以为了节约成本,集群的服务器数量一般设置为奇数。
如果在运行时,如果长时间无法和 Leader 保持连接的话,则会再次进行选举,产生新的 Leader,以保证服务的可用。
六、初の体验
首先在官网下载 ZooKeeper,我这里用的是 3.3.6 版本。
然后解压,复制一下/conf 目录下的 zoo_sample.cfg 文件,重命名为 zoo.cfg。
修改 zoo.cfg 中 dataDir 的值,并创建对应的目录:
最后到/bin 目录下启动,我用的是 window 系统,所以启动 zkServer.cmd,双击即可:
启动成功的话就可以看到这个对话框:
可视化界面的话,我推荐使用ZooInspector
,操作比较简便:
6.1 使用 java 连接 ZooKeeper
首先引入 Maven 依赖:
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.4.6</version></dependency>
接着我们写一个 Main 方法,进行操作:
//连接地址及端口号 private static final String SERVER_HOST = "127.0.0.1:2181";
//会话超时时间 private static final int SESSION_TIME_OUT = 2000;
public static void main(String[] args) throws Exception {//参数一:服务端地址及端口号//参数二:超时时间//参数三:监听器 ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {@Overridepublic void process(WatchedEvent watchedEvent) {//获取事件的状态 Event.KeeperState state = watchedEvent.getState();//判断是否是连接事件 if (Event.KeeperState.SyncConnected == state) {Event.EventType type = watchedEvent.getType();if (Event.EventType.None == type) {System.out.println("zk 客户端已连接...");}}}});zooKeeper.create("/java", "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println("新增 ZNode 成功");zooKeeper.close();}
创建一个持久性 ZNode,路径是/java,值为"Hello World":
七、API 概述
7.1 创建
public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
参数解释:
path ZNode 路径
data ZNode 存储的数据
acl ACL 权限控制
createMode ZNode 类型
ACL 权限控制,有三个是 ZooKeeper 定义的常用权限,在 ZooDefs.Ids 类中:
/**
This is a completely open ACL.
完全开放的 ACL,任何连接的客户端都可以操作该属性 znode*/public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));
/**
This ACL gives the creators authentication id's all permissions.
只有创建者才有 ACL 权限*/public final ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));
/**
This ACL gives the world the ability to read.
只能读取 ACL*/public final ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));
createMode 就是前面讲过的四种 ZNode 类型:
public enum CreateMode {/**
持久性 ZNode/PERSISTENT (0, false, false),/*
持久性自动增加顺序号 ZNode/PERSISTENT_SEQUENTIAL (2, false, true),/*
临时性 ZNode/EPHEMERAL (1, true, false),/*
临时性自动增加顺序号 ZNode*/EPHEMERAL_SEQUENTIAL (3, true, true);}
7.2 查询
//同步获取节点数据 public byte[] getData(String path, boolean watch, Stat stat){...}
//异步获取节点数据 public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx){...}
同步 getD
ata()方法中的 stat 参数是用于接收返回的节点描述信息:
public byte[] getData(final String path, Watcher watcher, Stat stat){//省略...GetDataResponse response = new GetDataResponse();//发送请求到 ZooKeeper 服务器,获取到 responseReplyHeader r = cnxn.submitRequest(h, request, response, wcb);if (stat != null) {//把 response 的 Stat 赋值到传入的 stat 中 DataTree.copyStat(response.getStat(), stat);}}
使用同步 getData()获取数据:
//数据的描述信息,包括版本号,ACL 权限,子节点信息等等 Stat stat = new Stat();//返回结果是 byte[]数据,getData()方法底层会把描述信息复制到 stat 对象中 byte[] bytes = zooKeeper.getData("/java", false, stat);//打印结果 System.out.println("ZNode 的数据 data:" + new String(bytes));//Hello WorldSystem.out.println("获取到 dataVersion 版本号:" + stat.getVersion());//默认数据版本号是 0
7.3 更新
public Stat setData(final String path, byte data[], int version){...}
值得注意的是第三个参数 version,使用 CAS 机制,这是为了防止多个客户端同时更新节点数据,所以需要在更新时传入版本号,每次更新都会使版本号+1,如果服务端接收到版本号,对比发现不一致的话,则会抛出异常。
所以,在更新前需要先查询获取到版本号,否则你不知道当前版本号是多少,就没法更新:
//获取节点描述信息 Stat stat = new Stat();zooKeeper.getData("/java", false, stat);System.out.println("更新 ZNode 数据...");//更新操作,传入路径,更新值,版本号三个参数,返回结果是新的描述信息 Stat setData = zooKeeper.setData("/java", "fly!!!".getBytes(), stat.getVersion());System.out.println("更新后的版本号为:" + setData.getVersion());//更新后的版本号为:1
更新后,版本号增加了:
如果传入的版本参数是"-1",就是告诉 zookeeper 服务器,客户端需要基于数据的最新版本进行更新操作。但是-1 并不是一个合法的版本号,而是一个标识符。
7.4 删除
public void delete(final String path, int version){...}
path 删除节点的路径
version 版本号
这里也需要传入版本号,调用 getData()方法即可获取到版本号,很简单:
Stat stat = new Stat();zooKeeper.getData("/java", false, stat);//删除 ZNodezooKeeper.delete("/java", stat.getVersion());
7.5 watcher 机制
在上面第三点提到,ZooKeeper 是可以使用通知监听机制,当 ZNode 发生变化会收到通知消息,进行处理。基于 watcher 机制,ZooKeeper 能玩出很多花样。怎么使用?
ZooKeeper 的通知监听机制,总的来说可以分为三个过程:
①客户端注册 Watcher ②服务器处理 Watcher ③客户端回调 Watcher 客户端。
注册 watcher 有 4 种方法,new ZooKeeper()、getData()、exists()、getChildren()。下面演示一下使用 exists()方法注册 watcher:
首先需要实现 Watcher 接口,新建一个监听器:
public class MyWatcher implements Watcher {@Overridepublic void process(WatchedEvent event) {//获取事件类型 Event.EventType eventType = event.getType();//通知状态 Event.KeeperState eventState = event.getState();//节点路径 String eventPath = event.getPath();System.out.println("监听到的事件类型:" + eventType.name());System.out.println("监听到的通知状态:" + eventState.name());System.out.println("监听到的 ZNode 路径:" + eventPath);}}
然后调用 exists()方法,注册监听器:
zooKeeper.exists("/java", new MyWatcher());//对 ZNode 进行更新数据的操作,触发监听器 zooKeeper.setData("/java", "fly".getBytes(), -1);
然后在控制台就可以看到打印的信息:
评论