写点什么

大数据 -32 ZooKeeper 分布式锁 Java 附带案例 代码

作者:武子康
  • 2025-07-06
    美国
  • 本文字数:3384 字

    阅读完需:约 11 分钟

大数据-32 ZooKeeper 分布式锁 Java 附带案例 代码

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-29 - 字节跳动 DeerFlow 深度研究框斜体样式架 私有部署 测试上手 架构研究,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 07 月 02 日更新到:Java-61 深入浅出 分布式服务 一致性算法 Raft 多图详解 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了:


  • ZooKeeper 的 Leader 选举机制

  • ZooKeeper 的选举过程

  • ZooKeeper 的 ZAB 协议

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个 Hadoop 的学习环境,供我学习。


  • 2C4G 编号 h121

  • 2C4G 编号 h122

  • 2C2G 编号 h123

ZooKeeper 简介

核心特性

  1. 分布式一致性保证

  2. 提供顺序一致性(所有更新请求按顺序执行)

  3. 原子性(更新要么成功要么失败)

  4. 单一系统镜像(无论连接到哪个服务器,客户端看到的数据视图都是一致的)

  5. 可靠性(一旦更新被应用,将保持到被下一次更新覆盖)

  6. 及时性(客户端在一定时间内能看到最新的数据)

  7. 数据模型

  8. 本质上是一个分布式的小文件存储系统,采用类似 Unix 文件系统的树形层次结构(称为 ZNode Tree)

  9. 每个节点(ZNode)可以存储少量数据(默认上限 1MB)

  10. 节点分为持久节点(PERSISTENT)和临时节点(EPHEMERAL),后者在会话结束后自动删除

  11. 监控机制(Watcher)

  12. 客户端可以注册监听特定节点的变化

  13. 当节点数据变更或子节点列表变化时会触发通知

  14. 采用一次触发机制,收到通知后需要重新注册

典型应用场景

  1. 统一命名服务

  2. 如 Dubbo 服务注册中心,服务提供者将服务地址注册到 ZooKeeper

  3. 消费者从 ZooKeeper 获取可用的服务列表

  4. 示例:/dubbo/com.example.Service/providers 目录下存储服务提供者 URL

  5. 分布式配置管理

  6. 如 Solr 集群配置同步,所有节点监听配置节点

  7. 配置变更时,管理员更新 ZooKeeper 上的配置数据

  8. 各节点收到变更通知后自动获取新配置

  9. 示例:/solr/configs/mycore 目录存储核心配置

  10. 分布式消息队列

  11. 实现发布/订阅模式(Pub/Sub)

  12. 生产者创建顺序节点作为消息

  13. 消费者监听父节点获取新消息

  14. 示例:/queue/msg-0000000001,/queue/msg-0000000002

  15. 分布式锁

  16. 实现互斥锁:多个客户端竞争创建同一个临时节点,成功创建的获得锁

  17. 实现共享锁:通过节点顺序特性实现读写锁

  18. 锁释放:会话结束自动删除临时节点或主动删除

  19. 集群管理

  20. 监控集群节点存活状态(通过临时节点)

  21. 选举主节点(通过节点序号最小的成为 Master)

  22. 示例:/cluster/node1(临时节点)自动消失表示节点下线

分布式锁

出现问题 1(单机器)

  • 假设 Redis 里面的某个商品库存为 1,此时两个用户同时下单,其中一个下单请求执行到第 3 步,更新数据库的库存为 0,但是第 4 步还没执行。

  • 而另外一个用户下单执行到了第二步,发现库存还是 1,就会继续执行第 3 步。

  • 但是此时库存已经为 0 了,所以数据库没有限制,此时会出现超卖的问题。

解决方案 1

  • 用锁把 2、3、4 步锁住,让他们执行完后,另一个线程才能够继续执行。

  • 但是由于业务发展迅速,原来的单机已经不能够满足,此时增加一台机器后,会出现更严重的问题。

出现问题 2(多机器)

假设有两个订单同时执行,分别有两个机器执行,那么这两个请求就是可以同时执行了,这样就依然出现了超卖的问题。


解决方案 2

我们需要使用分布式锁来解决上面出现的问题。分布式锁的作用就是在整个系统中提供一个全局的、唯一的锁,在分布式系统中每个系统进行相关的操作时都需要获取到该锁,才能够执行相应的操作。

ZK 分布式锁

实现思路

  • 锁就是 ZK 指定目录下序号最小的临时节点,多个系统的多个线程都要在此目录下创建临时顺序节点,因为 ZK 会保证节点的顺序性,所以可以利用节点的顺序性进行锁判断。

  • 每个线程都是先创建临时顺序节点,然后获取当前目录下最小的节点(序号),判断最小节点是不是当前节点,如果是那么获取锁成功,如果不是则获取锁失败。

  • 获取锁失败的线程获取当前节点上一个临时顺序节点,并对此节点进行监听,当该节点删除时,代表释放了锁。

流程图

编写代码

LockTest

package icu.wzk.zk.demo02;
public class LockTest {
public static void main(String[] args) { for (int i = 0; i < 10; i ++) { // 启动10个 new Thread(new LockRunnable()).start(); } }
static class LockRunnable implements Runnable {
@Override public void run() { final ClientTest clientTest = new ClientTest(); clientTest.getLock(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } clientTest.deleteLock(); } }
}
复制代码

ClientTest

package icu.wzk.zk.demo02;
import org.I0Itec.zkclient.IZkDataListener;import org.I0Itec.zkclient.ZkClient;
import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;
public class ClientTest {
private ZkClient zkClient = new ZkClient("h121.wzk.icu:2181,h122.wzk.icu:2181,h123.wzk.icu:2181");
String beforeNodePath;
String currentNodePath;
CountDownLatch countDownLatch = null;
public ClientTest() { synchronized (ClientTest.class) { if (!zkClient.exists("/lock")) { zkClient.createPersistent("/lock"); } } }
public boolean tryGetLock() { if (null == currentNodePath || currentNodePath.isEmpty()) { currentNodePath = zkClient.createEphemeralSequential("/lock/", "lock"); } final List<String> childs = zkClient.getChildren("/lock"); Collections.sort(childs); final String minNode = childs.get(0); if (currentNodePath.equals("/lock/" + minNode)) { return true; } else { final int i = Collections.binarySearch(childs, currentNodePath.substring("/lock/".length())); String lastNodeChild = childs.get(i - 1); beforeNodePath = "/lock/" + lastNodeChild; } return false; }
public void waitForLock() { final IZkDataListener iZkDataListener = new IZkDataListener() { @Override public void handleDataChange(String dataPath, Object data) throws Exception { // }
@Override public void handleDataDeleted(String dataPath) throws Exception { countDownLatch.countDown(); } }; zkClient.subscribeDataChanges(beforeNodePath, iZkDataListener);
if (zkClient.exists(beforeNodePath)) { countDownLatch = new CountDownLatch(1); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } }
zkClient.unsubscribeDataChanges(beforeNodePath, iZkDataListener); }
public void deleteLock() { if (zkClient != null) { zkClient.delete(currentNodePath); zkClient.close(); } }
public void getLock() { final String threadName = Thread.currentThread().getName(); if (tryGetLock()) { System.out.println(threadName + ": 获取到了锁!"); } else { System.out.println(threadName + ": 没有获取到锁!"); waitForLock(); // 自己调用自己 getLock(); } }

}
复制代码

运行结果


发布于: 刚刚阅读数: 3
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-32 ZooKeeper 分布式锁 Java 附带案例 代码_Java_武子康_InfoQ写作社区