写点什么

ZooKeeper 浅析(二)

作者:andy
  • 2022-10-29
    北京
  • 本文字数:6727 字

    阅读完需:约 22 分钟

6-客户端操作


ZooKeeper 客户端操作

ZooKeeper 是一个树形目录的存储结构,以“/”根目录为主。可以通过登陆客户端进行 ZooKeeper 节点操作:/app/zookeeper/bin/zkCli.sh

主机 zk-server-03 登陆主机 zk-server-01 的 ZooKeeper 服务:

/app/zookeeper/bin/zkCli.sh -server zk-server-01

查看所有命令,使用 help

ZooKeeper -server host:port cmd args

stat path [watch]

set path data [version]

ls path [watch]

delquota [-n|-b] path

ls2 path [watch]

setAcl path acl

setquota -n|-b val path

history

redo cmdno

printwatches on|off

delete path [version]

sync path

listquota path

rmr path

get path [watch]

create [-s] [-e] path data acl

addauth scheme auth

quit

getAcl path

close

connect host:port

列出根目录下的所有节点:

ls /

返回结果:

[zookeeper]

该第二层节点描述的是各个 ZooKeeper 连接信息。

创建并设置节点数据:

create /fuys 0822

注意:ZooKeeper 的节点只能一层一层创建,同时也只能一层一层删除。

修改节点数据:

set /fuys 1988

查看节点数据信息:

get /fuys

返回结果为:

0822

cZxid = 0x100000004

ctime = Wed Jan 17 23:47:49 CST 2018

mZxid = 0x100000004

mtime = Wed Jan 17 23:47:49 CST 2018

pZxid = 0x100000004

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 4

numChildren = 0

节点数据进行监听:

get /fuys watch

当监听节点发生改变时,则返回一次结果,若多次修改,则不能返回多次:

WatchedEvent state:SyncConnected type:NodeDataChanged path:/fuys


7-集群部署


ZooKeeper 集群部署

ZooKeeper 集群至少需要三台以上的机器。

1、实际的项目运行过程中,所有的配置建议以主机名称为主(IP 也一样),同时建议保证主机名称应该不一样。为了保证三台主机相互访问没有问题,统一修改/etc/hosts 配置文件

127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4

::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

127.0.1.1 localhost

192.168.6.128 zk-server-01

192.168.6.129 zk-server-02

192.168.6.130 zk-server-03

2、修改/etc/profile 文件,增加环境变量信息

ZK_HOME=/app/zookeeper

PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH:$ZK_HOME/bin:

3、建立各自的 ZooKeeper 数据保存目录

mkdir -p /data/zookeeper/{log,data}/

注意:原始设置的保存为/temp/zookeeper,每次启动之后,会自动删除/temp 里的内容,故才重新更换数据保存目录。

4、修改各自的 zoo.cfg 配置文件

#修改数据保存目录

dataDir=/data/zookeeper/data/

dataDirLog/data/zookeeper/log/

#增加主机信息列表

server.1=zk-server-01:2888:3888

server.2=zk-server-02:2888:3888

server.3=zk-server-03:2888:3888

所有主机配置采用的格式:server.主机编号=主机名称:程序监听端口:程序选举端口

5、在各自的 ZooKeeper 之中所有主机编号都在工作目录中以“myid”的文件形式出现。

以 zk-server-01 为例:echo 1 >> /data/zookeeper/data/myid

6、启动各个 ZooKeeper 服务,分别查看各自 ZooKeeper 服务的状态,从中具有状态为 leader 模式的主机。


8-Java 操作 ZooKeeper


使用 Java 操作 ZooKeeper

ZooKeeper 作为各个系统的协调组件,应通过程序进行 ZooKeeper 开发操作。ZooKeeper 代表新一代的组件,新旧组件最明显的区别就在于集群的支持上。

一、连接 ZooKeeper 服务

进行 ZooKeeper 连接,建议把所有的 ZooKeeper 的 ip 地址全部写上,以防全程能够稳定使用 ZooKeeper 组件。同时,需要保证 ZooKeeper 服务能够正常使用。如果是集群状态下,至少保证有两台以上机器。

1.1、创建 Maven 项目,更改项目属性,导入 ZooKeeper 开发包


<dependency>			<groupId>io.netty</groupId>			<artifactId>netty</artifactId>			<version>3.10.6.Final</version>		</dependency>		<dependency>			<groupId>jline</groupId>			<artifactId>jline</artifactId>			<version>2.14.3</version>		</dependency>		<dependency>			<groupId>org.apache.zookeeper</groupId>			<artifactId>zookeeper</artifactId>			<version>3.4.10</version>			<type>pom</type>		</dependency>
复制代码


1.2、进行 ZooKeeper 连接,需要依赖 org.apache.zookeeper.ZooKeeper 类进行,使用其构造方法。

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)throws IOException

  • connectString:连接的所有 ZooKeeper 地址,不填写则默认端口 2181;

  • sessionTimeout:连接超时时间(毫秒);

  • watcher:监听配置。


问题:

为什么 2181 端口与之前设置的 2888 端口不一致?

Java 源代码:


package own.fuys.ownzk;import java.util.Iterator;import java.util.List;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ZKDemo {	private static Logger logger = LoggerFactory.getLogger(ZKDemo.class);	public static String CONNECT_STRING = "192.168.6.128:2181,192.168.6.129,192.168.6.130";	public static int SESSION_TIMEOUT = 2000;		public static void main(String[] args) throws Exception{		ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {			public void process(WatchedEvent event) {			}		});		if(zk.exists("/", false)!=null){			List<String> children = zk.getChildren("/", false);			Iterator<String> iter = children.iterator();			while(iter.hasNext()){				logger.info("ChildNode --> " + iter.next());			}		}		zk.close();	}}
复制代码


二、ACL 授权控制

为了保证安全性,ZooKeeper 提供了 ACL(访问控制列表)认证与授权处理机制。但实际上,其意义不大,因为 ZooKeeper 保存的是基础信息,而这些信息也不会以明文的形式出现。一般情况下,不会对根节点进行授权控制,而是会对子节点进行控制。如果要使用授权控制,那么,如果 A 人员需要对 ZooKeeper 进行集群配置,而又不想遭到其他人员的破坏,则可以使用授权控制。

ZooKeeper 节点操作权限

增、删、改、查、管理权限也就是 CREATE、READ、WRITE、DELETE、ADMIN,权限简写为 crwda。

ZooKeeper 认证方式

  • world:默认方式,全世界都能访问;

  • auth:代表已经认证通过的用户,客户端 cli 可以通过 addauth digest user:pwd 来添加当前的上下文中的授权用户;

  • digest:用户名:密码的方式认证,这也是业务中最常用的;

  • ip:使用 ip 地址认证。

2.1、使用 ZooKeeper 客户端工具登陆 ZooKeeper

/app/zookeeper/bin/zkCli.sh -server zk-server-01


2.2、添加授权控制的用户

addauth digest ysfu:fuys0822

2.3、设置授权用户的操作权限控制

setAcl / auth:ysfu:fuys0822:cdrwa

返回结果:

cZxid = 0x0

ctime = Thu Jan 01 08:00:00 CST 1970

mZxid = 0x0

mtime = Thu Jan 01 08:00:00 CST 1970

pZxid = 0x100000004

cversion = 0

dataVersion = 0

aclVersion = 1

ephemeralOwner = 0x0

dataLength = 0

numChildren = 2

检测授权是否正确,可以通过登陆其他客户端,直接进行部分操作,客户端是否授权判断。例如执行 ls /,则会提示 Authentication is not valid : /,再次输入添加授权控制的用户(addauth digest ysfu:fuys0822),重新再次操作,即可成功。


Java 源代码:






package own.fuys.ownzk;import java.util.Iterator;import java.util.List;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ZKDemo { private static Logger logger = LoggerFactory.getLogger(ZKDemo.class); public static String CONNECT_STRING = "192.168.6.128:2181,192.168.6.129,192.168.6.130"; public static int SESSION_TIMEOUT = 2000; public static String AUTH_INFO = "ysfu:fuys0822"; public static void main(String[] args) throws Exception{ ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent event) { } }); zk.addAuthInfo("digest", AUTH_INFO.getBytes()); if(zk.exists("/", false)!=null){ List<String> children = zk.getChildren("/", false); Iterator<String> iter = children.iterator(); while(iter.hasNext()){ logger.info("ChildNode --> " + iter.next()); } } zk.close(); }}
复制代码


三、创建节点

ZooKeeper 重要的一个原因是可以创建临时节点。

创建节点使用如下方法:

public String create(final String path, byte data[], List acl,CreateMode createMode)

方法的注意点:

  • List acl:进行安全认证,所有节点的认证信息保存在 org.apache.zookeeper.ZooDefs.Ids 中,其中所有都可以访问,使用 public final ArrayList OPEN_ACL_UNSAFE 常量。

  • CreateMode createMode:节点的模式,使用枚举 org.apache.zookeeper.CreateMode 获取:PERSISTENT:持久化节点;PERSISTENT_SEQUENTIAL:持久序列化节点;EPHEMERAL:暂时性节点,当连接关闭时,节点自动消失;EPHEMERAL_SEQUENTIAL:暂时序列化节点。

3.1、创建普通持久化节点

Java 源代码:


package own.fuys.ownzk;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ZKDemo {	private static Logger logger = LoggerFactory.getLogger(ZKDemo.class);	public static String CONNECT_STRING = "192.168.6.128:2181,192.168.6.129,192.168.6.130";	public static int SESSION_TIMEOUT = 2000;	public static String AUTH_INFO = "ysfu:fuys0822";	public static void main(String[] args) throws Exception {		ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {			public void process(WatchedEvent event) {			}		});		zk.addAuthInfo("digest", AUTH_INFO.getBytes());		if(zk.exists("/fuys/yun", false)==null){			String create = zk.create("/fuys/yun", "No Death".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);			logger.info("create node --> " + create);		}		zk.close();	}}
复制代码


3.2、创建普通暂时性节点

Java 源代码:


package own.fuys.ownzk;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ZKDemo {	private static Logger logger = LoggerFactory.getLogger(ZKDemo.class);	public static String CONNECT_STRING = "192.168.6.128:2181,192.168.6.129,192.168.6.130";	public static int SESSION_TIMEOUT = 2000;	public static void main(String[] args) throws Exception {		ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {			public void process(WatchedEvent event) {			}		});		zk.addAuthInfo("digest", AUTH_INFO.getBytes());				if(zk.exists("/fuys/song", false)==null){			String create = zk.create("/fuys/song", "No Death".getBytes(), 					ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);			logger.info("create node --> " + create);				}		zk.close();	}}
复制代码


3.3、创建暂时序列化节点

Java 源代码:


package own.fuys.ownzk;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooDefs;import org.apache.zookeeper.ZooKeeper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ZKDemo {	private static Logger logger = LoggerFactory.getLogger(ZKDemo.class);	public static String CONNECT_STRING = "192.168.6.128:2181,192.168.6.129,192.168.6.130";	public static int SESSION_TIMEOUT = 2000;	public static String AUTH_INFO = "ysfu:fuys0822";	public static void main(String[] args) throws Exception {		ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {			public void process(WatchedEvent event) {			}		});		zk.addAuthInfo("digest", AUTH_INFO.getBytes());		if(zk.exists("/fuys/song", false)==null){			for(int i=0;i<10;i++){				String create = zk.create("/fuys/song", "No Death".getBytes(), 						ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);				logger.info("create EPHEMERAL_SEQUENTIAL node --> " + create);			}		}		zk.close();	}}
复制代码


3.4、节点数据操作

节点数据的操作,包含取得,修改,删除。


package own.fuys.ownzk;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ZKDemo {	private static Logger logger = LoggerFactory.getLogger(ZKDemo.class);	public static String CONNECT_STRING = "192.168.6.128:2181,192.168.6.129,192.168.6.130";	public static int SESSION_TIMEOUT = 2000;	public static String AUTH_INFO = "ysfu:fuys0822";	public static void main(String[] args) throws Exception {		ZooKeeper zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {			public void process(WatchedEvent event) {			}		});		zk.addAuthInfo("digest", AUTH_INFO.getBytes());		if(zk.exists("/fuys", false)!=null){			byte[] data = zk.getData("/fuys", false, null);			logger.info("Get data --> " + new String(data));			Stat setData = zk.setData("/fuys", "No death".getBytes(), -1);			logger.info("Set data --> " + setData);		}		if(zk.exists("/fuys/song", false)!=null){			logger.info("Delete /fuys/song");			zk.delete("/fuys/song", -1);		}		zk.close();	}}
复制代码


3.5、数据监听

在 ZooKeeper 组件中,提供了节点以及所有子节点的监听,但是,需要注意的是,监听只能一次。

Java 源代码:


package own.fuys.ownzk;import java.util.Iterator;import java.util.List;import org.apache.zookeeper.KeeperException;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class ZKDemo {	private static Logger logger = LoggerFactory.getLogger(ZKDemo.class);	public static String CONNECT_STRING = "192.168.6.128:2181,192.168.6.129,192.168.6.130";	public static int SESSION_TIMEOUT = 2000;	public static String AUTH_INFO = "ysfu:fuys0822";	public static ZooKeeper zk = null;	public static void main(String[] args) throws Exception {		zk = new ZooKeeper(CONNECT_STRING, SESSION_TIMEOUT, new Watcher() {			public void process(WatchedEvent event) {				try {					zk.exists("/fuys", true);				} catch (KeeperException e) {					e.printStackTrace();				} catch (InterruptedException e) {					e.printStackTrace();				}			}		});		zk.addAuthInfo("digest", AUTH_INFO.getBytes());		if(zk.exists("/fuys", true)!=null){			logger.info("Watch node --> /fuys");			List<String> children = zk.getChildren("/fuys", true);			Iterator<String> iter = children.iterator();			while(iter.hasNext()){				logger.info("Watch children node --> " + iter.next());			}		}		Thread.sleep(Integer.MAX_VALUE);		zk.close();	}}
复制代码


实验结果没有进行监听,


用户头像

andy

关注

还未添加个人签名 2019-11-21 加入

还未添加个人简介

评论

发布
暂无评论
ZooKeeper浅析(二)_andy_InfoQ写作社区