写点什么

zookeeper 使用篇 -Zookeeper Api 实践,flutter 下拉刷新上拉加载更多

用户头像
Android架构
关注
发布于: 刚刚

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)throws IOException{this(connectString, sessionTimeout, watcher, false);}//第二个构造 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly) throws IOException {this(connectString, sessionTimeout, watcher, canBeReadOnly,createDefaultHostProvider(connectString));}//第三个构造 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider)throws IOException {this(connectString, sessionTimeout, watcher, canBeReadOnly,aHostProvider, null);}//第四个构造 public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,boolean canBeReadOnly, HostProvider aHostProvider,ZKClientConfig clientConfig) throws IOException {LOG.info("Initiating client connection, connectString=" + connectString


  • " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);


if (clientConfig == null) {clientConfig = new ZKClientConfig();}this.clientConfig = clientConfig;watchManager = defaultWatchManager();watchManager.defaultWatcher = watcher;ConnectStringParser connectStringParser = new ConnectStringParser(connectString);hostProvider = aHostProvider;


cnxn = createConnection(connectStringParser.getChrootPath(),hostProvider, sessionTimeout, this, watchManager,getClientCnxnSocket(), canBeReadOnly);cnxn.start();}


可以看到 Zookeeper 的源码中总共提供了四个构造函数,参数从三个到六个不等,我们分别来看看这几个参数的说明:


connectString


connectString 代表 Zookeeper 的服务器列表,每个 zk 服务器使用 host:port 字符串组成,多个 zk 服务器之间使用英文输入法的逗号分隔,需要注意的是我们还可以在连接中指定根目录,例如


192.168.1.3 :2181/zk-root,则当客户端连接上当前 zk 服务器的时候,后续所有的操作都会基于当前目录作为根目录进行操作


sessionTimeout


sessionTimeout 指的是会话时间,单位是毫秒,zk 引入了会话的概念,在一个会话周期内,zk 会通过心跳检测机制来维持会话的有效性,一旦在当前指定的时间内没有完成心跳检测,则视为当前会话已经失效


watcher


watcher 是 zk 中提供的事件通知机制,在 zk 中允许传入当前类型的接口实例来作为客户端默认的事件通知处理器,如果我们当前不需要引入事件通知机制,当前参数传递 null 即可


canBeReadOnly


canBeReadOnly 是 zk 中提供的一个保护机制,默认情况下 zk 集群如果出现大半的机器失去网络连接(宕机),那么 zk 将进入保护,不再处理任何客户端请求(包括读),但是如果我们需要继续提供读服务操作,则需要将当前的参数设置为 true


aHostProvider


aHostProvider 是 zk 提供的支持客户端的自定义行为操作,内部提供 next、onConnected 以及 updateServerList 等操作


clientConfig


clientConfig 是 zk 提供的用来灵活配置的配置类,可以将连接的信息配置进去,例如 zk 超时时间,连接方式,连接信息等


了解了 Zookeeper 的构造函数,我们来编写第一个 demo,创建一个 zk 会话实例:


/**


  • 会话连接 demo*/


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


public class SessionDemo implements Watcher {private static CountDownLatch latch = new CountDownLatch(1);


public static void main(String[] args) throws IOException, InterruptedException {ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181",5000,new SessionDemo());System.out.println(zooKeeper.getState());latch.await();}


@Overridepublic void process(WatchedEvent event) {System.out.println("触发了监听事件:"+event);//如果当前是异步连接状态 if(Event.KeeperState.SyncConnected == event.getState()){latch.countDown();}}}


点击运行以后,可以看到控制台的输出:


CONNECTING 触发了监听事件:WatchedEvent state:SyncConnected type:None path:null

创建节点

zookeeper 提供了两种创建节点的方式,一种是同步创建,一种是异步,我们首先来看看 create 方法:


//创建同步节点 public String create(final String path, byte data[], List<ACL> acl,CreateMode createMode)throws KeeperException, InterruptedException


//创建异步节点 public void create(final String path, byte data[], List<ACL> acl,CreateMode createMode, StringCallback cb, Object ctx)


可以看到异步创建节点的接口多了两个参数,一个是 callback 回调,一个是传递的 object 对象,接下来我们针对这些参数进行一下说明:


path


path 是创建 zk 节点的时候指定的对应路径,如:/root


data[]


**data[]**是创建节点的时候对应存储的内容


acl


acl 是创建节点的时候的策略


createMode


createMode 是创建节点的时候指定的节点类型,通常指定四种类型(实际不止):


1.持久化节点--PERSISTENT


2.持久化顺序节点--PERSISTENT _ S E QUENTIAL


3.临时节点--EPHEMERAL


4.临时顺序节点-- EPHEMERAL _ S EQUENTIAL


cb


cb 是注册一个异步回调函数,类型为 StringCallback,一般重写 void processResult( int rc , String path , Object ctx , Stringn ame ) ;当 zk 创建节点完毕以后,会自动调用这个方法,可以在当前方法内处理对应的业务逻辑


ctx


ctx 在创建节点的时候可以传递的对象,在 StringCallback 的回调函数执行的时候使用(传递),通常会传递上下文(Context)


注意:这里需要注意的是,zk 默认创建的时候不会给我们进行序列化,需要我们手动序列化转换为字节数组传递节点存储的内容,并且 zk 默认不允许跨节点创建,即如果上层路径不存在的情况下,直接创建子节点,并且如果当前路径对应节点已经创建,再去创建并不会覆盖原来的内容,会抛出 NodeExistsException 异常

创建同步节点

接下来,我们来创建一个同步节点:


/***创建同步节点 demo**/public class CreateDemo implements Watcher {private static final CountDownLatch LATCH = new CountDownLatch(1);


public static void main(String[] args) throws IOException, InterruptedException, KeeperException {ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",5000,new CreateDemo());LATCH.await();//等待连接上 zk 以后 String path1 = zk.create("/test01", "test01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("创建无序节点:"+path1);


String path2 = zk.create("/test02", "test02".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println("创建有序节点:"+path2);}


@Overridepublic void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected == event.getState()){LATCH.countDown();}}}


我们来运行一下,可以看到控制台输出的内容:


创建无序节点:/test01 创建有序节点:/test020000000002

创建异步节点

创建异步节点,我们需要注意传递 ctx 参数,以及 StringCallback 的实例,并且我们需要注意的是,create 方法并不会阻塞后续业务执行:


/***创建异步节点 demo**/public class CreateAyncDemo implements Watcher {private static final CountDownLatch LATCH = new CountDownLatch(1);


public static void main(String[] args) throws IOException, InterruptedException, KeeperException {ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",5000,new CreateAyncDemo());LATCH.await();//等待连接上 zk 以后 zk.create("/testAsyn001", "testAsyn001".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,new IStringCallback(),"我是上下文");


zk.create("/testAsyn002", "testAsyn002".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,new IStringCallback(),"我是上下文");Thread.sleep(Integer.MAX_VALUE);}


@Overridepublic void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected == event.getState()){LATCH.countDown();}}


}class IStringCallback implements AsyncCallback.StringCallback{@Overridepublic void processResult(int rc, String path, Object ctx, String name) {System.out.println("创建节点完毕的回调函数:[rc:"+rc+",path:"+path+",ctx:"+ctx+",name:"+name+"]");}}


我们使用 Thread.sleep 阻塞一下当前的业务,运行可以看到控制台成功输出:


创建节点完毕的回调函数:[rc:0,path:/testAsyn001,ctx:我是上下文,name:/testAsyn001]创建节点完毕的回调函数:[rc:0,path:/testAsyn002,ctx:我是上下文,name:/testAsyn0020000000009]

删除节点

客户端通过 delete 方法来删除某一个节点,delete 方法如下:


//同步删除 public void delete(final String path, int version)throws InterruptedException, KeeperException//异步删除 public void delete(final String path, int version, VoidCallback cb,Object ctx)


可以看到删除节点操作和创建节点一样,支持同步和异步操作,但是这里我们需要注意的是,**在 Zookeeper 中,只运行按照顺序删除节点,即子节点存在无法直接删除父节点,必须优先删除下面的所有子节点!**接下来我们编写一个案例,删除节点:


public class DeleteDemo implements Watcher {private static final CountDownLatch LATCH = new CountDownLatch(1);


public static void main(String[] args) throws IOException, InterruptedException, KeeperException {ZooKeeper zk = new ZooKeeper("127.0.0.1:2181",5000,new DeleteDemo());LATCH.await();//等待连接上 zk 以后 String path1 = zk.create("/test01", "test01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);System.out.println("创建无序节点:"+path1);


String path2 = zk.create("/test02", "test02".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println("创建有序节点:"+path2);//触发同步删除 zk.delete("/test01",0);//触发异步删除 zk.delete("/test02",0,new IVoidCallback(),"我是触发异步删除的数据");Thread.sleep(Integer.MAX_VALUE);}


@Overridepublic void process(WatchedEvent event) {if(Event.KeeperState.SyncConnected == event.getState()){LATCH.countDown();}}}


class IVoidCallback implements AsyncCallback.VoidCallback{


@Overridepublic void processResult(int rc, String path, Object ctx) {System.out.println("异步删除节点触发:[path:"+path+",rc:"+rc+",ctx:"+ctx+"]");}}


运行以后,可以看到控制台的输出如下:


创建无序节点:/test01 创建有序节点:/test020000000011 异步删除节点触发:[path:/test02,rc:-101,ctx:我是触发异步删除的数据]

获取数据

在 Zookeeper 中有两种获取数据的接口,一种是获取当前节点的相关数据信息,一种是获取当前节点下的子节点相关的数据信息,接下来我们来看看这两种 Api 接口

getData

客户端通过 getData 方法可以获取当前节点的相关数据内容,方法如下:


public byte[] getData(final String path, Watcher watcher, Stat stat) throws KeeperException, InterruptedException//第二个 public byte[] getData(String path, boolean watch, Stat stat) throws KeeperException, InterruptedException//第三个 public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx)//第四个 public void getData(String path, boolean watch, DataCallback cb, Object ctx)


可见这里也提供了同步获取数据和异步获取数据的方法,接下来我们来看看对应参数的说明:


path


path 是需要获取数据的节点所在的路径


watcher


watcher 是用来注册的通知接口,如果节点发生变更,就会通过当前的接口通知客户端


stat

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
zookeeper使用篇-Zookeeper Api实践,flutter下拉刷新上拉加载更多