写点什么

大数据 -30 ZooKeeper Java-API 监听节点 创建、删除节点

作者:武子康
  • 2025-07-04
    美国
  • 本文字数:3459 字

    阅读完需:约 11 分钟

大数据-30 ZooKeeper Java-API 监听节点 创建、删除节点

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 07 月 02 日更新到:Java-61 深入浅出 分布式服务 一致性算法 Raft 多图详解 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • ZK 创建节点:永久、顺序、临时

  • ZK 读取节点:列出、查看、更新

  • ZK 删除节点

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个 Hadoop 的学习环境,供我学习。


  • 2C4G 编号 h121

  • 2C4G 编号 h122

  • 2C2G 编号 h123


ZooKeeper 简介

核心特性

  1. 分布式一致性保证

  2. 提供顺序一致性(所有更新请求按顺序执行)

  3. 原子性(更新要么成功要么失败)

  4. 单一系统镜像(无论连接到哪个服务器,客户端看到的数据视图都是一致的)

  5. 可靠性(一旦更新被应用,将保持到被下一次更新覆盖)

  6. 及时性(客户端在一定时间内能看到最新的数据)

  7. 数据模型

  8. 本质上是一个分布式的小文件存储系统,采用类似 Unix 文件系统的树形层次结构(称为 ZNode Tree)

  9. 每个节点(ZNode)可以存储少量数据(默认上限 1MB)

  10. 节点分为持久节点(PERSISTENT)和临时节点(EPHEMERAL),后者在会话结束后自动删除

  11. 监控机制(Watcher)

  12. 客户端可以注册监听特定节点的变化

  13. 当节点数据变更或子节点列表变化时会触发通知

  14. 采用一次触发机制,收到通知后需要重新注册

典型应用场景

  1. 统一命名服务

  2. 如 Dubbo 服务注册中心,服务提供者将服务地址注册到 ZooKeeper

  3. 消费者从 ZooKeeper 获取可用的服务列表

  4. 示例:/dubbo/com.example.Service/providers 目录下存储服务提供者 URL

  5. 分布式配置管理

  6. 如 Solr 集群配置同步,所有节点监听配置节点

  7. 配置变更时,管理员更新 ZooKeeper 上的配置数据

  8. 各节点收到变更通知后自动获取新配置

  9. 示例:/solr/configs/mycore 目录存储核心配置

  10. 分布式消息队列

  11. 实现发布/订阅模式(Pub/Sub)

  12. 生产者创建顺序节点作为消息

  13. 消费者监听父节点获取新消息

  14. 示例:/queue/msg-0000000001,/queue/msg-0000000002

  15. 分布式锁

  16. 实现互斥锁:多个客户端竞争创建同一个临时节点,成功创建的获得锁

  17. 实现共享锁:通过节点顺序特性实现读写锁

  18. 锁释放:会话结束自动删除临时节点或主动删除

  19. 集群管理

  20. 监控集群节点存活状态(通过临时节点)

  21. 选举主节点(通过节点序号最小的成为 Master)

  22. 示例:/cluster/node1(临时节点)自动消失表示节点下线

工作原理

ZooKeeper 集群是一个高可用的分布式协调服务,其核心架构和运行机制如下:


  1. 集群组成与节点数量


  • 典型部署包含 3 个、5 个或 7 个服务器节点(必须为奇数)

  • 奇数数量便于选举时形成多数派(quorum),如:

  • 3 节点集群可容忍 1 个节点故障(需要 2 个节点存活)

  • 5 节点集群可容忍 2 个节点故障(需要 3 个节点存活)

  • 每个节点都存储完整的数据副本


  1. 一致性协议


  • 采用 ZAB(ZooKeeper Atomic Broadcast)协议,该协议包含两个主要阶段:a) 选举阶段:当 Leader 失效时,剩余节点通过投票选出新 Leaderb) 广播阶段:Leader 将事务请求以提案形式广播给所有 Follower

  • 需要获得多数节点(N/2+1)的 ACK 才能提交事务

  • 所有写操作都通过 Leader 处理,读操作可由任意节点响应


  1. 请求处理流程


  • 客户端可以连接到集群中的任意节点

  • 对于写请求:

  • 接收节点会将请求转发给 Leader

  • Leader 生成事务提案并广播

  • 获得多数确认后提交并响应客户端

  • 对于读请求:

  • 可直接由接收节点本地响应(可能读到稍旧数据)

  • 可选 sync 操作确保读取最新数据


  1. 容错机制


  • 故障检测通过心跳机制实现

  • Leader 失效时会自动触发新的选举

  • 集群持续服务需要保持多数节点存活

  • 数据持久化到磁盘,重启后自动恢复


  1. 典型应用场景


  • 分布式锁服务

  • 配置管理中心

  • 命名服务

  • 集群成员管理

  • 选主服务


在实际生产环境中,通常会将 ZooKeeper 节点部署在不同的物理服务器或可用区,以避免单点故障。同时建议配置监控系统来跟踪节点健康状态和性能指标。

新建 Java 工程

这里就跳过了,新建一个 Maven 工程。

POM 文件

<dependency>    <groupId>org.apache.zookeeper</groupId>    <artifactId>zookeeper</artifactId>    <version>3.8.4</version></dependency><dependency>      <groupId>com.101tec</groupId>    <artifactId>zkclient</artifactId>    <version>0.11</version></dependency>
复制代码

创建回话

public class Test01 {
public static void main(String[] args) { ZkClient zkClient = new ZkClient("h121.wzk.icu:2181"); System.out.println("ZooKeeper session created."); }
}
复制代码


运行程序,结果如下:


创建节点

public class Test01 {
public static void main(String[] args) { ZkClient zkClient = new ZkClient("h121.wzk.icu:2181"); System.out.println("ZooKeeper session created."); // true 则可以递归创建目录 zkClient.createPersistent("/wzk-java/temp", true); System.out.println("ZooKeeper craete ZNode");
}
}
复制代码


运行程序,结果如下:


删除节点

public class Test01 {
public static void main(String[] args) { ZkClient zkClient = new ZkClient("h121.wzk.icu:2181"); System.out.println("ZooKeeper session created."); // true 则可以递归创建目录 zkClient.createPersistent("/wzk-java/temp", true); System.out.println("ZooKeeper create ZNode"); // 删除数据 zkClient.deleteRecursive("/wzk-java/temp"); System.out.println("ZooKeeper delete recursive"); }
}
复制代码


运行程序,结果如下:


监听节点

public class Test02 {
public static void main(String[] args) throws Exception { ZkClient zkClient = new ZkClient("h121.wzk.icu:2181"); // 监听器 不会对当前目录进行监控 只会监听子目录变化!!! zkClient.subscribeChildChanges("/wzk-data", new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println("parentPath: " + parentPath + ", " + currentChilds); } });
// 添加数据 zkClient.createPersistent("/wzk-data/test-data", true); Thread.sleep(1000);
// 删除数据 zkClient.deleteRecursive("/wzk-data/test-data"); Thread.sleep(1000); }
}
复制代码


运行程序,结果如下:


监听数据

public class Test03 {
public static void main(String[] args) throws Exception { ZkClient zkClient = new ZkClient("h121.wzk.icu:2181"); // 序列化 zkClient.setZkSerializer(new SerializableSerializer());
// 判断节点是否存在 final boolean exists = zkClient.exists("/wzk-data/test-data"); if (!exists) { // 不存在则创建出来 zkClient.createPersistent("/wzk-data/test-data", true); System.out.println("ZooKeeper create ZNode"); }
zkClient.subscribeDataChanges("/wzk-data/test-data", new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println("数据改变: " + dataPath + ", " + data); }
@Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println("数据删除: " + dataPath); } });
// 更新数据 出发监听器 final Object o = zkClient.readData("/wzk-data/test-data"); zkClient.writeData("/wzk-data/test-data", "更新了数据"); Thread.sleep(1000);
zkClient.deleteRecursive("/wzk-data/test-data"); Thread.sleep(1000); }
}
复制代码


运行程序,结果如下:



发布于: 刚刚阅读数: 3
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-30 ZooKeeper Java-API 监听节点 创建、删除节点_大数据_武子康_InfoQ写作社区