写点什么

Java 并发源码:阻塞队列实现之 DelayQueue 源码解析

作者:Java高工P7
  • 2021 年 11 月 11 日
  • 本文字数:2290 字

    阅读完需:约 8 分钟

=============


队中的元素必须实现 Delayed 接口【Delay 接口又继承了 Comparable,需要实现 compareTo 方法】,每个元素都需要指明过期时间,通过 getDelay(unit)获取元素剩余时间【剩余时间 = 到期时间 - 当前时间】。


每次向优先队列中添加元素时根据 compareTo 方法作为排序规则,当然我们约定一下,默认 q.peek()出来的就是最先过期的元素。


public interface Delayed extends Comparable<Delayed> {


// 返回剩余时间


long getDelay(TimeUnit unit);


}


public interface Comparable<T> {


// 定义比较方法


public int compareTo(T o);


}


Delayed 元素案例


===============


学习了 Delayed 接口之后,我们看一个实际的案例,加深印象,源于:《Java 并发编程之美》。


static class DelayedElement implements Delayed {


private final long delayTime; // 延迟时间


private final long expire; // 到期时间


private final String taskName; // 任务名称


public DelayedElement (long delayTime, String taskName) {


this.delayTime = delayTime;


this.taskName = taskName;


expire = now() + delayTime;


}


final long now () {


return System.currentTimeMillis();


}


// 剩余时间 = 到期时间 - 当前时间


@Override


public long getDelay (TimeUnit unit) {


return unit.convert(expire - now(), TimeUnit.MILLISECONDS);


}


@Override


public int compareTo (Delayed o) {


return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));


}


@Override


public String toString () {


final StringBuilder res = new StringBuilder("DelayedElement [ ");


res.append("delay = ").append(delayTime);


res.append(", expire = ").append(expire);


res.append(", taskName = '").append(taskName).append(''');


res.append(" ] ");


return res.toString();


}


}


public static void main (String[] args) {


// 创建 delayQueue 队列


DelayQueue<DelayedElement> delayQueue = new DelayQueue<>();


// 创建延迟任务


Random random = new Random();


for (int i = 0; i < 10; i++) {


DelayedElement element = new DelayedElement(random.nextInt(500), "task: " + i);


delayQueue.offer(element);


}


// 依次取出任务并打印


DelayedElement ele = null;


try {


for (; ; ) {


while ((ele = delayQueue.take()) != null) {


System.out.println(ele);


}


}


} catch (InterruptedException ex) {


ex.printStackTrace();


}


}


// 打印结果


DelayedElement [ delay = 2, expire = 1611995426061, taskName = 'task: 4' ]


DelayedElement [ delay = 52, expire = 1611995426111, taskName = 'task: 2' ]


DelayedElement [ delay = 80, expire = 1611995426139, taskName = 'task: 5' ]


DelayedElement [ delay = 132, expire = 1611995426191, taskName = 'task: 0' ]


DelayedElement [ delay


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


= 174, expire = 1611995426233, taskName = 'task: 9' ]


DelayedElement [ delay = 175, expire = 1611995426234, taskName = 'task: 7' ]


DelayedElement [ delay = 326, expire = 1611995426385, taskName = 'task: 3' ]


DelayedElement [ delay = 447, expire = 1611995426506, taskName = 'task: 8' ]


DelayedElement [ delay = 452, expire = 1611995426511, taskName = 'task: 1' ]


DelayedElement [ delay = 486, expire = 1611995426545, taskName = 'task: 6' ]


  • 实现了 compareTo 方法,定义比较规则为越早过期的排在队头。

  • 实现了 getDelay 方法,计算公式为:剩余时间 = 到期时间 - 当前时间。


构造器


=======


DelayQueue 构造器相比于前几个,就显得非常 easy 了。


public DelayQueue() {}


public DelayQueue(Collection<? extends E> c) {


this.addAll(c);


}


void put(E e)


=================


因为 DelayQueue 是无界队列,不会因为边界问题产生阻塞,因此 put 操作和 offer 操作是一样的。


public void put(E e) {


offer(e);


}


public boolean offer(E e) {


// 获取独占锁


final ReentrantLock lock = this.lock;


lock.lock();


try {


// 加入优先队列里


q.offer(e);


// 判断堆顶元素是不是刚刚插入的元素


// 如果判断为 true,说明当前这个元素是将最先过期


if (q.peek() == e) {


// 重置 leader 线程为 null


leader = null;


// 激活 available 变量条件队列中的一个线程


available.signal();


}


return true;


} finally {


lock.unlock();


}


}


E take()


============


take 方法将会获取并移除队列里面延迟时间过期的元素?,如果队列里面没有过期元素则陷入等待。


public E take() throws InterruptedException {


// 获取独占锁


final ReentrantLock lock = this.lock;


lock.lockInterruptibly();


try {


for (;;) {


// 瞅一瞅谁最快过期


E first = q.peek();


// 队列为空,则将当前线程置入 available 的条件队列中,直到里面有元素


if (first == null)


available.await();


else {


// 看下还有多久过期


long delay = first.getDelay(NANOSECONDS);


// 哇,已经过期了,就移除它并返回


if (delay <= 0)


return q.poll();


first = null; // don't retain ref while waiting


// leader 不为 null 表示其他线程也在执行 take


// 则将当前线程置入 available 的条件队列中


if (leader != null)


available.await();


else {


// 如果 leader 为 null,则选择当前线程作为 leader 线程


Thread thisThread = Thread.currentThread();


leader = thisThread;


try {


// 等待 delay 时间,时间到之后,会出条件队列,继续竞争锁


available.awaitNanos(delay);


} finally {


if (leader == thisThread)


leader = null;


}

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
Java并发源码:阻塞队列实现之DelayQueue源码解析