1.ZooKeeper 的数据模型、节点类型与应用
zk 基础有三大模块:数据模型、ACL 权限控制、Watch 监控。其中数据模型最重要,zk 很多典型的应用场景都是利用其数据模块实现的。比如可利用数据模型中的临时节点和 Watcher 监控来实现分布式发布订阅。
(1)数据模型之树形结构
计算机最根本的作用其实就是处理和存储数据,作为一款分布式一致性框架,zk 也是如此。数据模型就是 zk 用来存储和处理数据的一种逻辑结构,zk 数据模型最根本的功能就像一个数据库。接下来按如下步骤做一些简单操作,最终会在 zk 服务器上得到一个具有层级关系的数据结构。
//步骤一:配置文件zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
//步骤二:服务启动
bin.zkServer.sh start
//步骤三:使用客户端连接服务器
bin/zkCli.sh -server 127.0.0.1:2181
//步骤四:完成单机版的开发环境构建之后,便通过zk提供的create命令来创建几个节点
create /locks
create /servers
create /works
复制代码
zk 中的数据模型是一种树形结构,有一个根文件夹,下面有很多子文件夹。zk 的数据模型也具有一个固定的根节点"/",可以在根节点下创建子节点,并在子节点下继续创建下一级节点。zk 树中的每一层级用斜杠"/"分隔开,只能用绝对路径的方式查询 zk 节点,如"get /work/task1",不能用相对路径的方式查询 zk 节点。
(2)ZNode 节点类型与特性(持久 + 临时 + 顺序)
zk 中的数据节点分为持久节点、临时节点和顺序节点三种类型。
一.持久节点
持久节点在 zk 最为常用的,几乎所有业务场景中都会包含持久节点的创建。之所以叫作持久节点是因为:一旦将节点创建为持久节点,节点数据就会一直存储在 ZK 服务器上。即使创建该节点的客户端与服务端的会话关闭了,该节点依然不会被删除。如果想要删除持久节点,需要显式调用 delete 函数进行删除。
二.临时节点
临时节点从名称上可以看出它最重要的一个特性就是临时性。所谓临时性是指:如果将节点创建为临时节点,那么该节点数据不会一直存储在 zk 服务器上。当创建该临时节点的客户端,因会话超时或发生异常而关闭时,该节点也相应在 zk 服务器上被删除。同样,可以像删除持久节点一样主动删除临时节点。
可以利用临时节点的临时特性来进行服务器集群内机器运行情况的统计。比如将集群设置为"/servers"节点,并为集群下的每台服务器创建一个临时节点"/servers/host",当服务器下线时其对应的临时节点就会自动被删除,最后统计临时节点个数就可以知道集群中的运行情况。
三.顺序节点
其实顺序节点并不算是一种单独种类的节点,而是在持久节点和临时节点特性的基础上,增加一个节点有序的性质。所谓节点有序是指:创建顺序节点时,zk 服务器会自动使用一个单调递增的数字作为后缀追加到所创建节点后面。
例如一个客户端创建了一个路径为"works/task-"的有序节点,那么 zk 将会生成一个序号并追加到该节点的路径后,最后该节点的路径就会变为"works/task-1"。通过这种方式可以直观查看到节点的创建顺序。
小结:
zk 服务器上存储数据的模型是一种树形结构。zk 中的数据节点类型有:持久节点、持久顺序节点、临时节点和临时顺序节点。这几种数据节点虽然类型不同,但每个数据节点都有一个二进制数组(byte data[]),每个数据节点都有一个记录自身状态信息的字段 stat。其中二进制数组会用来存储节点的数据、ACL 访问控制信息、子节点数据。
注意:因为临时节点不允许有子节点,所以临时节点的子节点为 null。
(3)节点的状态结构(各种 zxid + 各种 version)
每个节点都有属于自己的状态信息,就像每个人都有其身份证信息一样。打开 zk 客户端,执行"stat /zk_test",可以看到控制台输出一些信息。如下这些信息就是节点状态信息:
cZxid = 5
ctime = Wed Feb 09 18:17:23 CST 2022
mZxid = 5
mtime = Wed Feb 09 18:17:23 CST 2022
pZxid = 5
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100011d485f0007
dataLength = 2
numChildren = 0
复制代码
每个节点都有一个自己的状态属性,这些状态属性记录了节点本身的一些信息,包括如下内容:
(4)节点的版本(version + cversion + aversion)
zk 为数据节点引入了版本的概念。每个数据节点有 3 种类型的版本信息:version、cversion、aversion,对数据节点的任何更新操作都会引起版本号的变化。
zk 的版本信息表示的是:对节点数据内容、子节点信息或者是 ACL 信息的修改次数。
(5)使用 ZooKeeper 实现锁(悲观锁 + 乐观锁)
一.悲观锁
悲观锁认为线程对数据资源的竞争总是会出现。为了保证线程在操作数据时,该条数据不会被其他线程修改,那么该条数据要一直处于被锁定的状态。
假设有 n 个线程同时访问和修改某一数据资源,为了实现线程安全,可以让线程通过创建 zk 节点"/locks"的方式获取锁。线程 A 成功创建 zk 节点"/locks"后获取到锁继续执行,这时线程 B 也要访问该数据资源,于是线程 B 尝试创建 zk 节点"/locks"来尝试获取锁。但是因为线程 A 已创建该节点,所以线程 B 创建节点失败而无法获得锁。这样线程 A 操作数据时,数据就不会被其他线程修改,从而实现了一个简单的悲观锁。
不过存在一个问题:
就是如果进程 A 因为异常而中断,那么就会导致"/locks"节点始终存在。此时其他线程就会因为无法再次创建节点而无法获取锁,从而产生死锁问题。针对这个问题,可以通过将节点设置为临时节点来进行避免,并通过在服务器端添加监听事件来实现锁被释放时可以通知其他线程重新获取锁。
二.乐观锁
乐观锁认为线程对数据资源的竞争不会总是出现。所以相对悲观锁而言,加锁方式没有那么激烈,不会全程锁定数据。而是在数据进行提交更新时,才对数据进行冲突检测,如果发现冲突才拒绝操作。
乐观锁基本可以分为读取、校验、写入三个步骤。CAS(Compare-And-Swap 即比较并替换)就是一个乐观锁的实现。CAS 有 3 个操作数:内存值 V、旧的预期值 A、要修改的新值 B,当且仅当预期值 A 和内存值 V 相同时,才会将内存值 V 修改为 B。
zk 中的 version 属性就是用来实现乐观锁机制中的"校验"的,zk 每个节点都有数据版本的概念。在调用更新操作时,假如有一个客户端试图进行更新操作,该客户端会携带上次获取到的 version 值进行更新。如果在这段时间内,zk 服务器上该节点的数值恰好被其他客户端更新了,那么该节点的数据版本 version 值一定也会发生变化,从而导致与客户端携带的 version 无法匹配,于是无法成功更新。因此有效避免了分布式更新的并发安全问题。
在 zk 的底层实现中,当服务端处理每一个数据更新请求(SetDataRequest)时,首先会调用 checkAndIncVersion()方法进行数据版本校验:
步骤一:从 SetDataRequest 请求中获取 version。
步骤二:通过 getRecordForPath()方法获取服务器数据记录 nodeRecord,然后从 nodeRecord 中获取当前服务器上该数据的最新版本 currentversion。如果 version 是-1,表示该请求操作不使用乐观锁,可以忽略版本对比。如果 version 不是-1,则对比 version 和 currentversion。若相等则进行更新,否则抛出 BadVersionException 异常中断。
(6)总结
zk 的基础内容包括:数据模型、节点类型、stat 状态属性等。利用这些内容可解决:集群中服务器运行情况统计、悲观锁、乐观锁等。
了解 zk 数据模型的基本原理后,有一个问题:为什么 zk 不能采用相对路径查找节点?
因为 zk 大多应用场景是定位数据模型上的节点,并在相关节点上进行操作。对于这种查找与给定值相等的问题,最适合用散列表来解决。因此 zk 在底层实现时,使用了一个 ConcurrentHashMap 来存储节点数据,用节点的完整路径作为 key 去存储节点数据,可以大大提高 zk 的查找性能。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private ZKDatabase zkDb;
private FileTxnSnapLog txnLogFactory = null;
...
}
public class ZKDatabase {
protected DataTree dataTree;
protected FileTxnSnapLog snapLog;
...
}
public class DataTree {
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
...
}
复制代码
2.发布订阅模式:用 Watcher 机制实现分布式通知
下面介绍 zk 另一关键技术——Watcher 机制,并用它实现一个发布订阅功能。
zk 的 Watcher 机制的整个流程:客户端在向 zk 服务端注册 Watcher 的同时,会将 Watcher 对象存储在客户端的 WatchManger 中。当 zk 服务端触发 Watcher 事件后,会向客户端发送通知。客户端线程会从 WatchManager 中取出对应的 Watcher 对象来执行回调逻辑。
zk 的 Watcher 机制主要包括三部分:客户端线程、客户端 WatchManager、zk 服务端;
zk 的 Watcher 机制主要包括三个过程:即客户端注册 Watcher、服务端处理 Watcher、客户端回调 Watcher,这三个过程其实也是发布订阅功能的几个核心点。
(1)Watcher 机制是如何实现的
一.客户端向服务端添加 Watcher 监控事件的方式
zk 的客户端可以通过 Watcher 机制来订阅服务端上某一节点的数据,以便当该节点的数据状态发生变化时能收到相应的通知。
比如可以通过向 zk 客户端的构造方法中传递 Watcher 参数的方式实现添加 Watcher 监控事件。下面代码的意思是定义了一个了 zk 客户端对象实例,并传入三个参数。这个 Watcher 将作为整个 zk 会话期间的默认 Watcher,一直被保存在客户端 ZKWatchManager 的 defaultWatcher 中。
new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);
connectString:服务端地址
sessionTimeout:超时时间
Watcher:监控事件
复制代码
除此之外,zk 客户端也可以通过 getData()、getChildren()和 exists()这三个接口,向 zk 服务端注册 Watcher,从而可以方便地在不同的情况下添加 Watcher 事件。
getData(String path, Watcher watcher, Stat stat);
复制代码
二.触发服务端 Watcher 监控事件通知的条件
zk 中的接口类 Watcher 用于表示一个标准的事件处理器,Watcher 接口类定义了事件通知的相关逻辑。Watcher 接口类中包含了 KeeperState 和 EventType 两个枚举类,其中 KeeperState 枚举类代表会话通知状态,EventType 枚举类代表事件类型。
下图列出了客户端在常见的会话状态下,服务器节点所能支持的事件类型。例如在客户端连接服务端时:可以对数据节点的创建、删除、数据变更、子节点的更新等操作进行监控。
至此已从应用层的角度介绍完 zk 中的 Watcher 机制了,通过几个简单的 API 调用就可以对服务器的节点状态变更进行监控。但在实际生产环境中还会遇到很多意想不到的问题,要想解决好这些问题就要深入理解 Watcher 机制的底层实现原理。
(2)Watcher 机制的底层原理
下面从设计模式角度出发来分析其底层实现:
Watcher 机制的结构其实很像设计模式中的观察者模式。一个对象或者数据节点可能会被多个客户端监控,当对应事件被触发时,会通知这些对象或客户端。可以将 Watcher 机制理解为是分布式环境下的观察者模式,所以接下来以观察者模式的角度来看 zk 底层 Watcher 是如何实现的。
在实现观察者模式时,最关键的代码就是创建一个列表来存放观察者。而在 zk 中则是在客户端和服务端分别实现了两个存放观察者列表:也就是客户端的 ZKWatchManager 和服务端的 WatchManager。zk 的 Watcher 机制的核心操作其实就是围绕这两个列表展开的。
(3)客户端 Watcher 注册实现过程
先看客户端的实现过程。当发送一个带有 Watcher 事件的会话请求时,zk 客户端主要会做两个工作:一.标记该会话请求是一个带有 Watcher 事件的请求,二.将 Watcher 事件存储到 ZKWatchManager 中。
整个客户端 Watcher 的注册流程如下:
以 getData 接口为例,当 zk 客户端发送一个带有 Watcher 事件的会话请求时:
一.首先将数据节点和 Watcher 的对应关系封装到 DataWatchRegistration
然后把该请求标记为带有 Watcher 事件的请求,这里说的封装其实指的是 new 一个对象。
二.接着将请求封装成一个 Packet 对象并添加到发送队列 outgoingQueue
Packet 对象被添加到发送队列 outgoingQueue 后,会进行阻塞等待。也就是通过 Packet 对象的 wait()方法进行阻塞,直到 Packet 对象标记为完成。
三.随后通过客户端 SendThread 线程向服务端发送 outgoingQueue 里的请求
也就是通过 ClientCnxnSocket 的 doTransport()方法处理请求发送和响应。
四.完成请求发送后,客户端 SendThread 线程会监听并处理服务端响应
也就是由 ClientCnxn 的内部类 SendThread 的 readResponse()方法负责接处理务端响应,然后执行 ClientCnxn 的 finishPacket()方法从 Packet 对象中取出对应的 Watcher。即通过调用 WatchRegistration 的 register()方法,将 Watcher 事件注册到 ZooKeeper 的 ZKWatchManager 中。
因为客户端一开始将 Watcher 事件封装到 DataWatchRegistration 对象中,所以在调用 WatchRegistration 的 register()方法时,客户端就会将之前封装在 DataWatchRegistration 的 Watcher 事件交给 ZKWatchManager,并最终保存到 ZKWatchManager 的 dataWatches 中。
ZKWatchManager 的 dataWatches 是一个 Map<String, Set<Watcher>>,用于将数据节点的路径和 Watcher 事件进行一一映射后管理起来。
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig;
...
public byte[] getData(final String path, Watcher watcher, Stat stat) {
final String clientPath = path;
PathUtils.validatePath(clientPath);
WatchRegistration wcb = null;
if (watcher != null) {
//1.封装DataWatchRegistration对象暂存数据节点和Watcher的对应关系;
wcb = new DataWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.getData);
//1.客户端首先会把该会话请求标记为带有Watcher事件的请求
GetDataRequest request = new GetDataRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
GetDataResponse response = new GetDataResponse();
//把封装好的请求和响应交给ClientCnxn客户端进行处理
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
return response.getData();
}
...
}
public class ClientCnxn {
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
...
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) {
return submitRequest(h, request, response, watchRegistration, null);
}
public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
ReplyHeader r = new ReplyHeader();
//2.接着将请求封装成一个Packet对象并添加到一个发送队列outgoingQueue
Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);
synchronized (packet) {
if (requestTimeout > 0) {
//阻塞等待
waitForPacketFinish(r, packet);
} else {
//阻塞等待
while (!packet.finished) {
packet.wait();
}
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
Packet packet = null;
//2.接着将请求封装成一个Packet对象
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
synchronized (outgoingQueue) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
//2.将封装好的Packet对象添加到一个发送队列outgoingQueue
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
class SendThread extends ZooKeeperThread {
...
@Override
public void run() {
//把outgoingQueue传入clientCnxnSocket中
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
...
while (state.isAlive()) {
//建立和服务端的连接等
if (!clientCnxnSocket.isConnected()) {
if (closing) {
break;
}
if (rwServerAddress != null) {
serverAddress = rwServerAddress;
rwServerAddress = null;
} else {
serverAddress = hostProvider.next(1000);
}
onConnecting(serverAddress);
startConnect(serverAddress);
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
}
...
//3.将outgoingQueue里的请求发送出去 + 处理接收到的响应
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
...
}
}
//读取服务端发送过来的输入流
void readResponse(ByteBuffer incomingBuffer) {
...
finishPacket(packet);
}
}
...
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
...
}
}
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
...
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) {
...
doIO(pendingQueue, cnxn);
...
}
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) {
SocketChannel sock = (SocketChannel) sockKey.channel();
...
//处理接收响应
if (sockKey.isReadable()) {
...
sendThread.readResponse(incomingBuffer);
...
}
//处理发送请求
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
...
p.createBB();
sock.write(p.bb);
outgoingQueue.removeFirstOccurrence(p);
pendingQueue.add(p);
...
}
...
}
}
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig;
...
public abstract class WatchRegistration {
private Watcher watcher;
private String clientPath;
...
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watches.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}
abstract protected Map<String, Set<Watcher>> getWatches(int rc);
protected boolean shouldAddWatch(int rc) {
return rc == 0;
}
}
class DataWatchRegistration extends WatchRegistration {
...
@Override
protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.dataWatches;
}
}
static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
...
}
}
复制代码
注意:客户端每调用一次 getData()接口,就会注册一个 Watcher,但这些 Watcher 实体不会随着客户端请求被发送到服务端。如果客户端的所有 Watcher 都被发送到服务端的话,服务端可能就会出现内存紧张或其他性能问题。虽然封装 Packet 对象的时候会传入 DataWatchRegistration 对象,但是在底层实际的网络传输对 Packet 对象序列化的过程中,并没有将 DataWatchRegistration 对象序列化到字节数组。
具体来说,在 Packet 的 createBB()方法中,zk 只会将 requestHeader 和 request 两个属性进行序列化。尽管 DataWatchRegistration 对象被封装在了 Packet 中,但是并没有被序列化到底层字节数组里去,因此不会进行网络传输。
public class ClientCnxn {
...
static class Packet {
RequestHeader requestHeader;
ReplyHeader replyHeader;
Record request;
Record response;
ByteBuffer bb;
...
public void createBB() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
boa.writeInt(-1, "len"); // We'll fill this in later
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
baos.close();
this.bb = ByteBuffer.wrap(baos.toByteArray());
this.bb.putInt(this.bb.capacity() - 4);
this.bb.rewind();
}
...
}
...
}
复制代码
(4)服务端处理 Watcher 过程
zk 服务端处理 Watcher 事件基本有两个过程:
一.判断收到的请求是否需要注册 Watcher 事件
二.将对应的 Watcher 事件存储到 WatchManager
以下是 zk 服务端处理 Watcher 的序列图:
zk 服务端接收到客户端请求后的具体处理:
一.当服务端收到客户端标记了 Watcher 事件的 getData 请求时,会调用到 FinalRequestProcessor 的 processRequest()方法,判断当前客户端请求是否需要注册 Watcher 事件。
二.当 getDataRequest.getWatch()的值为 true 时,则表明当前客户端请求需要进行 Watcher 注册。
三.然后将当前的 ServerCnxn 对象(即 Watcher 事件)和数据节点路径,传入到 zks.getZKDatabase()的 getData()方法中来实现 Watcher 事件的注册,也就是实现存储 Watcher 事件到 WatchManager 中。具体就是:调用 DataTree.dataWatches 这个 WatchManager 的 addWatch()方法,将该客户端请求的 Watcher 事件(也就是 ServerCnxn 对象)存储到 DataTree.dataWatches 这个 WatchManager 的两个 HashMap(watchTable 和 watch2Paths)中。
补充说明:
首先,ServerCnxn 对象代表了一个客户端和服务端的连接。ServerCnxn 接口的默认实现是 NIOServerCnxn,也可以选 NettyServerCnxn。由于 NIOServerCnxn 和 NettyServerCnxn 都实现了 Watcher 的 process 接口,所以可以把 ServerCnxn 对象看作是一个 Watcher 对象。
然后,zk 服务端的数据库 DataTree 中会有两个 WatchManager,分别是 dataWatches 和 childWatches,分别对应节点和子节点数据变更。
接着,WatchManager 中有两个 HashMap:watch2Paths 和 watchTable。当前的 ServerCnxn 对象和数据节点路径最终会被存储在这两 HashMap 中。watchTable 可以根据数据节点路径来查找对应的 Watcher,watch2Paths 可以根据 Watcher 来查找对应的数据节点路径。
同时,WatchManager 除了负责添加 Watcher 事件,还负责触发 Watcher 事件,以及移除那些已经被触发的 Watcher 事件。
public class FinalRequestProcessor implements RequestProcessor {
ZooKeeperServer zks;
...
public void processRequest(Request request) {
...
ServerCnxn cnxn = request.cnxn;
...
switch (request.type) {
...
case OpCode.getData: {
...
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
...
}
case OpCode.getChildren: {
...
List<String> children = zks.getZKDatabase().getChildren(getChildrenRequest.getPath(), null, getChildrenRequest.getWatch() ? cnxn : null);
rsp = new GetChildrenResponse(children);
...
}
}
...
}
}
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private ZKDatabase zkDb;
private FileTxnSnapLog txnLogFactory = null;
...
public ZKDatabase getZKDatabase() {
return this.zkDb;
}
...
}
public class ZKDatabase {
protected DataTree dataTree;
protected FileTxnSnapLog snapLog;
...
public byte[] getData(String path, Stat stat, Watcher watcher) {
return dataTree.getData(path, stat, watcher);
}
public List<String> getChildren(String path, Stat stat, Watcher watcher) {
return dataTree.getChildren(path, stat, watcher);
}
...
}
public class DataTree {
private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
...
public byte[] getData(String path, Stat stat, Watcher watcher) {
DataNode n = nodes.get(path);
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
dataWatches.addWatch(path, watcher);
}
return n.data;
}
}
public List<String> getChildren(String path, Stat stat, Watcher watcher) {
DataNode n = nodes.get(path);
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
childWatches.addWatch(path, watcher);
}
return new ArrayList<String>(n.getChildren());
}
}
...
}
class WatchManager {
private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>();
private final HashMap<Watcher, HashSet<String>> watch2Paths = new HashMap<Watcher, HashSet<String>>();
...
synchronized void addWatch(String path, Watcher watcher) {
HashSet<Watcher> list = watchTable.get(path);
if (list == null) {
list = new HashSet<Watcher>(4);
watchTable.put(path, list);
}
list.add(watcher);
HashSet<String> paths = watch2Paths.get(watcher);
if (paths == null) {
paths = new HashSet<String>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
...
}
复制代码
(5)服务端 Watch 事件的触发过程
对于标记了 Watcher 注册的请求,zk 会将其对应的 ServerCnxn 对象(Watcher 事件)存储到 DataTree 里的 WatchManager 的 HashMap(watchTable 和 watch2Paths)中。之后,当服务端对指定节点进行数据更新后,会通过调用 DataTree 里的 WatchManager 的 triggerWatch()方法来触发 Watcher。
无论是触发 DataTree 的 dataWatches,还是触发 DataTree 的 childWatches,Watcher 的触发逻辑都是一样的。
具体的 Watcher 触发逻辑如下:
步骤一:首先封装一个具有这三个属性的 WatchedEvent 对象:通知状态(KeeperState)、事件类型(EventType)、数据节点路径(path)。
步骤二:然后根据数据节点路径从 DateTree 的 WatchManager 中取出 Watcher。如果为空,则说明没有任何客户端在该数据节点上注册过 Watcher。如果存在,则将 Watcher 事件添加到自定义的 Wathcers 集合中,并且从 DataTree 的 WatchManager 的 watchTable 和 watch2Paths 中移除。最后调用 Watcher 的 process()方法向客户端发送通知。
public class DataTree {
private final WatchManager dataWatches = new WatchManager();
private final WatchManager childWatches = new WatchManager();
...
public Stat setData(String path, byte data[], int version, long zxid, long time) {
Stat s = new Stat();
DataNode n = nodes.get(path);
byte lastdata[] = null;
synchronized (n) {
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
...
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}
...
}
class WatchManager {
...
Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}
...
}
复制代码
具体的 Watcher 的 process()方法,会由 NIOServerCnxn 来实现。Watcher 的 process()方法的具体逻辑如下:
步骤一:标记响应头 ReplyHeader 的 xid 为-1,表示当前响应是一个通知。
步骤二:将触发 WatchManager.triggerWatch()方法时封装的 WatchedEvent,包装成 WatcherEvent,以便进行网络传输序列化。
步骤三:向客户端发送响应。
public interface Watcher {
abstract public void process(WatchedEvent event);
...
}
public abstract class ServerCnxn implements Stats, Watcher {
...
public abstract void process(WatchedEvent event);
}
public class NIOServerCnxn extends ServerCnxn {
...
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
sendResponse(h, e, "notification");
}
}
复制代码
(6)客户端回调 Watcher 的处理过程
对于来自服务端的响应:客户端会使用 SendThread 的 readResponse()方法来进行统一处理。如果反序列化后得到的响应头 replyHdr 的 xid 为-1,则表明这是一个通知类型的响应。
SendThread 接收事件通知的处理步骤如下:
步骤一:反序列化成 WatcherEvent 对象
zk 客户端接收到请求后,首先将字节流反序列化成 WatcherEvent 对象。
步骤二:处理 chrootPath
如果客户端设置了 chrootPath 为/app,而服务端响应的节点路径为/app/a,那么经过 chrootPath 处理后,就会统一变成一个相对路径:/a。
步骤三:还原成 WatchedEvent 对象
将 WatcherEvent 对象转换成 WatchedEvent 对象。
步骤四:回调 Watcher
通过调用 EventThread 的 queueEvent()方法,将 WatchedEvent 对象交给 EventThread 线程来回调 Watcher。所以服务端的 Watcher 事件通知,最终会交给 EventThread 线程来处理。
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig;
...
}
public class ClientCnxn {
final SendThread sendThread;
final EventThread eventThread;
...
class SendThread extends ZooKeeperThread {
...
@Override
public void run() {
clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
...
while (state.isAlive()) {
...
//将outgoingQueue里的请求发送出去 + 处理接收到的响应
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
...
}
}
void readResponse(ByteBuffer incomingBuffer) {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
...
//处理事务回调
if (replyHdr.getXid() == -1) {
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
if (chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else
...
}
WatchedEvent we = new WatchedEvent(event);
eventThread.queueEvent( we );
return;
}
...
finishPacket(packet);
}
...
}
}
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
...
@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) {
...
doIO(pendingQueue, cnxn);
...
}
void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) {
SocketChannel sock = (SocketChannel) sockKey.channel();
...
//处理接收响应
if (sockKey.isReadable()) {
...
sendThread.readResponse(incomingBuffer);
...
}
//处理发送请求
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
...
p.createBB();
sock.write(p.bb);
outgoingQueue.removeFirstOccurrence(p);
pendingQueue.add(p);
...
}
...
}
}
复制代码
EventThread 线程是 zk 客户端专门用来处理服务端事件通知的线程。
EventThread 处理事件通知的步骤如下:
步骤一:EventThread 的 queueEvent()方法首先会根据 WatchedEvent 对象,从 ZKWatchManager 中取出所有注册过的客户端 Watcher。
步骤二:然后从 ZKWatchManager 的管理中删除这些 Watcher。这也说明客户端的 Watcher 机制是一次性的,触发后就会失效。
步骤三:接着将所有获取到的 Watcher 放入 waitingEvents 队列中。
步骤四:最后 EventThread 线程的 run()方法,通过循环的方式,每次都会从 waitingEvents 队列中取出一个 Watcher 进行串行同步处理。也就是调用 EventThread 线程的 processEvent()方法来最终执行实现了 Watcher 接口的 process()方法,从而实现回调处理。
public class ClientCnxn {
final SendThread sendThread;
final EventThread eventThread;
private final ClientWatchManager watcher;
...
class EventThread extends ZooKeeperThread {
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
//通过以下这两个变量来实现waitingEvents为空时,加入的Watcher要马上执行,而不用等待run()方法
private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
...
public void queueEvent(WatchedEvent event) {
queueEvent(event, null);
}
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
if (event.getType() == EventType.None && sessionState == event.getState()) {
return;
}
sessionState = event.getState();
final Set<Watcher> watchers;
if (materializedWatchers == null) {
//对WatchedEvent对象进行处理,从ZKWatchManager的管理中删除这些Watcher
watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
} else {
watchers = new HashSet<Watcher>();
watchers.addAll(materializedWatchers);
}
WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
//将获取到的所有Watcher放入waitingEvents队列
waitingEvents.add(pair);
}
...
public void run() {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
}
}
private void processEvent(Object event) {
if (event instanceof WatcherSetEventPair) {
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
watcher.process(pair.event);
}
}
...
}
...
}
}
public class ZooKeeper implements AutoCloseable {
...
static class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
...
public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) {
Set<Watcher> result = new HashSet<Watcher>();
switch (type) {
...
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
...
}
return result;
}
final private void addTo(Set<Watcher> from, Set<Watcher> to) {
if (from != null) {
to.addAll(from);
}
}
...
}
...
}
复制代码
总结 zk 的 Watcher 机制处理过程:
一.zk 是通过在客户端和服务端创建观察者信息列表来实现 Watcher 机制的。
二.客户端调用 getData()、getChildren()、exist()等方法时,会将 Watcher 事件放到本地的 ZKWatchManager 中进行管理。
三.服务端在接收到客户端的请求后首先判断是否需要注册 Watcher,若是则将 ServerCnxn 对象当成 Watcher 事件放入 DataTree 的 WatchManager 中。
四.服务端触发 Watcher 事件时,会根据节点路径从 WatchManager 中取出对应的 Watcher,然后发送通知类型的响应给客户端。
五.客户端在接收到通知类型的响应后,首先通过 SendThread 线程提取出 WatchedEvent 对象。然后将 WatchedEvent 对象交给 EventThread 线程来回调 Watcher。也就是查询本地的 ZKWatchManager 获得对应的 Watcher 事件,删除 ZKWatchManager 的 Watcher 并将 Watcher 放入 waitingEvents 队列。后续 EventThread 线程便会在其 run()方法中串行出队 waitingEvents,执行 Watcher 的 process()回调。
客户端的 Watcher 管理器是 ZKWatchManager。
服务端的 Watcher 管理器是 WatchManager。
以上的处理设计实现了一个分布式环境下的观察者模式,通过将客户端和服务端处理 Watcher 事件时所需要的信息分别保存在两端,减少了彼此通信的内容,大大提升了服务的处理性能。
(7)利用 Watcher 实现发布订阅
一.发布订阅系统一般有推模式和拉模式
推模式是指服务端主动将数据更新发送给所有订阅的客户端,拉模式是指客户端主动发起请求来获取最新数据(定时轮询拉取)。
二.zk 采用了推拉相结合来实现发布和订阅功能
首先客户端需要向服务端注册自己关注的节点(添加 Watcher 事件)。一旦该节点发生变更,服务端就会向客户端发送 Watcher 事件通知。客户端接收到消息通知后,需要主动到服务端获取最新的数据。
如果将配置信息放到 zk 上进行集中管理,那么应用启动时需要主动到 zk 服务端获取配置信息,然后在指定节点上注册一个 Watcher 监听。接着只要配置信息发生变更,zk 服务端就会实时通知所有订阅的应用。从而让应用能实时获取到所订阅的配置信息节点已发生变更了的消息。
注意:原生 zk 客户端可以通过 getData()、exists()、getChildren()三个方法,向 zk 服务端注册 Watcher 监听。而且注册到 Watcher 监听具有一次性,所以 zk 客户端获得服务端的节点变更通知后需要再次注册 Watcher。
三.使用 zk 来实现发布订阅功能总结
步骤一:将配置信息存储到 zk 的节点上。
步骤二:应用启动时先从 zk 节点上获取配置信息,然后再向该 zk 节点注册一个数据变更的 Watcher 监听。一旦该 zk 节点数据发生变更,所有订阅的客户端就能收到数据变更通知。
步骤三:应用收到 zk 服务端发过来的数据变更通知后重新获取最新数据。
(8)Watcher 具有的特性
一.一次性
无论是客户端还是服务端,一旦 Watcher 被触发或者回调,zk 都会将其移除,所以使用 zk 的 Watcher 时需要反复注册。这样的设计能够有效减轻服务端的压力。否则,如果一个 Watcher 注册后一直有效,那么频繁更新的节点就会频繁发送通知给客户端,这样就会影响网络性能和服务端性能。
二.客户端串行执行
客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序。注意不要因一个 Watcher 的处理逻辑而影响整个客户端的 Watcher 回调。
三.轻量
WatchedEvent 是 Watcher 机制的最小通知单元,WatchedEvent 只包含三部分内容:通知状态、事件类型和节点路径。所以 Watcher 通知非常简单,只告诉客户端发生的事件,不包含具体内容。所以原始数据和变更后的数据无法从 WatchedEvent 中获取,需要客户端主动重新去获取数据。
客户端向服务端注册 Watcher 时:不会把客户端真实的 Watcher 对象传递到服务端,只会在客户端请求中使用 boolean 属性来标记 Watcher 对象,服务端也只会保存当前连接的 ServerCnxn 对象。
这种轻量的 Watcher 设计机制,在网络开销和服务端内存开销上都是很低的。
(9)总结
有一个问题:当服务端某一节点发生数据变更操作时,所有曾经设置了该节点监控事件的客户端都会收到服务器的通知吗?
答案是否定的,通过对 zk 内部实现机制的解析可以知道:Watcher 事件的触发机制取决于会话的连接状态和客户端注册事件的类型,当客户端会话状态或数据节点发生改变时,都会触发对应的 Watcher 事件。Watcher 具有一次性,曾经的监控要重新监控。
3.ACL 权限控制:避免未经授权的访问
前面介绍完了数据模型、Watcher 监控机制,并实现了在分布式环境中经常用到的分布式锁、配置管理等功能,这些功能的本质都在于操作数据节点。如果作为分布式锁或配置项的数据节点被错误删除或修改,那么对整个分布式系统有很大的影响,甚至会造成严重的生产事故。所以 zk 提供了一个很好的解决方案,那就是 ACL 权限控制。
(1)ACL 的使用(scheme:id:permission)
如何使用 zk 的 ACL 机制来实现客户端对数据节点的访问控制。一个 ACL 权限设置通常可以分为 3 部分,分别是:权限模式(Scheme)、授权对象(ID)、权限信息(Permission),最终组成的一条 ACL 请求信息格式为"scheme:id:permission"。
一.权限模式:Scheme
权限模式就是用来设置 zk 服务器进行权限验证的方式。zk 的权限验证方式大体分为两种类型:一种是范围验证,另外一种是口令验证。但具体来分,则有 4 种权限模式:
模式一:所谓的范围验证就是 zk 可针对一个 IP 或一段 IP 授予某种权限
比如通过"ip:192.168.0.11"让某机器对服务器的一数据节点具有写入权限,或者也可以通过"ip:192.168.0.11/22"给一段 IP 地址的机器赋权。
模式二:另一种权限模式就是口令验证,也可以理解为用户名密码的方式
在 zk 中这种验证方式是 Digest 认证。即在向客户端传送"username:password"这种形式的权限表示符时,服务端会对密码部分使用 SHA-1 和 BASE64 算法进行加密以保证安全。
模式三:还有一种权限模式 Super 可以认为是一种特殊的 Digest 认证
具有 Super 权限的客户端可以对 zk 上的任意数据节点进行任意操作。
模式四:最后一种授权模式是 world 模式
其实这种授权模式对应于系统中的所有用户,本质上起不到任何作用,设置了 world 权限模式系统中的所有用户操作都可以不进行权限验证。
下面代码给出了 Digest 模式下客户端的调用方式:
create /digest_node1
setAcl /digest_node1 digest:用户名:base64格式密码:rwadc
getAcl /digest_node1
addauth digest user:passwd
复制代码
二.授权对象:ID
所谓的授权对象就是要把权限赋予谁,对应于 4 种不同的权限模式来说:
如果使用IP方式,那么授权对象可以是一个IP地址或IP地址段
如果使用Digest或Super方式,那么授权对象对应于一个用户名
如果使用World模式,那么就是授权系统中所有的用户
复制代码
三.权限信息:Permission
权限就是指可以在数据节点上执行的操作种类,zk 定义好的权限有 5 种:
数据节点(Create)创建权限,可以在该数据节点下创建子节点
数据节点(Wirte)更新权限,可以更新该数据节点
数据节点(Read)读取权限,可以读取该数据节点内容以及子节点信息
数据节点(Delete)删除权限,可以删除该数据节点的子节点
数据节点(Admin)管理者权限,可以对该数据节点体进行ACL权限设置
复制代码
需要注意的是:每个节点都有维护自身的 ACL 权限数据,即使是该节点的子节点也有自己的 ACL 权限而不是直接继承其父节点权限。所以如果客户端只配置"/Config"节点的读取权限,该客户端是没有其子节点的"/Config/dataBase"的读取权限的。
(2)实现自己的权限控制
虽然 zk 自身的权限控制机制已经做得很细,但是 zk 还提供了一种权限扩展机制来让用户实现自己的权限控制方式。官方文档对这种机制的定义是"Pluggable ZooKeeper Authenication",意思是可插拔的授权机制,从名称上可看出它的灵活性。
那么这种机制是如何实现的呢?首先,要想实现自定义的权限控制机制,最核心的一点是实现 zk 提供的权限控制器接口 AuthenticationProvider。然后,实现了自定义权限后,如何才能让服务端使用自定义的权限验证方式呢?接下来就需要将自定义的权限控制注册到服务端,而注册的方式有两种:第一种是通过设置系统属性来注册自定义的权限控制器,第二种是在配置文件 zoo.cfg 中进行配置。
//第一种注册方式
-Dzookeeper.authProvider.x=CustomAuthenticationProvider
//第二种方式
authProvider.x=CustomAuthenticationProvider
复制代码
(3)ACL 内部实现原理之客户端处理过程
下面深入到底层介绍 zk 是如何实现 ACL 权限控制机制的,先看一下客户端是如何操作的,以节点授权 addAuth 接口为例。
步骤一:客户端会通过 ClientCnxn 的 addAuthInfo()方法,向服务端发送请求。
步骤二:addAuthInfo()方法会将 scheme 和 auth 封装成 AuthPacket 对象,并封装一个表示权限操作请求的 RequestHeader 对象。
步骤三:接着 AuthPacket 对象和 RequestHeader 对象会被封装到 Packet 对象中,最后会将该 Packet 对象添加到 outgoingQueue 队列,发送给服务端。
public class ZooKeeper implements AutoCloseable {
protected final ClientCnxn cnxn;
protected final HostProvider hostProvider;
protected final ZKWatchManager watchManager;
private final ZKClientConfig clientConfig;
...
public void addAuthInfo(String scheme, byte auth[]) {
cnxn.addAuthInfo(scheme, auth);
}
...
}
public class ClientCnxn {
private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();
volatile States state = States.NOT_CONNECTED;
...
public void addAuthInfo(String scheme, byte auth[]) {
if (!state.isAlive()) {
return;
}
authInfo.add(new AuthData(scheme, auth));
queuePacket(
new RequestHeader(-4, OpCode.auth), null,
new AuthPacket(0, scheme, auth),
null, null, null, null, null, null
);
}
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
Packet packet = null;
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
...
}
复制代码
ACL 权限控制机制的客户端实现相对简单,只是封装请求类型为权限请求,方便服务器识别处理,而发送到服务器的信息包括前面提到的权限校验信息。
(4)ACL 内部实现原理之服务端实现过程
相比于客户端的处理过程,服务器端对 ACL 内部实现就比较复杂。
一.当客户端发出的节点 ACL 授权请求到达服务端后
步骤一:首先调用 NIOServerCnxn.readRequest()方法作为服务端处理的入口,而 readRequest()方法其内部只是调用 processPacket()方法。
public class NIOServerCnxn extends ServerCnxn {
private final ZooKeeperServer zkServer;
private ByteBuffer incomingBuffer = lenBuffer;
...
private void readRequest() throws IOException {
zkServer.processPacket(this, incomingBuffer);
}
...
}
复制代码
步骤二:然后在 ZooKeeperServer 的 processPacket()方法的内部,首先反序列化客户端的请求信息并封装到 AuthPacket 对象中,之后通过 ProviderRegistry 的 getProvider()方法根据 scheme 判断具体实现类。
以 Digest 模式为例,该实现类是 DigestAuthenticationProvider,此时就会调用 handleAuthentication 方法进行权限验证。如果返回 KeeperException.Code.OK 则表示该请求已经通过了权限验证,如果返回的状态是其他或者抛出异常则表示权限验证失败。
所以权限认证的最终实现方法是 handleAuthentication()方法,该方法的工作是解析客户端传递的权限验证类型,并通过 addAuthInfo()方法将权限信息添加到 authInfo 集合中。
其中 addAuthInfo()方法的作用是将解析到的权限信息存储到 zk 服务端的内存中,这些权限信息在整个会话存活期间会一直保存在 zk 服务端。如果会话关闭,那么权限信息就会被删除,这个特性类似于数据节点中的临时节点。
public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
...
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if (ap != null) {
try {
authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
} catch(RuntimeException e) {
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn == KeeperException.Code.OK) {
LOG.info("auth success " + cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
} else {
...
}
return;
} else {
...
}
cnxn.incrOutstandingRequests(h);
}
...
}
public class DigestAuthenticationProvider implements AuthenticationProvider {
...
public KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData) {
String id = new String(authData);
try {
String digest = generateDigest(id);
if (digest.equals(superDigest)) {
cnxn.addAuthInfo(new Id("super", ""));
}
cnxn.addAuthInfo(new Id(getScheme(), digest));
return KeeperException.Code.OK;
} catch (NoSuchAlgorithmException e) {
LOG.error("Missing algorithm",e);
}
return KeeperException.Code.AUTHFAILED;
}
...
}
public abstract class ServerCnxn implements Stats, Watcher {
protected ArrayList<Id> authInfo = new ArrayList<Id>();
...
public void addAuthInfo(Id id) {
//将权限信息添加到authInfo集合
if (authInfo.contains(id) == false) {
authInfo.add(id);
}
}
}
复制代码
二.当服务端已将客户端 ACL 授权请求解析并将对应的会话权限信息存储好后
服务端处理一次请求时,是如何进行权限验证的?
首先通过 PrepRequestProcessor 中的 checkAcl()方法检查对应的请求权限。如果该节点没有任何权限设置则直接返回,如果该节点有权限设置则循环遍历节点的权限信息进行检查,如果具有相应的权限则直接返回表明权限认证成功,否则直接抛出 NoAuthException 异常表明权限认证失败。
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {
...
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {
...
case OpCode.delete:
...
checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo);
...
case OpCode.setData:
...
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
...
...
}
static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm, List<Id> ids) {
...
for (ACL a : acl) {
if (authId.getScheme().equals(id.getScheme()) && ap.matches(authId.getId(), id.getId())) {
return;
}
}
throw new KeeperException.NoAuthException();
}
...
}
复制代码
(5)ACL 权限总结
客户端发送 ACL 权限请求时的处理:首先会封装请求类型,然后将权限信息封装到 request 中,最后发送 request 给服务端。
服务器对 ACL 权限请求的授权处理:首先分析请求类型是否是权限相关操作,然后根据不同的权限模式调用不同的实现类验证权限,最后存储权限信息。
注意:会话的授权信息存储在服务端内存。如果客户端会话关闭,授权信息会被删除。下次连接服务器后,需要重新调用授权接口进行授权;
zk 作为分布式系统协调框架,往往在一个分布式系统下起到关键的作用。尤其是在分布式锁、配置管理等应用场景中。如果因错误操作对重要数据节点进行变更或删除,对整个系统影响很大,甚至可能会导致整个分布式服务不可用,所以设计使用 zk 时一定要考虑对关键节点添加权限控制。
问题:如果一个客户端对服务器上的一个节点设置了只有它自己才能操作的权限,那么等该客户端下线后,对其创建的节点要想进行修改应该怎么做?
可以通过"super 模式"删除该节点或变更该节点的权限验证方式,正因为"super 模式"有如此大的权限,在平时使用时应更加谨慎。
文章转载自:东阳马生架构
原文链接:https://www.cnblogs.com/mjunz/p/18811791
体验地址:http://www.jnpfsoft.com/?from=001YH
评论