写点什么

ZooKeeper 实现生产 - 消费者队列,万字长文总结 Java 多进程

作者:Java高工P7
  • 2021 年 11 月 10 日
  • 本文字数:1362 字

    阅读完需:约 4 分钟

生产-消费者队列,用于多节点的分布式数据结构,生产和消费数据。生产者创建一个数据对象,并放到队列中;消费者从队列中取出一个数据对象并进行处理。在 ZooKeeper 中,队列可以使用一个容器节点下创建多个子节点来实现;创建子节点时,CreateMode 使用 PERSISTENT_SEQUENTIAL,ZooKeeper 会自动在节点名称后面添加唯一序列号。EPHEMERAL_SEQUENTIAL 也有同样的特点,区别在于会话结束后是否会自动删除。


敲小黑板:*_SEQUENTIAL 是 ZooKeeper 的一个很重要的特性,分布式锁、选举制度都依靠这个特性实现的。

1????? 对前续代码的重构

之前的文章,我们已经用实现了 Watcher 和 Barrier,创建 ZooKeeper 连接的代码已经复制了一遍。后续还需要类似的工作,因此先对原有代码做一下重构,让代码味道干净一点。



以下是 process(WatchedEvent)的代码


final public void process(WatchedEvent event) {if (Event.EventType.None.equals(event.getType())) {// 连接状态发生变化 if (Event.KeeperState.SyncConnected.equals(event.getState())) {// 连接建立成功 connectedSemaphore.countDown();}} else if (Event.EventType.NodeCreated.equals(event.getType())) {processNodeCreated(event);} else if (Event.EventType.


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


NodeDeleted.equals(event.getType())) {processNodeDeleted(event);} else if (Event.EventType.NodeDataChanged.equals(event.getType())) {processNodeDataChanged(event);} else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {processNodeChildrenChanged(event);}}


以 ZooKeeperBarrier 为例,看看重构之后的构造函数和监听 Event 的代码


ZooKeeperBarrier(String address, String tableSerial, int tableCapacity, String customerName)throws IOException {super(address);this.tableSerial = createRootNode(tableSerial);this.tableCapacity = tableCapacity;this.customerName = customerName;}protected void processNodeChildrenChanged(WatchedEvent event) {log.info("{} 接收到了通知 : {}", customerName, event.getType());// 子节点有变化 synchronized (mutex) {mutex.notify();}}

2 队列的生产者

生产者的关键代码


String elementName = queueName + "/element";ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;getZooKeeper().create(elementName, value, ids, createMode);


注意,重点是 PERSISTENT_SEQUENTIAL,PERSISTENT 是表示永久存储直到有命令删除,SEQUENTIAL 表示自动在后面添加自增的唯一序列号。这样,尽管 elementName 都一样,但实际生成的 zNode 名字在 “element”后面会添加格式为 %010d 的 10 个数字,如 0000000001。如一个完整的 zNode 名可能为/queue/element0000000021。

3 队列的消费者

消费者尝试从子节点列表获取 zNode 名最小的一个子节点,如果队列为空则等待 NodeChildrenChanged 事件。关键代码


/** 队列的同步信号 */


private static Integer queueMutex = Integer.valueOf(1);@Overrideprotected void processNodeChildrenChanged(WatchedEvent event) {synchronized (queueMutex) {queueMutex.notify();

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
ZooKeeper实现生产-消费者队列,万字长文总结Java多进程