一、前言
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,所以其内部的数据结构和ThreadPoolExecutor基本一样,并在其基础上增加了按时间调度执行任务的功能,分为延迟执行任务和周期性执行任务。
二、构造函数
ScheduledThreadPoolExecutor的构造函数只能传 3 个参数corePoolSize、ThreadFactory、RejectedExecutionHandler,默认maximumPoolSize为Integer.MAX_VALUE。
工作队列是高度定制化的延迟阻塞队列DelayedWorkQueue,其实现原理和DelayQueue基本一样,核心数据结构是二叉最小堆的优先队列,队列满时会自动扩容,所以offer操作永远不会阻塞,maximumPoolSize也就用不上了,所以线程池中永远会保持至多有corePoolSize个工作线程正在运行。
 public ScheduledThreadPoolExecutor(int corePoolSize,                                   ThreadFactory threadFactory,                                   RejectedExecutionHandler handler) {    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,          new DelayedWorkQueue(), threadFactory, handler);}
   复制代码
 三、DelayedWorkQueue 延迟阻塞队列
1、基本架构
DelayedWorkQueue的实现原理中规中矩,内部维护了一个以RunnableScheduledFuture类型数组实现的最小二叉堆,初始容量是 16,使用ReentrantLock和Condition实现生产者和消费者模式。
 static class DelayedWorkQueue extends AbstractQueue<Runnable>    implements BlockingQueue<Runnable> {        private static final int INITIAL_CAPACITY = 16;    private RunnableScheduledFuture<?>[] queue =        new RunnableScheduledFuture<?>[INITIAL_CAPACITY];    private final ReentrantLock lock = new ReentrantLock();    private int size = 0;    private Thread leader = null;    private final Condition available = lock.newCondition();}
   复制代码
 2、offer 添加元素
ScheduledThreadPoolExecutor提交任务时调用的是DelayedWorkQueue.add,而add、put等一些对外提供的添加元素的方法都调用了offer,其基本流程如下:
 public boolean offer(Runnable x) {    if (x == null)        throw new NullPointerException();    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;    final ReentrantLock lock = this.lock;    lock.lock();    try {        int i = size;        if (i >= queue.length)            //扩容            grow();        size = i + 1;        if (i == 0) {            //如果是入的是第一个元素,不需要堆化            queue[0] = e;            setIndex(e, 0);        } else {            //堆化            siftUp(i, e);        }        if (queue[0] == e) {            //如果堆顶元素刚好是入队列的元素,则唤醒take            leader = null;            available.signal();        }    } finally {        lock.unlock();    }    return true;}
   复制代码
 
如图为offer基本流程图:
(1)扩容 grow
可以看到,当队列满时,不会阻塞等待,而是继续扩容。新容量newCapacity在旧容量oldCapacity的基础上扩容 50%(oldCapacity >> 1相当于oldCapacity /2)。最后Arrays.copyOf,先根据newCapacity创建一个新的空数组,然后将旧数组的数据复制到新数组中。
 private void grow() {    int oldCapacity = queue.length;    int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%    if (newCapacity < 0) // overflow        newCapacity = Integer.MAX_VALUE;    queue = Arrays.copyOf(queue, newCapacity);}
   复制代码
 (2)向上堆化 siftUp
新添加的元素先会加到堆底,然后一步步和上面的父亲节点比较,若小于父亲节点则和父亲节点互换位置,循环比较直至大于父亲节点才结束循环。
 private void siftUp(int k, RunnableScheduledFuture<?> key) {    while (k > 0) {        //找到父亲节点        int parent = (k - 1) >>> 1;        RunnableScheduledFuture<?> e = queue[parent];        if (key.compareTo(e) >= 0)            // 添加的元素 大于父亲节点,则结束循环            break;        //添加的元素小于父亲节点,则位置互换        queue[k] = e;        setIndex(e, k);        k = parent;    }    queue[k] = key;    setIndex(key, k);}
   复制代码
 
如下图为siftUp向上堆化过程图:
3、take 消费元素
Worker工作线程启动后就会循环消费工作队列中的元素,因为ScheduledThreadPoolExecutor的keepAliveTime=0,所以消费任务其只调用了DelayedWorkQueue.take。take 基本流程如下:
- 首先获取可中断锁,判断堆顶元素是否是空,空的则阻塞等待- available.await()。
 
- 堆顶元素不为空,则获取其延迟执行时间- delay,- delay <= 0说明到了执行时间,出队列- finishPoll。
 
- delay > 0还没到执行时间,判断- leader线程是否为空,不为空则说明有其他 take 线程也在等待,当前 take 将无限期阻塞等待。
 
- leader线程为空,当前 take 线程设置为- leader,并阻塞等待- delay时长。
 
- 当前 leader 线程等待 delay 时长自动唤醒护着被其他 take 线程唤醒,则最终将- leader设置为- null。
 
- 再循环一次判断- delay <= 0出队列。
 
- 跳出循环后判断 leader 为空并且堆顶元素不为空,则唤醒其他 take 线程,最后是否锁。 
 public RunnableScheduledFuture<?> take() throws InterruptedException {    final ReentrantLock lock = this.lock;    lock.lockInterruptibly();    try {        for (;;) {            RunnableScheduledFuture<?> first = queue[0]; //取出堆顶元素            if (first == null)                //堆为空,等待                available.await();            else {                long delay = first.getDelay(NANOSECONDS);                if (delay <= 0)                    //到了执行时间,出队列                    return finishPoll(first);                first = null; // don't retain ref while waiting                //还没到执行时间                if (leader != null)                    //此时若有其他take线程在等待,当前take将无限期等待                    available.await();                else {                    Thread thisThread = Thread.currentThread();                    leader = thisThread;                    try {                        available.awaitNanos(delay);                    } finally {                        if (leader == thisThread)                            leader = null;                    }                }            }        }    } finally {        if (leader == null && queue[0] != null)            available.signal();        lock.unlock();    }}
   复制代码
 
如下图为 take 基本流程图:
(1)take 线程阻塞等待
可以看出这个生产者 take 线程会在两种情况下阻塞等待:
(2)finishPoll 出队列
堆顶元素delay<=0,执行时间到,出队列就是一个向下堆化的过程siftDown。
 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {    int s = --size;    RunnableScheduledFuture<?> x = queue[s];    queue[s] = null;    if (s != 0)        siftDown(0, x);    setIndex(f, -1);    return f;}
   复制代码
 (3)siftDown 向下堆化
由于堆顶元素出队列后,就破坏了堆的结构,需要组织整理下,将堆尾元素移到堆顶,然后向下堆化:
- 从堆顶开始,父亲节点与左右子节点中较小的孩子节点比较(左孩子不一定小于右孩子)。 
- 父亲节点小于等于较小孩子节点,则结束循环,不需要交换位置。 
- 若父亲节点大于较小孩子节点,则交换位置。 
- 继续向下循环判断父亲节点和孩子节点的关系,直到父亲节点小于等于较小孩子节点才结束循环。 
 private void siftDown(int k, RunnableScheduledFuture<?> key) {    //k = 0, key = queue[size-1]    //无符号右移,相当于size/2    int half = size >>> 1;    while (k < half) {        //只需要比较一半        //找到左孩子节点        // child = 2k + 1        int child = (k << 1) + 1;        RunnableScheduledFuture<?> c = queue[child];        //右孩子节点        int right = child + 1;        //比较左右孩子大小        if (right < size && c.compareTo(queue[right]) > 0)            //c左孩子大于右孩子,则将有孩子赋值给左孩子            c = queue[child = right];        //比较key和孩子c        if (key.compareTo(c) <= 0)            //key小于等于c,则结束循环            break;        //key大于孩子c,则key与孩子交换位置        queue[k] = c;        setIndex(c, k);        k = child;    }    queue[k] = key;    setIndex(key, k);}
   复制代码
 
代码中使用移位运算,需要说明:
如下图为 siftDown 向下堆化过程图:
(4)leader 线程
leader线程的设计,是Leader-Follower模式的变种,旨在于为了不必要的时间等待。当一个take线程变成leader线程时,只需要等待下一次的延迟时间,而不是leader线程的其他take线程则需要等leader线程出队列了才唤醒其他take线程。
4、remove 删除指定元素
删除指定元素一般用于取消任务时,任务还在阻塞队列中,则需要将其删除。当删除的元素不是堆尾元素时,需要做堆化处理。
 public boolean remove(Object x) {    final ReentrantLock lock = this.lock;    lock.lock();    try {        int i = indexOf(x);        if (i < 0)            return false;        //维护heapIndex        setIndex(queue[i], -1);        int s = --size;        RunnableScheduledFuture<?> replacement = queue[s];        queue[s] = null;        if (s != i) {            //删除的不是堆尾元素,则需要堆化处理            //先向下堆化            siftDown(i, replacement);            if (queue[i] == replacement)                //若向下堆化后,i位置的元素还是replacement,说明四无需向下堆化的,                //则需要向上堆化                siftUp(i, replacement);        }        return true;    } finally {        lock.unlock();    }}
   复制代码
 四、总结
- DelayedWorkQueue添加元素满了之后会自动扩容原来容量的 1/2,即永远不会阻塞,最大扩容可达- Integer.MAX_VALUE,所以线程池中至多有- corePoolSize个工作线程正在运行。。
 
- DelayedWorkQueue消费元素 take,在堆顶元素为空和 delay >0 时,阻塞等待。
 
- DelayedWorkQueue是一个生产永远不会阻塞,消费可以阻塞的生产者消费者模式。
 
- DelayedWorkQueue有一个 leader 线程的变量,是- Leader-Follower模式的变种。当一个- take线程变成- leader线程时,只需要等待下一次的延迟时间,而不是- leader线程的其他- take线程则需要等- leader线程出队列了才唤醒其他- take线程。
 
PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!
评论