
zk Basics: Using and Dissecting Curator


1. Basic zk Data Operations with Curator


Guava is to Java what Curator is to ZooKeeper. Add the following dependencies:


<dependencies>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.12.0</version>
    </dependency>
</dependencies>


Below is an example of CRUD operations on znodes with Curator, where CuratorFramework represents a client instance. Note: creatingParentsIfNeeded() cascades creation of any missing parent nodes for the given path.


public class CrudDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();//start the client and establish the connection
        System.out.println("Curator client started");

        client.create()
            .creatingParentsIfNeeded()//cascade creation of parent nodes
            .withMode(CreateMode.PERSISTENT)//specify the node type
            .forPath("/my/path", "10".getBytes());//create

        byte[] dataBytes = client.getData().forPath("/my/path");//read
        System.out.println(new String(dataBytes));

        client.setData().forPath("/my/path", "11".getBytes());//update
        dataBytes = client.getData().forPath("/my/path");
        System.out.println(new String(dataBytes));

        List<String> children = client.getChildren().forPath("/my");//list children
        System.out.println(children);

        client.delete().forPath("/my/path");//delete
        Thread.sleep(Integer.MAX_VALUE);
    }
}


2. Managing Cluster Metadata with Curator


Curator can operate on zk directly. Suppose, for example, you have built your own distributed system, similar to Kafka or Canal, and want to keep the cluster's core runtime metadata in zk. You can use Curator to create znodes and write the corresponding values into them.

 

JSON is the recommended format for these values; Kafka, for example, writes JSON data to zk. Other clients can then read the cluster metadata from those znodes whenever they need it.
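
As a quick illustration, here is a minimal sketch of writing JSON metadata with the CRUD API shown above. The znode path /cluster/metadata and the JSON payload are made up for this example:

//Hypothetical example: persist cluster metadata as JSON under a made-up path
public class ClusterMetadataDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        //assumed JSON payload; a real system would serialize its own metadata object
        String metadataJson = "{\"brokerId\":1,\"host\":\"192.168.0.1\",\"port\":9092}";
        String path = "/cluster/metadata";//hypothetical path

        if (client.checkExists().forPath(path) == null) {
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, metadataJson.getBytes());
        } else {
            client.setData().forPath(path, metadataJson.getBytes());
        }

        //any other client can now read the cluster metadata back
        System.out.println(new String(client.getData().forPath(path)));
    }
}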

 

3. Automatic HA Active/Standby Failover with Curator


HDFS, Kafka, and Canal all use zk for Leader election, so Curator can be used to implement automatic active/standby (HA) failover.

 

The HDFS NameNode can be deployed in an HA architecture with an active and a standby machine. If the active machine goes down, the standby can detect this, get elected Leader, and take over as the new NameNode serving requests.

 

In Kafka, the Controller coordinates the entire cluster. Any Broker can become the Controller, a role similar to a Leader.

 

Canal is likewise deployed with an active and a standby machine; when the active machine dies, the standby takes over.
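
To make this concrete, here is a minimal failover sketch using the LeaderLatch recipe covered in section 4, together with a LeaderLatchListener. The election path /ha/my-service and the start/stop messages are placeholders:

//Hypothetical HA sketch: each instance runs this; whichever is elected Leader becomes active
public class HaFailoverDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        LeaderLatch leaderLatch = new LeaderLatch(client, "/ha/my-service");//hypothetical election path
        leaderLatch.addListener(new LeaderLatchListener() {
            public void isLeader() {
                System.out.println("Elected Leader: start serving as the active node");
            }
            public void notLeader() {
                System.out.println("Lost leadership: stop serving and fall back to standby");
            }
        });
        leaderLatch.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
}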

 

4. Leader Election with Curator


(1) Curator's First Leader Election Mechanism: LeaderLatch


Leader election with Curator's LeaderLatch:


public class LeaderLatchDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        //"/leader/latch" is backed by znode sequential nodes
        LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
        leaderLatch.start();
        leaderLatch.await();//block until this client becomes Leader

        //similar to HDFS: of two machines, the one that becomes Leader starts working,
        //while the other blocks in await() until the Leader dies and it becomes Leader itself
        boolean hasLeaderShip = leaderLatch.hasLeadership();//check whether this client is the Leader
        System.out.println("Is Leader: " + hasLeaderShip);

        Thread.sleep(Integer.MAX_VALUE);
    }
}


(2) Curator's Second Leader Election Mechanism: LeaderSelector


Leader election with Curator's LeaderSelector is shown below. The LeaderSelectorListener has two callbacks, and stateChanged() lets it track the connection state.


public class LeaderSelectorDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        LeaderSelector leaderSelector = new LeaderSelector(
            client,
            "/leader/election",
            new LeaderSelectorListener() {
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    System.out.println("You are now the Leader......");
                    //do all the Leader's work here; this method must not return
                    Thread.sleep(Integer.MAX_VALUE);
                }
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    System.out.println("Connection state changed, no longer the Leader......");
                    if (connectionState.equals(ConnectionState.LOST)) {
                        throw new CancelLeadershipException();
                    }
                }
            }
        );
        leaderSelector.start();//compete with other nodes on "/leader/election" to become Leader
        Thread.sleep(Integer.MAX_VALUE);
    }
}


5. Distributed Barriers with Curator


(1) Distributed Barrier


Any number of machines can create a Barrier on the same path and block on it via waitOnBarrier(). They stay blocked until the barrier condition changes: setBarrier() raises the barrier and removeBarrier() releases all the waiters.


//DistributedBarrier
public class DistributedBarrierDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
        barrier.waitOnBarrier();//block until the barrier is removed
    }
}
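
For the waiters above to block and later be released, some process has to raise and then remove the barrier. A minimal controller-side sketch (which process plays this role is an application decision):

//Hypothetical controller process that raises and then removes the barrier
public class BarrierControllerDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        DistributedBarrier barrier = new DistributedBarrier(client, "/barrier");
        barrier.setBarrier();//raise the barrier; waitOnBarrier() callers now block
        Thread.sleep(10000);//simulate work while the other machines wait
        barrier.removeBarrier();//drop the barrier; all blocked waiters proceed
    }
}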


(2) Distributed Double Barrier


//DistributedDoubleBarrier
public class DistributedDoubleBarrierDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        DistributedDoubleBarrier doubleBarrier = new DistributedDoubleBarrier(client, "/barrier/double", 10);
        doubleBarrier.enter();//every machine blocks at enter()
        //once all 10 machines have called enter(), execution proceeds past this point
        //the computation can then run here

        doubleBarrier.leave();//every machine blocks at leave() until all 10 machines have called leave()
        //then execution continues
    }
}


6. A Distributed Counter with Curator


If you really need a distributed counter, Redis is usually the better choice: it handles higher concurrency, performs better, is more feature-rich, and supports embedding Lua scripts for complex business logic. However, Redis replicates asynchronously by design, so a machine crash carries the risk of losing updates and leaving replicas inconsistent. zk's ZAB-based synchronization, by contrast, does not suffer from crash-induced data inconsistency.


//SharedCount: implemented via the value of a single znode
public class SharedCounterDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        SharedCount sharedCount = new SharedCount(client, "/shared/count", 0);
        sharedCount.start();

        sharedCount.addListener(new SharedCountListener() {
            public void countHasChanged(SharedCountReader sharedCountReader, int i) throws Exception {
                System.out.println("The distributed counter changed......");
            }
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                System.out.println("Connection state changed.....");
            }
        });

        boolean result = sharedCount.trySetCount(1);
        System.out.println(sharedCount.getCount());
    }
}
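
Note that trySetCount() has compare-and-set semantics: it succeeds only if the value has not changed since this client last read it, so concurrent updaters should retry. A minimal increment loop under that assumption:

//Sketch: retry until our CAS-style update wins (assumes sharedCount has been started as above)
while (true) {
    int current = sharedCount.getCount();
    if (sharedCount.trySetCount(current + 1)) {
        break;//our update was applied atomically
    }
    //another client changed the value first; re-read and retry
}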


7. zk Node and Child-Node Watch Mechanisms with Curator


We mainly use zk for two things:

1. CRUD on metadata, and watching for metadata changes

2. Leader election

 

Three kinds of watches are available:

1. Child-node watches: PathCache

2. Single-node watches: NodeCache

3. Watches over an entire subtree: TreeCache

 

(1) Implementing zk Child-Node Watches with Curator


Below is a child-node watch example using PathCache:


public class PathCacheDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);
        //the cache mirrors the zk data into the client;
        //listeners can be attached to this cached data to observe changes in zk
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                //handle child-node events here
            }
        });
        pathChildrenCache.start();
    }
}


(2) Implementing zk Node-Data Watches with Curator


Below is a node watch example using NodeCache:


public class NodeCacheDemo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", 5000, 3000, retryPolicy);
        client.start();

        final NodeCache nodeCache = new NodeCache(client, "/cluster");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                Stat stat = client.checkExists().forPath("/cluster");
                if (stat == null) {
                    //the node was deleted
                } else {
                    nodeCache.getCurrentData();
                }
            }
        });
        nodeCache.start();
    }
}


8. Source Code Analysis: Creating a Client Instance with Curator


(1) Creating a CuratorFramework Instance Uses the Builder Pattern


CuratorFrameworkFactory.newClient() uses the builder pattern. It first creates a Builder instance via builder(), then sets the parameters as properties of that Builder, and finally build() passes the Builder into the target class's constructor.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started");
    }
}

public class CuratorFrameworkFactory {
    //creating a CuratorFramework instance uses the builder pattern
    public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) {
        return builder().
            connectString(connectString).
            sessionTimeoutMs(sessionTimeoutMs).
            connectionTimeoutMs(connectionTimeoutMs).
            retryPolicy(retryPolicy).
            build();
    }
    ...
    public static Builder builder() {
        return new Builder();
    }

    public static class Builder {
        ...
        private EnsembleProvider ensembleProvider;
        private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
        private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
        private RetryPolicy retryPolicy;
        ...
        public Builder connectString(String connectString) {
            ensembleProvider = new FixedEnsembleProvider(connectString);
            return this;
        }
        public Builder sessionTimeoutMs(int sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
            return this;
        }
        public Builder connectionTimeoutMs(int connectionTimeoutMs) {
            this.connectionTimeoutMs = connectionTimeoutMs;
            return this;
        }
        public Builder retryPolicy(RetryPolicy retryPolicy) {
            this.retryPolicy = retryPolicy;
            return this;
        }
        ...
        public CuratorFramework build() {
            return new CuratorFrameworkImpl(this);
        }
    }
    ...
}

public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
        this.client = new CuratorZookeeperClient(
            localZookeeperFactory,
            builder.getEnsembleProvider(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionTimeoutMs(),
            builder.getWaitForShutdownTimeoutMs(),
            new Watcher() {//a zk watcher is registered here
                @Override
                public void process(WatchedEvent watchedEvent) {
                    CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
                    processEvent(event);
                }
            },
            builder.getRetryPolicy(),
            builder.canBeReadOnly(),
            builder.getConnectionHandlingPolicy()
        );
        ...
    }
    ...
}


(2) Creating a CuratorFramework Instance Initializes a CuratorZooKeeperClient


A CuratorFramework instance represents a zk client; initializing a CuratorFramework initializes a CuratorZooKeeperClient instance.

 

CuratorZooKeeperClient is Curator's wrapper around the ZooKeeper client.

 

When the CuratorZooKeeperClient is initialized, a Watcher listener is passed in.

 

So the main work of CuratorFrameworkFactory's newClient() is: initialize CuratorFramework -> initialize CuratorZooKeeperClient -> initialize ZookeeperFactory + register a Watcher.

 

Actually connecting the client to zk and registering the Watcher listener is triggered by CuratorFramework's start() method.

 

9. How Curator Establishes the zk Connection at Startup


ConnectionStateManager's start() method launches a thread that processes eventQueue. eventQueue holds events describing changes in the zk network connection; when such an event arrives, the ConnectionStateListeners are notified.

 

CuratorZookeeperClient's start() method initializes the native zk client, establishes a long-lived TCP connection to the zk server, and registers a Watcher of type ConnectionState so that it can receive notification events sent by the zk server.


public class CuratorFrameworkImpl implements CuratorFramework {
    private final CuratorZookeeperClient client;
    private final ConnectionStateManager connectionStateManager;
    private volatile ExecutorService executorService;
    ...
    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
        ...
        this.client = new CuratorZookeeperClient(...);
        connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionHandlingPolicy().getSimulatedSessionExpirationPercent(),
            builder.getConnectionStateListenerDecorator()
        );
        ...
    }
    ...
    @Override
    public void start() {
        log.info("Starting");
        if (!state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
        ...
        //1. start a thread that handles zk connection state change events
        connectionStateManager.start();
        //2. add a listener for zk connection state changes
        final ConnectionStateListener listener = new ConnectionStateListener() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                if (ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState) {
                    logAsErrorConnectionErrors.set(true);
                }
            }
            @Override
            public boolean doNotDecorate() {
                return true;
            }
        };
        this.getConnectionStateListenable().addListener(listener);
        //3. create the native zk client
        client.start();
        //4. create a thread pool to execute background operations
        executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
        executorService.submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                backgroundOperationsLoop();
                return null;
            }
        });
        if (ensembleTracker != null) {
            ensembleTracker.start();
        }
        log.info(schemaSet.toDocumentation());
    }
    ...
}

public class ConnectionStateManager implements Closeable {
    private final ExecutorService service;
    private final BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
    ...
    public ConnectionStateManager(CuratorFramework client, ThreadFactory threadFactory, int sessionTimeoutMs, int sessionExpirationPercent, ConnectionStateListenerDecorator connectionStateListenerDecorator) {
        ...
        service = Executors.newSingleThreadExecutor(threadFactory);
        ...
    }
    ...
    public void start() {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        //start a thread
        service.submit(
            new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    processEvents();
                    return null;
                }
            }
        );
    }

    private void processEvents() {
        while (state.get() == State.STARTED) {
            int useSessionTimeoutMs = getUseSessionTimeoutMs();
            long elapsedMs = startOfSuspendedEpoch == 0 ? useSessionTimeoutMs / 2 : System.currentTimeMillis() - startOfSuspendedEpoch;
            long pollMaxMs = useSessionTimeoutMs - elapsedMs;

            final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
            if (newState != null) {
                if (listeners.size() == 0) {
                    log.warn("There are no ConnectionStateListeners registered.");
                }
                listeners.forEach(listener -> listener.stateChanged(client, newState));
            } else if (sessionExpirationPercent > 0) {
                synchronized(this) {
                    checkSessionExpiration();
                }
            }
        }
    }
    ...
}

public class CuratorZookeeperClient implements Closeable {
    private final ConnectionState state;
    ...
    public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs, Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
        ...
        state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, connectionTimeoutMs, watcher, tracer, canBeReadOnly, connectionHandlingPolicy);
        ...
    }
    ...
    public void start() throws Exception {
        log.debug("Starting");
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("Already started");
        }
        state.start();
    }
    ...
}

class ConnectionState implements Watcher, Closeable {
    private final HandleHolder zooKeeper;

    ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs, int connectionTimeoutMs, Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly, ConnectionHandlingPolicy connectionHandlingPolicy) {
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeoutMs = sessionTimeoutMs;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.tracer = tracer;
        this.connectionHandlingPolicy = connectionHandlingPolicy;
        if (parentWatcher != null) {
            parentWatchers.offer(parentWatcher);
        }
        //register itself as the Watcher with HandleHolder
        zooKeeper = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
    }
    ...
    void start() throws Exception {
        log.debug("Starting");
        ensembleProvider.start();
        reset();
    }

    synchronized void reset() throws Exception {
        log.debug("reset");
        instanceIndex.incrementAndGet();
        isConnected.set(false);
        connectionStartMs = System.currentTimeMillis();
        //create the client's connection to zk
        zooKeeper.closeAndReset();
        zooKeeper.getZooKeeper();//initiate connection
    }
    ...
}

class HandleHolder {
    private final ZookeeperFactory zookeeperFactory;
    private final Watcher watcher;
    private final EnsembleProvider ensembleProvider;
    private final int sessionTimeout;
    private final boolean canBeReadOnly;
    private volatile Helper helper;
    ...
    HandleHolder(ZookeeperFactory zookeeperFactory, Watcher watcher, EnsembleProvider ensembleProvider, int sessionTimeout, boolean canBeReadOnly) {
        this.zookeeperFactory = zookeeperFactory;
        this.watcher = watcher;
        this.ensembleProvider = ensembleProvider;
        this.sessionTimeout = sessionTimeout;
        this.canBeReadOnly = canBeReadOnly;
    }

    private interface Helper {
        ZooKeeper getZooKeeper() throws Exception;
        String getConnectionString();
        int getNegotiatedSessionTimeoutMs();
    }

    ZooKeeper getZooKeeper() throws Exception {
        return (helper != null) ? helper.getZooKeeper() : null;
    }

    void closeAndReset() throws Exception {
        internalClose(0);
        helper = new Helper() {
            private volatile ZooKeeper zooKeeperHandle = null;
            private volatile String connectionString = null;
            @Override
            public ZooKeeper getZooKeeper() throws Exception {
                synchronized(this) {
                    if (zooKeeperHandle == null) {
                        connectionString = ensembleProvider.getConnectionString();
                        //create the connection to zk and initialize zooKeeperHandle
                        zooKeeperHandle = zookeeperFactory.newZooKeeper(connectionString, sessionTimeout, watcher, canBeReadOnly);
                    }
                    ...
                    return zooKeeperHandle;
                }
            }
            @Override
            public String getConnectionString() {
                return connectionString;
            }
            @Override
            public int getNegotiatedSessionTimeoutMs() {
                return (zooKeeperHandle != null) ? zooKeeperHandle.getSessionTimeout() : 0;
            }
        };
    }
    ...
}

//creates the client's connection to zk
public class DefaultZookeeperFactory implements ZookeeperFactory {
    @Override
    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
    }
}


10. Source Code Analysis: CRUD on Nodes with Curator

 

Curator's CRUD operations are all implemented by calling the native zk API under the hood.

 

(1) Creating a znode with Curator


Node creation also uses the builder pattern: CuratorFramework's create() method first creates a CreateBuilder instance, methods like withMode() then set the CreateBuilder's fields, and finally CreateBuilder's forPath() method, wrapped in a retry call, creates the znode.

 

When creating a node, CuratorFramework's getZooKeeper() method is called to obtain the zk client instance, and the node is then created through the native zk client's API.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started");
        //create a node
        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/my/path", "100".getBytes());
    }
}

public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public CreateBuilder create() {
        checkState();
        return new CreateBuilderImpl(this);
    }
    ...
}

public class CreateBuilderImpl implements CreateBuilder, CreateBuilder2, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<String> {
    private final CuratorFrameworkImpl client;
    private CreateMode createMode;
    private Backgrounding backgrounding;
    private boolean createParentsIfNeeded;
    ...
    CreateBuilderImpl(CuratorFrameworkImpl client) {
        this.client = client;
        createMode = CreateMode.PERSISTENT;
        backgrounding = new Backgrounding();
        acling = new ACLing(client.getAclProvider());
        createParentsIfNeeded = false;
        createParentsAsContainers = false;
        compress = false;
        setDataIfExists = false;
        storingStat = null;
        ttl = -1;
    }

    @Override
    public String forPath(final String givenPath, byte[] data) throws Exception {
        if (compress) {
            data = client.getCompressionProvider().compress(givenPath, data);
        }

        final String adjustedPath = adjustPath(client.fixForNamespace(givenPath, createMode.isSequential()));
        List<ACL> aclList = acling.getAclList(adjustedPath);
        client.getSchemaSet().getSchema(givenPath).validateCreate(createMode, givenPath, data, aclList);

        String returnPath = null;
        if (backgrounding.inBackground()) {
            pathInBackground(adjustedPath, data, givenPath);
        } else {
            //create the node
            String path = protectedPathInForeground(adjustedPath, data, aclList);
            returnPath = client.unfixForNamespace(path);
        }
        return returnPath;
    }

    private String protectedPathInForeground(String adjustedPath, byte[] data, List<ACL> aclList) throws Exception {
        return pathInForeground(adjustedPath, data, aclList);
    }

    private String pathInForeground(final String path, final byte[] data, final List<ACL> aclList) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("CreateBuilderImpl-Foreground");
        final AtomicBoolean firstTime = new AtomicBoolean(true);
        //retry call
        String returnPath = RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<String>() {
                @Override
                public String call() throws Exception {
                    boolean localFirstTime = firstTime.getAndSet(false) && !debugForceFindProtectedNode;
                    protectedMode.checkSetSessionId(client, createMode);
                    String createdPath = null;
                    if (!localFirstTime && protectedMode.doProtected()) {
                        debugForceFindProtectedNode = false;
                        createdPath = findProtectedNodeInForeground(path);
                    }
                    if (createdPath == null) {
                        //when creating a znode, CuratorFramework.getZooKeeper() is called first to obtain the zk client instance;
                        //the node is then created via the native zk client API
                        try {
                            if (client.isZk34CompatibilityMode()) {
                                createdPath = client.getZooKeeper().create(path, data, aclList, createMode);
                            } else {
                                createdPath = client.getZooKeeper().create(path, data, aclList, createMode, storingStat, ttl);
                            }
                        } catch (KeeperException.NoNodeException e) {
                            if (createParentsIfNeeded) {
                                //this is how cascading creation of parent nodes is implemented
                                ZKPaths.mkdirs(client.getZooKeeper(), path, false, acling.getACLProviderForParents(), createParentsAsContainers);
                                if (client.isZk34CompatibilityMode()) {
                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode);
                                } else {
                                    createdPath = client.getZooKeeper().create(path, data, acling.getAclList(path), createMode, storingStat, ttl);
                                }
                            } else {
                                throw e;
                            }
                        } catch (KeeperException.NodeExistsException e) {
                            if (setDataIfExists) {
                                Stat setStat = client.getZooKeeper().setData(path, data, setDataIfExistsVersion);
                                if (storingStat != null) {
                                    DataTree.copyStat(setStat, storingStat);
                                }
                                createdPath = path;
                            } else {
                                throw e;
                            }
                        }
                    }
                    if (failNextCreateForTesting) {
                        failNextCreateForTesting = false;
                        throw new KeeperException.ConnectionLossException();
                    }
                    return createdPath;
                }
            }
        );
        trace.setRequestBytesLength(data).setPath(path).commit();
        return returnPath;
    }
    ...
}

public class CuratorFrameworkImpl implements CuratorFramework {
    private final CuratorZookeeperClient client;

    public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
        ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
        this.client = new CuratorZookeeperClient(
            localZookeeperFactory,
            builder.getEnsembleProvider(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionTimeoutMs(),
            builder.getWaitForShutdownTimeoutMs(),
            new Watcher() {
                ...
            },
            builder.getRetryPolicy(),
            builder.canBeReadOnly(),
            builder.getConnectionHandlingPolicy()
        );
        ...
    }
    ...
    ZooKeeper getZooKeeper() throws Exception {
        return client.getZooKeeper();
    }
    ...
}

public class CuratorZookeeperClient implements Closeable {
    private final ConnectionState state;
    ...
    public ZooKeeper getZooKeeper() throws Exception {
        Preconditions.checkState(started.get(), "Client is not started");
        return state.getZooKeeper();
    }
    ...
}

class ConnectionState implements Watcher, Closeable {
    private final HandleHolder zooKeeper;
    ...
    ZooKeeper getZooKeeper() throws Exception {
        if (SessionFailRetryLoop.sessionForThreadHasFailed()) {
            throw new SessionFailRetryLoop.SessionFailedException();
        }
        Exception exception = backgroundExceptions.poll();
        if (exception != null) {
            new EventTrace("background-exceptions", tracer.get()).commit();
            throw exception;
        }
        boolean localIsConnected = isConnected.get();
        if (!localIsConnected) {
            checkTimeouts();
        }
        //obtain the ZooKeeper instance via HandleHolder
        return zooKeeper.getZooKeeper();
    }
    ...
}


(2) Reading a znode with Curator


Node queries also use the builder pattern: CuratorFramework's getData() method creates a GetDataBuilder instance, and GetDataBuilder's forPath() method, wrapped in a retry call, reads the znode.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started");

        //read a node
        byte[] dataBytes = client.getData().forPath("/my/path");
        System.out.println(new String(dataBytes));
        //read child nodes
        List<String> children = client.getChildren().forPath("/my");
        System.out.println(children);
    }
}

public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public GetDataBuilder getData() {
        checkState();
        return new GetDataBuilderImpl(this);
    }

    @Override
    public GetChildrenBuilder getChildren() {
        checkState();
        return new GetChildrenBuilderImpl(this);
    }
    ...
}

public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<String>, ErrorListenerPathable<byte[]> {
    private final CuratorFrameworkImpl client;
    ...
    @Override
    public byte[] forPath(String path) throws Exception {
        client.getSchemaSet().getSchema(path).validateWatch(path, watching.isWatched() || watching.hasWatcher());
        path = client.fixForNamespace(path);
        byte[] responseData = null;
        if (backgrounding.inBackground()) {
            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext(), watching), null);
        } else {
            //read the node
            responseData = pathInForeground(path);
        }
        return responseData;
    }

    private byte[] pathInForeground(final String path) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("GetDataBuilderImpl-Foreground");
        //retry call
        byte[] responseData = RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<byte[]>() {
                @Override
                public byte[] call() throws Exception {
                    byte[] responseData;
                    //obtain the native zk client via CuratorFramework, then call its getData() to read the node
                    if (watching.isWatched()) {
                        responseData = client.getZooKeeper().getData(path, true, responseStat);
                    } else {
                        responseData = client.getZooKeeper().getData(path, watching.getWatcher(path), responseStat);
                        watching.commitWatcher(KeeperException.NoNodeException.Code.OK.intValue(), false);
                    }
                    return responseData;
                }
            }
        );
        trace.setResponseBytesLength(responseData).setPath(path).setWithWatcher(watching.hasWatcher()).setStat(responseStat).commit();
        return decompress ? client.getCompressionProvider().decompress(path, responseData) : responseData;
    }
    ...
}


(3) Updating a znode with Curator


Node updates also use the builder pattern: CuratorFramework's setData() method creates a SetDataBuilder instance, and SetDataBuilder's forPath() method, wrapped in a retry call, updates the znode.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started");

        //update a node
        client.setData().forPath("/my/path", "110".getBytes());
        byte[] dataBytes = client.getData().forPath("/my/path");
        System.out.println(new String(dataBytes));
    }
}

public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public SetDataBuilder setData() {
        checkState();
        return new SetDataBuilderImpl(this);
    }
    ...
}

public class SetDataBuilderImpl implements SetDataBuilder, BackgroundOperation<PathAndBytes>, ErrorListenerPathAndBytesable<Stat> {
    private final CuratorFrameworkImpl client;
    ...
    @Override
    public Stat forPath(String path, byte[] data) throws Exception {
        client.getSchemaSet().getSchema(path).validateGeneral(path, data, null);
        if (compress) {
            data = client.getCompressionProvider().compress(path, data);
        }
        path = client.fixForNamespace(path);
        Stat resultStat = null;
        if (backgrounding.inBackground()) {
            client.processBackgroundOperation(new OperationAndData<>(this, new PathAndBytes(path, data), backgrounding.getCallback(), null, backgrounding.getContext(), null), null);
        } else {
            //update the node
            resultStat = pathInForeground(path, data);
        }
        return resultStat;
    }

    private Stat pathInForeground(final String path, final byte[] data) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("SetDataBuilderImpl-Foreground");
        //retry call
        Stat resultStat = RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<Stat>() {
                @Override
                public Stat call() throws Exception {
                    //obtain the native zk client via CuratorFramework, then call its setData() to update the node
                    return client.getZooKeeper().setData(path, data, version);
                }
            }
        );
        trace.setRequestBytesLength(data).setPath(path).setStat(resultStat).commit();
        return resultStat;
    }
    ...
}


(4) Deleting a znode with Curator


Node deletion also uses the builder pattern: CuratorFramework's delete() method creates a DeleteBuilder instance, and DeleteBuilder's forPath() method, wrapped in a retry call, deletes the znode.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started");

        //delete a node
        client.delete().forPath("/my/path");
    }
}

public class CuratorFrameworkImpl implements CuratorFramework {
    ...
    @Override
    public DeleteBuilder delete() {
        checkState();
        return new DeleteBuilderImpl(this);
    }
    ...
}

public class DeleteBuilderImpl implements DeleteBuilder, BackgroundOperation<String>, ErrorListenerPathable<Void> {
    private final CuratorFrameworkImpl client;
    ...
    @Override
    public Void forPath(String path) throws Exception {
        client.getSchemaSet().getSchema(path).validateDelete(path);
        final String unfixedPath = path;
        path = client.fixForNamespace(path);
        if (backgrounding.inBackground()) {
            OperationAndData.ErrorCallback<String> errorCallback = null;
            if (guaranteed) {
                errorCallback = new OperationAndData.ErrorCallback<String>() {
                    @Override
                    public void retriesExhausted(OperationAndData<String> operationAndData) {
                        client.getFailedDeleteManager().addFailedOperation(unfixedPath);
                    }
                };
            }
            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext(), null), null);
        } else {
            //delete the node
            pathInForeground(path, unfixedPath);
        }
        return null;
    }

    private void pathInForeground(final String path, String unfixedPath) throws Exception {
        OperationTrace trace = client.getZookeeperClient().startAdvancedTracer("DeleteBuilderImpl-Foreground");
        //retry call
        RetryLoop.callWithRetry(
            client.getZookeeperClient(),
            new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    try {
                        //obtain the native zk client via CuratorFramework, then call its delete() to delete the node
                        client.getZooKeeper().delete(path, version);
                    } catch (KeeperException.NoNodeException e) {
                        if (!quietly) {
                            throw e;
                        }
                    } catch (KeeperException.NotEmptyException e) {
                        if (deletingChildrenIfNeeded) {
                            ZKPaths.deleteChildren(client.getZooKeeper(), path, true);
                        } else {
                            throw e;
                        }
                    }
                    return null;
                }
            }
        );
        trace.setPath(path).commit();
    }
}


11. Source Code of Curator's Watch Callback Mechanism


(1) Source of the PathCache Child-Node Watch Mechanism


PathChildrenCache calls the native zk client's getChildren() method, passing in a watcher called childrenWatcher. When a child-node event occurs, this native Watcher is notified, and it in turn invokes the Listeners registered on the PathChildrenCache. Note: the passed-in Watcher re-registers itself, which is how repeated watch registration is implemented.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started");

        //PathCache: watch the child nodes under /cluster
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/cluster", true);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                ...
            }
        });
        pathChildrenCache.start();
    }
}

public class PathChildrenCache implements Closeable {
    private final WatcherRemoveCuratorFramework client;
    private final String path;
    private final boolean cacheData;
    private final boolean dataIsCompressed;
    private final CloseableExecutorService executorService;
    private final ListenerContainer<PathChildrenCacheListener> listeners = new ListenerContainer<PathChildrenCacheListener>();
    ...
    //constructor
    public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService) {
        this.client = client.newWatcherRemoveCuratorFramework();
        this.path = PathUtils.validatePath(path);
        this.cacheData = cacheData;
        this.dataIsCompressed = dataIsCompressed;
        this.executorService = executorService;
        ensureContainers = new EnsureContainers(client, path);
    }

    //returns the container that stores the Listeners
    public ListenerContainer<PathChildrenCacheListener> getListenable() {
        return listeners;
    }

    //start watching the child nodes
    public void start() throws Exception {
        start(StartMode.NORMAL);
    }

    private volatile ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            //handle connection state changes
            handleStateChange(newState);
        }
    };

    public void start(StartMode mode) throws Exception {
        ...
        //add a Listener to the established zk connection
        client.getConnectionStateListenable().addListener(connectionStateListener);
        ...
        //pass this PathChildrenCache into a RefreshOperation;
        //the line below effectively calls PathChildrenCache's refresh() method
        offerOperation(new RefreshOperation(this, RefreshMode.STANDARD));
        ...
    }

    //submit a task to the thread pool for processing
    void offerOperation(final Operation operation) {
        if (operationsQuantizer.add(operation)) {
            submitToExecutor(
                new Runnable() {
                    @Override
                    public void run() {
                        ...
                        operationsQuantizer.remove(operation);
                        //effectively calls PathChildrenCache's refresh() method
                        operation.invoke();
                        ...
                    }
                }
            );
        }
    }

    private synchronized void submitToExecutor(final Runnable command) {
        if (state.get() == State.STARTED) {
            //submit the task to the thread pool
            executorService.submit(command);
        }
    }
    ...
}

class RefreshOperation implements Operation {
    private final PathChildrenCache cache;
    private final PathChildrenCache.RefreshMode mode;

    RefreshOperation(PathChildrenCache cache, PathChildrenCache.RefreshMode mode) {
        this.cache = cache;
        this.mode = mode;
    }

    @Override
    public void invoke() throws Exception {
        //call PathChildrenCache's refresh() method, i.e. initiate the child-node watch
        cache.refresh(mode);
    }
    ...
}

public class PathChildrenCache implements Closeable {
    ...
    private volatile Watcher childrenWatcher = new Watcher() {
        //re-registers the watcher:
        //this method is triggered whenever a child-node change event occurs
        @Override
        public void process(WatchedEvent event) {
            //the line below again goes through PathChildrenCache's refresh() method
            offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.STANDARD));
        }
    };

    void refresh(final RefreshMode mode) throws Exception {
        ensurePath();
        //create a callback that is triggered when the client.getChildren() call below succeeds
        final BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                if (reRemoveWatchersOnBackgroundClosed()) {
                    return;
                }
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    //process the child-node data
                    processChildren(event.getChildren(), mode);
                } else if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                    if (mode == RefreshMode.NO_NODE_EXCEPTION) {
                        log.debug("KeeperException.NoNodeException received for getChildren() and refresh has failed. Resetting ensureContainers but not refreshing. Path: [{}]", path);
                        ensureContainers.reset();
                    } else {
                        log.debug("KeeperException.NoNodeException received for getChildren(). Resetting ensureContainers. Path: [{}]", path);
                        ensureContainers.reset();
                        offerOperation(new RefreshOperation(PathChildrenCache.this, RefreshMode.NO_NODE_EXCEPTION));
                    }
                }
            }
        };
        //the line below ultimately calls the native zk client's getChildren() to set up the child-node watch,
        //registering the childrenWatcher watcher plus the callback background async callback
        client.getChildren().usingWatcher(childrenWatcher).inBackground(callback).forPath(path);
    }
    ...
}

//a child-node change event ultimately triggers EventOperation's invoke() method
class EventOperation implements Operation {
    private final PathChildrenCache cache;
    private final PathChildrenCacheEvent event;

    EventOperation(PathChildrenCache cache, PathChildrenCacheEvent event) {
        this.cache = cache;
        this.event = event;
    }

    @Override
    public void invoke() {
        //invoke the Listeners registered on the PathChildrenCache
        cache.callListeners(event);
    }
    ...
}


(2) Source of the NodeCache Node Watch Mechanism


NodeCache calls the native zk client's exists() method, passing in a watcher. When the node fires an event, this native Watcher is notified, and it in turn invokes the Listeners registered on the NodeCache. Note: the passed-in Watcher re-registers itself, which is how repeated watch registration is implemented.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started");

        //NodeCache
        final NodeCache nodeCache = new NodeCache(client, "/cluster");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                Stat stat = client.checkExists().forPath("/cluster");
                if (stat == null) {
                    //the node was deleted
                } else {
                    nodeCache.getCurrentData();
                }
            }
        });
        nodeCache.start();
    }
}

public class NodeCache implements Closeable {
    private final WatcherRemoveCuratorFramework client;
    private final String path;
    private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
    ...
    private ConnectionStateListener connectionStateListener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if ((newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED)) {
                if (isConnected.compareAndSet(false, true)) {
                    reset();
                }
            } else {
                isConnected.set(false);
            }
        }
    };

    //a Watcher that the reset() method below registers via client.checkExists()
    private Watcher watcher = new Watcher() {
        //re-registers the watcher
        @Override
        public void process(WatchedEvent event) {
            reset();
        }
    };

    //a callback triggered when the client.checkExists() call in reset() below succeeds
    private final BackgroundCallback backgroundCallback = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            processBackgroundResult(event);
        }
    };

    //initialize the NodeCache
    public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed) {
        this.client = client.newWatcherRemoveCuratorFramework();
        this.path = PathUtils.validatePath(path);
        this.dataIsCompressed = dataIsCompressed;
    }

    //returns the ListenerContainer that stores the Listeners
    public ListenerContainer<NodeCacheListener> getListenable() {
        Preconditions.checkState(state.get() != State.CLOSED, "Closed");
        return listeners;
    }

    //start watching the node
    public void start() throws Exception {
        start(false);
    }

    public void start(boolean buildInitial) throws Exception {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        //add a Listener to the established zk connection
        client.getConnectionStateListenable().addListener(connectionStateListener);
        if (buildInitial) {
            //call the native zk client's exists() to watch the node
            client.checkExists().creatingParentContainersIfNeeded().forPath(path);
            internalRebuild();
        }
        reset();
    }

    private void reset() throws Exception {
        if ((state.get() == State.STARTED) && isConnected.get()) {
            //the line below ultimately calls the native zk client's exists() to watch the node,
            //registering the watcher above plus the backgroundCallback background async callback
            client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
        }
    }

    private void processBackgroundResult(CuratorEvent event) throws Exception {
        switch (event.getType()) {
            case GET_DATA: {
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    ChildData childData = new ChildData(path, event.getStat(), event.getData());
                    setNewData(childData);
                }
                break;
            }
            case EXISTS: {
                if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                    setNewData(null);
                } else if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    if (dataIsCompressed) {
                        client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                    } else {
                        client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
                    }
                }
                break;
            }
        }
    }
    ...
}


(3) Notes on getChildren(): Watcher Registration and Background Async Callbacks


The Watcher registered by getChildren() fires only once; the callback it registers is an asynchronous one.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started, zk connection established");

        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/test", "10".getBytes());
        System.out.println("Created node /test");

        client.getChildren().usingWatcher(new CuratorWatcher() {
            public void process(WatchedEvent event) throws Exception {
                //after one notification of a zk node change, no further notifications arrive here;
                //only the first notification is delivered: once this has run, it will not run again
                System.out.println("Received a zk notification: " + event);
            }
        }).inBackground(new BackgroundCallback() {
            //background callback: zk.getChildren() executes asynchronously in the background;
            //when the background client.getChildren() call completes, this method is called back
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                System.out.println("Received a background callback notification: " + event);
            }
        }).forPath("/test");
    }
}


(4) The PathCache Effect: Automatic Watcher Re-registration


Every time a child node changes, the childEvent() method is triggered.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started, zk connection established");

        final PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/test", true);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            //every child-node change triggers childEvent(), no matter how many times the children change
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println("Watched child nodes changed, event received: " + pathChildrenCacheEvent);
            }
        });
        pathChildrenCache.start();
        System.out.println("Child-node watch set up and started");
    }
}


(5) The NodeCache Effect: Watching Node-Change Events


Every time the node changes, the nodeChanged() method is triggered.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started, zk connection established");

        final NodeCache nodeCache = new NodeCache(client, "/test/child/id");
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            //every node change triggers nodeChanged(), no matter how many times the node changes
            public void nodeChanged() throws Exception {
                Stat stat = client.checkExists().forPath("/test/child/id");
                if (stat != null) {
                    byte[] dataBytes = client.getData().forPath("/test/child/id");
                    System.out.println("Node data changed: " + new String(dataBytes));
                } else {
                    System.out.println("Node was deleted");
                }
            }
        });
        nodeCache.start();
    }
}


12. Source Code of Curator's Leader Election Mechanisms

 

Curator's CRUD plus the watch callback mechanism cover most systems' zk use cases. One thing to note: with the native zk API, a watcher registered on a node or its children is notified once when the corresponding event occurs, but subsequent events produce no further notifications. With the native API, the client must re-register the watcher after each notification; Curator's PathCache and NodeCache, by contrast, re-register watchers automatically.
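
For comparison, here is a minimal sketch of manual re-registration with the raw ZooKeeper API; the path /test is just an example and the connection handshake is simplified:

//Sketch: with the raw zk API, the watcher must re-register itself on every event
public class RawWatcherDemo {
    public static void main(String[] args) throws Exception {
        //assumes a reachable zk at localhost:2181
        final ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, null);
        Watcher selfRenewingWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("Received event: " + event);
                try {
                    zk.getChildren("/test", this);//re-register this same watcher
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        zk.getChildren("/test", selfRenewingWatcher);//initial one-shot registration
        Thread.sleep(Integer.MAX_VALUE);
    }
}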

 

(1) Source of the First Leader Election Mechanism: LeaderLatch


Curator clients compete to become Leader by creating ephemeral sequential nodes; LeaderLatch's election is implemented almost exactly like a distributed lock.

 

After each Curator client creates its ephemeral sequential node, it calls getChildren() on the /leader/latch directory to fetch all of its child nodes. The getChildren() result is delivered through the backgroundCallback, and the client then sorts the children to check whether its own node is the first one.

 

If the client finds that its node is first, it is the Leader. If not, it adds a watcher on the previous node: it calls getData() on that node, and when getData() succeeds the backgroundCallback is invoked.

 

When the client owning the previous node releases the Leader role, that node disappears, and the watcher that the next node's client registered via getData() is notified.

 

So when the getData() watcher fires, i.e. the previous node is gone, the client calls getChildren() to refetch the child list and re-check whether it is now the Leader.

 

Note: using getData() instead of exists() avoids leaving behind unneeded Watchers, which are a kind of resource leak.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                switch (newState) {
                    case LOST:
                        //when the Leader loses its zk connection, it must suspend its Leader work
                }
            }
        });
        client.start();
        System.out.println("Curator client started, zk connection established");

        LeaderLatch leaderLatch = new LeaderLatch(client, "/leader/latch");
        leaderLatch.start();
        leaderLatch.await();//block until this client becomes Leader
        boolean hasLeaderShip = leaderLatch.hasLeadership();
        System.out.println("Is Leader: " + hasLeaderShip);
    }
}

public class LeaderLatch implements Closeable {
    private final WatcherRemoveCuratorFramework client;
    private final ConnectionStateListener listener = new ConnectionStateListener() {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            handleStateChange(newState);
        }
    };
    ...
    //Add this instance to the leadership election and attempt to acquire leadership.
    public void start() throws Exception {
        ...
        //add a Listener to the established zk connection
        client.getConnectionStateListenable().addListener(listener);
        reset();
        ...
    }

    @VisibleForTesting
    void reset() throws Exception {
        setLeadership(false);
        setNode(null);
        //callback invoked after the ephemeral sequential node has been created successfully
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                ...
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    setNode(event.getName());
                    if (state.get() == State.CLOSED) {
                        setNode(null);
                    } else {
                        //node created successfully, so fetch the child list via getChildren()
                        getChildren();
                    }
                } else {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        //create the ephemeral sequential node
        client.create().creatingParentContainersIfNeeded().withProtection()
            .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback)
            .forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }

    //fetch the child node list
    private void getChildren() throws Exception {
        //callback invoked after the child list has been fetched successfully
        BackgroundCallback callback = new BackgroundCallback() {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                if (event.getResultCode() == KeeperException.Code.OK.intValue()) {
                    checkLeadership(event.getChildren());
                }
            }
        };
        client.getChildren().inBackground(callback).forPath(ZKPaths.makePath(latchPath, null));
    }

    //check whether our node is the first child
    private void checkLeadership(List<String> children) throws Exception {
        if (debugCheckLeaderShipLatch != null) {
            debugCheckLeaderShipLatch.await();
        }
        final String localOurPath = ourPath.get();
        //sort the fetched child nodes
        List<String> sortedChildren = LockInternals.getSortedChildren(LOCK_NAME, sorter, children);
        int ourIndex = (localOurPath != null) ? sortedChildren.indexOf(ZKPaths.getNodeFromPath(localOurPath)) : -1;
        if (ourIndex < 0) {
            log.error("Can't find our node. Resetting. Index: " + ourIndex);
            reset();
        } else if (ourIndex == 0) {
            //if our node is first, mark ourselves as Leader
            setLeadership(true);
        } else {
            //otherwise, add a watcher on the previous node
            String watchPath = sortedChildren.get(ourIndex - 1);
            Watcher watcher = new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if ((state.get() == State.STARTED) && (event.getType() == Event.EventType.NodeDeleted) && (localOurPath != null)) {
                        //refetch the child node list
                        getChildren();
                    }
                }
            };
            BackgroundCallback callback = new BackgroundCallback() {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                    if (event.getResultCode() == KeeperException.Code.NONODE.intValue()) {
                        reset();
                    }
                }
            };
            //use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
            client.getData().usingWatcher(watcher).inBackground(callback).forPath(ZKPaths.makePath(latchPath, watchPath));
        }
    }
    ...

    //block until this client becomes Leader
    public void await() throws InterruptedException, EOFException {
        synchronized(this) {
            while ((state.get() == State.STARTED) && !hasLeadership.get()) {
                wait();//Object.wait(), blocks the calling thread
            }
        }
        if (state.get() != State.STARTED) {
            throw new EOFException();
        }
    }

    //mark this client as Leader, then notifyAll() the threads blocked earlier
    private synchronized void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) { // Lost leadership, was true, now false
            listeners.forEach(new Function<LeaderLatchListener, Void>() {
                @Override
                public Void apply(LeaderLatchListener listener) {
                    listener.notLeader();
                    return null;
                }
            });
        } else if (!oldValue && newValue) { // Gained leadership, was false, now true
            listeners.forEach(new Function<LeaderLatchListener, Void>() {
                @Override
                public Void apply(LeaderLatchListener input) {
                    input.isLeader();
                    return null;
                }
            });
        }
        notifyAll();//wake up the threads that called wait() earlier
    }
}


(2) Source of the Second Leader Election Mechanism: LeaderSelector


LeaderSelector decides leadership by whether the client successfully acquires a distributed lock. Because leadership is held by holding the lock, LeaderSelector's takeLeadership() method must not return; returning releases the lock, and once the lock is released another client acquires it and becomes the new Leader.


public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk address
            5000,//session timeout: if no heartbeat within this time, the Session is closed
            3000,//connection timeout when connecting to zk
            retryPolicy
        );
        client.start();
        System.out.println("Curator client started, zk connection established");

        LeaderSelector leaderSelector = new LeaderSelector(
            client,
            "/leader/election",
            new LeaderSelectorListener() {
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    System.out.println("You are now the Leader......");
                    //do all the Leader's work here; this method must not return
                    Thread.sleep(Integer.MAX_VALUE);
                }
                public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                    System.out.println("Connection state changed, no longer the Leader......");
                    if (connectionState.equals(ConnectionState.LOST)) {
                        throw new CancelLeadershipException();
                    }
                }
            }
        );
        leaderSelector.start();//compete with other nodes on "/leader/election" to become Leader
        Thread.sleep(Integer.MAX_VALUE);
    }
}

public class LeaderSelector implements Closeable {
    private final CuratorFramework client;
    private final LeaderSelectorListener listener;
    private final CloseableExecutorService executorService;
    private final InterProcessMutex mutex;
    ...
    public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener) {
        Preconditions.checkNotNull(client, "client cannot be null");
        PathUtils.validatePath(leaderPath);
        Preconditions.checkNotNull(listener, "listener cannot be null");

        this.client = client;
        this.listener = new WrappedListener(this, listener);
        hasLeadership = false;
        this.executorService = executorService;
        //initialize a distributed lock
        mutex = new InterProcessMutex(client, leaderPath) {
            @Override
            protected byte[] getLockNodeBytes() {
                return (id.length() > 0) ? getIdBytes(id) : null;
            }
        };
    }

    public void start() {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
        Preconditions.checkState(!executorService.isShutdown(), "Already started");
        Preconditions.checkState(!hasLeadership, "Already has leadership");
        client.getConnectionStateListenable().addListener(listener);
        requeue();
    }

    public boolean requeue() {
        Preconditions.checkState(state.get() == State.STARTED, "close() has already been called");
        return internalRequeue();
    }

    private synchronized boolean internalRequeue() {
        if (!isQueued && (state.get() == State.STARTED)) {
            isQueued = true;
            //submit the election work to the thread pool as a task
            Future<Void> task = executorService.submit(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    ...
                    doWorkLoop();
                    ...
                    return null;
                }
            });
            ourTask.set(task);
            return true;
        }
        return false;
    }

    private void doWorkLoop() throws Exception {
        ...
        doWork();
        ...
    }

    @VisibleForTesting
    void doWork() throws Exception {
        hasLeadership = false;
        try {
            //try to acquire the distributed lock, blocking if it is unavailable
            mutex.acquire();
            //reaching this line means the lock was acquired successfully
            hasLeadership = true;
            try {
                if (debugLeadershipLatch != null) {
                    debugLeadershipLatch.countDown();
                }
                if (debugLeadershipWaitLatch != null) {
                    debugLeadershipWaitLatch.await();
                }
                //invoke the user-overridden takeLeadership() callback
                listener.takeLeadership(client);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            } catch (Throwable e) {
                ThreadUtils.checkInterrupted(e);
            } finally {
                clearIsQueued();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        } finally {
            if (hasLeadership) {
                hasLeadership = false;
                boolean wasInterrupted = Thread.interrupted(); // clear any interrupted status so that mutex.release() works immediately
                try {
                    //release the lock
                    mutex.release();
                } catch (Exception e) {
                    if (failedMutexReleaseCount != null) {
                        failedMutexReleaseCount.incrementAndGet();
                    }
                    ThreadUtils.checkInterrupted(e);
                    log.error("The leader threw an exception", e);
                } finally {
                    if (wasInterrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }
    ...
}


Reposted from: 东阳马生架构

Original post: https://www.cnblogs.com/mjunz/p/18810719

