写点什么

【分布式技术专题】「Zookeeper 系列」为大家介绍一下 Zookeeper 的"开发伴侣"—Curator-Framework(组件篇)

作者:浩宇天尚
  • 2022 年 1 月 12 日
  • 本文字数:4613 字

    阅读完需:约 15 分钟

【分布式技术专题】「Zookeeper系列」为大家介绍一下 Zookeeper 的"开发伴侣"—Curator-Framework(组件篇)

Curator-Framework


Curator-Framework 是 ZooKeeper Client 更高的抽象 API,最佳核心的功能就是自动连接管理:


  1. 当 ZooKeeper 客户端内部出现异常, 将自动进行重连或重试, 该过程对外几乎完全透明

  2. 监控节点数据变化事件 NodeDataChanged,需要时调用 updateServerList()方法

  3. Curator recipes 自动移除监控

CuratorFramework 版本

目前 Curator 有 2.x.x 和 3.x.x 两个系列的版本,支持不同版本的 Zookeeper。其中 Curator 2.x.x 兼容 Zookeeper 的 3.4.x 和 3.5.x。而 Curator 3.x.x 只兼容 Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch 删除等新特性。

更加清晰的 API

简化了 ZooKeeper 原生的方法, 事件等, 提供流式 fluent 的接口,提供 Recipes 实现 : 选举,共享锁, 路径 cache, 分布式队列,分布式优先队列等。

maven 配置依赖

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --><dependency>    <groupId>org.apache.curator</groupId>    <artifactId>curator-recipes</artifactId>    <version>2.12.0</version></dependency>
复制代码

事务管理

/** 事务管理:碰到异常,事务会回滚 * 使用transaction()来控制事务 * @throws Exception */public void testTransaction() throws Exception{    //定义几个基本操作    CuratorOp createOp = client.transactionOp().create()            .forPath("/curator/one_path","some data".getBytes());    CuratorOp setDataOp = client.transactionOp().setData()            .forPath("/curator","other data".getBytes());    CuratorOp deleteOp = client.transactionOp().delete()            .forPath("/curator");    //事务执行结果    List<CuratorTransactionResult> results = client.transaction()            .forOperations(createOp,setDataOp,deleteOp);    //遍历输出结果    for(CuratorTransactionResult result : results){        System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());    }}//因为节点“/curator”存在子节点,所以在删除的时候将会报错,事务回滚
复制代码

监听器

Curator 提供了三种 Watcher(Cache)来监听结点的变化:


Path Cache:监视一个路径下


  • 1)孩子结点的创建

  • 2)删除

  • 3)以及结点数据的更新。


产生的事件会传递给注册的 PathChildrenCacheListener。


  • Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。

  • Tree Cache:Path Cache 和 Node Cache 的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。


/** * 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理 */ExecutorService pool = Executors.newFixedThreadPool(2);/** * 监听数据节点的变化情况 */final NodeCache nodeCache = new NodeCache(client, "/zk-huey/cnode", false);    nodeCache.start(true);            nodeCache.getListenable().addListener(            new NodeCacheListener() {    @Override    public void nodeChanged() throws Exception {        System.out.println("Node data is changed, new data: " +        new String(nodeCache.getCurrentData().getData()));    }}, pool);/** * 监听子节点的变化情况 */final PathChildrenCache childrenCache = new PathChildrenCache(client, "/zk-huey", true);        childrenCache.start(StartMode.POST_INITIALIZED_EVENT);        childrenCache.getListenable().addListener(        new PathChildrenCacheListener() {        @Override        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {            switch (event.getType()) {                case CHILD_ADDED:                    System.out.println("CHILD_ADDED: " + event.getData().getPath());                    break;                case CHILD_REMOVED:                    System.out.println("CHILD_REMOVED: " + event.getData().getPath());                    break;                case CHILD_UPDATED:                    System.out.println("CHILD_UPDATED: " + event.getData().getPath());                    break;                default:                    break;                }            }            },        pool        );        client.setData().forPath("/zk-huey/cnode", "world".getBytes());        Thread.sleep(10 * 1000);        pool.shutdown();        client.close();
复制代码

分布式锁思路

最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在 rebuild 缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。


下面的程序会启动两个线程 t1 和 t2 去争夺锁,拿到锁的线程会占用 5 秒。运行多次可以观察到,有时是 t1 先拿到锁而 t2 等待,有时又会反过来。Curator 会用我们提供的 lock 路径的结点作为全局锁,每次获得锁时会生成这种串,释放锁时清空数据。


import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.locks.InterProcessMutex;import org.apache.curator.retry.RetryNTimes;import java.util.concurrent.TimeUnit;/** * Curator framework's distributed lock test. */public class CuratorDistrLockTest {    /** Zookeeper info */    private static final String ZK_ADDRESS = "192.168.1.100:2181";    private static final String ZK_LOCK_PATH = "/zktest";    public static void main(String[] args) throws InterruptedException {        // 1.Connect to zk        CuratorFramework client = CuratorFrameworkFactory.newClient(                ZK_ADDRESS,                new RetryNTimes(10, 5000)        );        client.start();        System.out.println("zk client start successfully!");        Thread t1 = new Thread(() -> {            doWithLock(client);        }, "t1");        Thread t2 = new Thread(() -> {            doWithLock(client);        }, "t2");        t1.start();        t2.start();    }    private static void doWithLock(CuratorFramework client) {        InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);        try {            if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {                System.out.println(Thread.currentThread().getName() + " hold lock");                Thread.sleep(5000L);                System.out.println(Thread.currentThread().getName() + " release lock");            }        } catch (Exception e) {            e.printStackTrace();        } finally {            try {                lock.release();            } catch (Exception e) {                e.printStackTrace();            }        }    }}
复制代码

Leader 选举

当集群里的某个服务 down 机时,我们可能要从 slave 结点里选出一个作为新的 master,这时就需要一套能在分布式环境中自动协调的 Leader 选举方法。Curator 提供了 LeaderSelector 监听器实现 Leader 选举功能。同一时刻,只有一个 Listener 会进入 takeLeadership()方法,说明它是当前的 Leader。


import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.leader.LeaderSelector;import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;import org.apache.curator.framework.state.ConnectionState;import org.apache.curator.retry.RetryNTimes;import org.apache.curator.utils.EnsurePath;/** * Curator framework's leader election test. * Output: *  LeaderSelector-2 take leadership! *  LeaderSelector-2 relinquish leadership! *  LeaderSelector-1 take leadership! *  LeaderSelector-1 relinquish leadership! *  LeaderSelector-0 take leadership! *  LeaderSelector-0 relinquish leadership!  *      ... */public class CuratorLeaderTest {    /** Zookeeper info */    private static final String ZK_ADDRESS = "192.168.1.100:2181";    private static final String ZK_PATH = "/zktest";    public static void main(String[] args) throws InterruptedException {        LeaderSelectorListener listener = new LeaderSelectorListener() {            @Override            public void takeLeadership(CuratorFramework client) throws Exception {                System.out.println(Thread.currentThread().getName() + " take leadership!");                // takeLeadership() method should only return when leadership is being relinquished.                Thread.sleep(5000L);                System.out.println(Thread.currentThread().getName() + " relinquish leadership!");            }            @Override            public void stateChanged(CuratorFramework client, ConnectionState state) {            }        };        new Thread(() -> {            registerListener(listener);        }).start();        new Thread(() -> {            registerListener(listener);        }).start();        new Thread(() -> {            registerListener(listener);        }).start();        Thread.sleep(Integer.MAX_VALUE);    }
private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); // 2.Ensure path try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); }}
复制代码


注意:当 Listener 从 takeLeadership()退出时就说明它放弃了“Leader 身份”,这时 Curator 会利用 Zookeeper 再从剩余的 Listener 中选出一个新的 Leader。autoRequeue()方法使放弃 Leadership 的 Listener 有机会重新获得 Leadership,如果不设置的话放弃了的 Listener 是不会再变成 Leader 的。

参考资料

https://www.cnblogs.com/qingyunzong/p/8666288.html

发布于: 刚刚阅读数: 3
用户头像

浩宇天尚

关注

🏆 InfoQ写作平台-签约作者 🏆 2020.03.25 加入

【个人简介】酷爱计算机科学、醉心编程技术、喜爱健身运动、热衷悬疑推理的“极客达人” 【技术格言】任何足够先进的技术都与魔法无异 【技术范畴】Java领域、Spring生态、MySQL专项、微服务/分布式体系和算法设计等

评论

发布
暂无评论
【分布式技术专题】「Zookeeper系列」为大家介绍一下 Zookeeper 的"开发伴侣"—Curator-Framework(组件篇)