写点什么

并发王者课 - 铂金 9:互通有无 -Exchanger 如何完成线程间的数据交换

发布于: 刚刚
并发王者课-铂金9:互通有无-Exchanger如何完成线程间的数据交换

欢迎来到《并发王者课》,本文是该系列文章中的第 22 篇,铂金中的第 9 篇


在前面的文章中,我们已经介绍了ReentrantLockCountDownLatchCyclicBarrierSemaphore等同步工具。在本文中,将为你介绍最后一个同步工具,即 Exchanger.


Exchanger 用于两个线程在某个节点时进行数据交换。在用法上,Exchanger 并不复杂,但是实现上会稍微有点费解。所以,考虑到 Exchanger 在平时使用的场景并不多,况且多数读者对一些“枯燥”的源码的耐受度有限(可能引起不适或烦躁等不良情绪,阻碍学习),本文将侧重讲它的使用和思想,对于源码不会过多展开,点到为止。

一、Exchanger 的使用场景

在峡谷中,铠和兰陵王都是擅长打野的英雄,各自对野怪的偏好也不完全相同。所以,为了能得到自己想要的野怪,他们经常会在峡谷的交易中心交换各自的猎物。


这一天,铠打到了一只棕熊,而兰陵王则收获了一只野狼,并且彼此都想要对方的野怪。于是,他们约定在峡谷交易中心交换双方的野怪,谁先到了就先等会。这个过程,可以用下面这幅图来表示:



在铠和兰陵王交换猎物的过程中,有三个点需要你留意:


  • 交换的双方有明确的交易地点(峡谷交易中心);

  • 交换的双方具有明确的交易对象(比如棕熊和野狼);

  • 谁先到了就等会儿(他们中总会有先来后到)。


如果用代码来实现的话,也是有多种方式可以选择,比如前面所学过的同步方法等。不过,虽然做也是可以做的,只是没那么方便。所以,接下来我们就用 Exchanger 来实现这一过程。


在下面的代码中,我们定义了一个exchanger,它就类似于峡谷交易中心,而它的类型Exchanger<WildMonster> 则明确表示交换的对象是野怪


接着,我们再定义两个线程,分别代表兰陵王。在其线程的内部,会通过前面定义的exchanger对象来和对方进行交换数据。交换完成后,他们彼此将获得对方的物品


 public static void main(String[] args) {     Exchanger<WildMonster> exchanger = new Exchanger<> (); // 定义交换地点和交换类型
Thread 铠 = newThread("铠", () -> { try { WildMonster wildMonster = new Bear("棕熊"); say("我手里有一只:" + wildMonster.getName()); WildMonster exchanged = exchanger.exchange(wildMonster); // 交换后将获得对方的物品 say("交易完成,我获得了:", wildMonster.getName(), "->", exchanged.getName()); } catch (InterruptedException e) { e.printStackTrace(); } });
Thread 兰陵王 = newThread("兰陵王", () -> { try { WildMonster wildMonster = new Wolf("野狼"); say("我手里有一只:" + wildMonster.getName()); WildMonster exchanged = exchanger.exchange(wildMonster); say("交易完成,我获得了:", wildMonster.getName(), "->", exchanged.getName()); } catch (InterruptedException e) { e.printStackTrace(); } }); 铠.start(); 兰陵王.start(); }
复制代码


下面是上面代码用到的内部类:


 @Data private static class WildMonster {     protected String name; }
private static class Wolf extends WildMonster { public Wolf(String name) { this.name = name; } }
private static class Bear extends WildMonster { public Bear(String name) { this.name = name; } }
复制代码


示例代码运行结果如下:


铠:我手里有一只:棕熊兰陵王:我手里有一只:野狼兰陵王:交易完成,我获得了:野狼->棕熊铠:交易完成,我获得了:棕熊->野狼
Process finished with exit code 0
复制代码


从结果中可以看到,铠用棕熊换到了野狼,而兰陵王则用野狼换到了棕熊,他们完成了交换。


以上就是 Exchanger 的用法,看起来还是非常简单的,事实上也确实很简单。在使用 Exchanger 的时候要注意下面几点:


  • 定义 Exchanger 对象,各线程通过这个对象完成交换

  • 在 Exchanger 对象中要定义类型,也就是这两个线程要交换什么

  • 线程在调用 Exchanger 进行交换时,要特别注意的是,先到的那个线程会原地等待另外一个线程的出现。比如,铠先到交换地点,可这时候兰陵王还没有到,那么铠会等待兰陵王的出现,除非超过设置的时间限制,比如兰陵王中途被妲己蹲了草丛。反之亦然,兰陵王先到也到等铠的出现。

二、Exchanger 的源码与实现

虽然理解 Exchanger 的思想很容易,了解其用法也很简单,但是若要理清它几百余行的源码却并非易事。其原因在于,槽是 Exchanger 中的核心概念和属性,Exchanger 中的数据交换分为单槽交换多槽交换,其中单槽交换源码简单,但多槽交换却很复杂。所以,下文对 Exchanger 源码的阐述以概括为主,不会对源码深究。如果你有兴趣,可以参考阅读这篇文章,作者对其源码的解读较为详细。

1. 核心构造

与其他同步工具不同的是,Exchanger 有且仅有一个构造函数。在这个构造中,也只初始化了一个对象participant.


public Exchanger() {    participant = new Participant();}
复制代码


从继承关系看,Participant 本质上是一个 ThreadLocal,而其中的 Node 则是线程的本地变量。


static final class Participant extends ThreadLocal<Node> {    public Node initialValue() {         return new Node();     }}
复制代码

2. 核心属性

Exchanger 有四个核心变量,如下所示。当然,除此之外,还有一些用以计算的其他变量。不过,为避免引入不必要的复杂度,本文暂不提及。


//ThreadLocal变量,每个线程都有自己的一个副本private final Participant participant;
//多槽位,高并发下使用,保存待匹配的Node实例private volatile Node[] arena;
//单槽位,arena未初始化时使用的保存待匹配的Node实例private volatile Node slot;
//初始值为0,当创建arena后会被赋值成SEQ,用来记录arena数组的可用最大索引,会随着并发的增大而增大直到等于最大值FULL,会随着并行的线程逐一匹配成功而减少恢复成初始值private volatile int bound;
复制代码


Node 的具体细节,注意其中的 item 和 match.


 @sun.misc.Contended static final class Node {        int index;              //arena的下标,多个槽位的时候使用        int bound;              // 上一次记录的Exchanger.bound        int collides;           // 记录的 CAS 失败数        int hash;               // 用于自旋        Object item;            //  这个线程的数据项        volatile Object match;  // 交换的数据        volatile Thread parked; //  当阻塞时,设置此线程,不阻塞的话会自旋    }
复制代码

3. 核心方法

// 交换数据// 如果一个线程达到后,会等待其他线程的到达(除非自己被中断)。然后,该线程会和到达的线程交换数据。// 如果线程在到达后,已经有其他线程在等待。那么,将会唤起该线程并交换数据。public V exchange(V x) throws InterruptedException {...}    //带有超时限制的交换public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {...}
复制代码


所以,从源码上看上文的示例,那么铠和兰陵王交换数据的过程应该是下面这样的:


小结

以上就是关于 Exchanger 的全部内容。在学习 Exchanger 时,要侧重理解它所要解决的问题场景,以及它的基本用法。对于其源码,当前阶段可以选择“不求甚解”,以降维的方式降低学习难度,日后再循序渐进理解。我在写本文时,也曾多次考虑是否要讲清楚源码,最终还是决定暂缓,毕竟现阶段理解它、学会它才是重点。


正文到此结束,恭喜你又上了一颗星✨


夫子的试炼


  • 使用 Exchanger 实现生产者与消费者。


延伸阅读与参考资料



关于作者


关注【技术八点半】,及时获取文章更新。传递有品质的技术文章,记录平凡人的成长故事,偶尔也聊聊生活和理想。早晨 8:30 推送作者品质原创,晚上 20:30 推送行业深度好文。


如果本文对你有帮助,欢迎点赞关注监督,我们一起从青铜到王者

发布于: 刚刚阅读数: 2
用户头像

微信公众号:【技术八点半】 2018.05.13 加入

关注公众号【技术八点半】,及时获取文章更新。传递有品质的技术文章,记录平凡人的成长故事,偶尔也聊聊生活和理想。早晨8:30推送作者品质原创,晚上20:30推送行业深度好文。

评论

发布
暂无评论
并发王者课-铂金9:互通有无-Exchanger如何完成线程间的数据交换