前言
在生活中太阳的东升西落,鸟类的南飞北归,四级的轮换,每天的上下班,海水的潮汐,每月的房租车贷等等,如果用程序员的视角看,这就是一个个的定时任务,在日常的开发工作中也有很多的定时任务场景:
数仓系统凌晨进行的数据同步
订单 12 小时未支付的状态校验
rpc 调用超时时间的校验
缓存数据失效时间的延长
定时开启的促销活动
……
假如现在有一个任务需要 3s 后执行,你会如何实现?
简单点,直接一个线程的休眠,thread.sleep(3000),一行代码就能达到目的,但是性能嘛……,由于每个任务都需要一个单独的线程,当系统中存在大量任务,
任务调度
假如,现在有一个任务需要 3s 后执行,你会如何实现呢?
简单点,直接一个休眠,让线程 sleep 3s,不就达到目的了吗?但是性能嘛……,由于每个任务都需要一个单独的线程,在系统中存在大量任务时,这种方案的消耗是极其巨大的,那么如何实现高效的调度呢?大佬们低头看了一眼手表,一个算法出现了
时间轮的数据结构
如图所示,这就是时间轮的一个基础结构,一个存储了定时任务的环形队列,可以理解为一个时间钟,队列的每个节点称为时间槽,每个槽位又使用列表存储着需要执行的定时任务。和生活中的钟表运行机制一样,每隔固定的单位时间,就会从一个槽位跳到下一个槽位,就像秒针跳动了一次,再取出当前槽位的任务进行执行。假如固定单位时间为 1S,当前槽位位 2,如果需要插入一个 3S 后的任务,就会在槽位 5 的的列表里加上当前任务。等指针运行到第五个槽位时,取出任务执行就可以了。
时间轮的最大优势是在时间复杂度上的优势,一个任务简单的生命周期:
创建任务,插入到数据结构中。
查询任务,找到满足条件的任务
执行任务。
任务归档,从任务调度的列表中删出。
其中第三步的执行时间是固定的,所以 1,2,4 这三部就的时间复杂度就决定了整个任务调度流程的复杂度,而时间轮是链式存储结构,所以在增删和查询时,时间复杂度都是 0(1),其他常见的任务调度算法例如最小堆和红黑树以及跳表。
最小堆是一颗完全二叉树而且子节点的值总是大于等于父节点的值,所以在插入时候需要判断父节点的关系,它的时间添加操作时间复杂度是 O(logn),在任务执行时,只需要判断最顶节点就行,所以它的查询时间复杂度时哦 O(1)。
根据红黑树的特性已经被归纳法证明它的增加的时间复杂度是 O(logn),查找最小节点的时间复杂度位 O(h)。
跳表的的本质是实现二分查找法的有序链表,但是他有多个层级,和红黑树的高度值相似,它的时间复杂度也是 O(logn)
高级时间轮
如上图所示,如果一个刻度代表 1S,那么一个周期就是 1 分钟,但是如果我一个任务是在 3 分钟后执行呢,如果是在一个 12 小时后执行呢?
当然如果是单纯的增加环形链表的长度也是可以的,直接扩大到 3600*24,一天一个周期,直接放进来。但是还有更好的办法。
带轮次标记的任务
任务执行轮次的计算公式:((任务执行时间-当前时间)/固定单位时间)%槽位数量
根据槽位计算公式可以算出当前任务需要插入执行的轮次,我在任务上面加一个字段 round,当每次执行到该槽位时,就遍历该槽位的任务列表,每个任务的 round-1,取出来 round=0 的任务执行就行。
for(Task task:taskList){
int round= task.getRound();
round=(round-1);
task.setRound(round);
if(round==0){
doTask(task);
}
}
复制代码
如果任务间隔不是很大,看起来也是不错的一种解决方式。
但是工作中有很多任务,延迟执行的时间是很久以后的,例如延保履约服务成功之后会有一个 7 天自动完成的定时任务,甚至有一些几年后才会执行的任务,如果都用 round 来处理的话,那这个 round 将会变的非常大的一个数字,也会在任务列表中插入很多当前不需要执行的任务,如果每次都执行上面的逻辑,显然会浪费大量的资源。
多层时间轮
多层时间轮的核心思想是:
就上上图的水表,有很多小的表盘,但是每个表盘的刻度其实是不一样,又或者手表里的时分秒或者日历上的年月日。
针对时间复杂度的问题:不做遍历计算 round,只要到了当前槽位,就把任务列表的所有任务拿出来执行。
针对空间复杂度的问题:分层,每个层级的时间轮刻度不一样,多个时间轮协调工作。
如上图所示,第一次时间轮,每个刻度是 1ms,一轮是 20ms,第二个层时间轮的刻度是 20ms,一轮就是 400ms,第三层的刻度是 400ms,一轮就是 8000ms,每层的周期就等于 20ms *2 的 n 次方。这要使用多层级时间轮就可以很容易把任务区分开来。每当高层次时间轮到达当前节点,就把任务降级到低层级的时间轮上。对于 400ms 的时间轮来说,小于 1ms 和小于 399ms 的任务都是过期任务,只要不大于 400ms,都认为是过期任务。
代码实现的话,往上也有很多,最近比较火热的 POWER-JOB 的分布式调度框架就是才有的时间轮算法,粘贴下核心代码大家看下:
1.首先定义了一个任务接口
public interface TimerTask extends Runnable {
}
复制代码
2.调度中的任务对象
public interface TimerFuture {
/**
* 获取实际要执行的任务
* @return
*/
TimerTask getTask();
/**
* 取消任务
* @return
*/
boolean cancel();
/**
* 任务是否取消
* @return
*/
boolean isCancelled();
/**
* 任务是否完成
* @return
*/
boolean isDone();
}
复制代码
3.调度器接口
public interface Timer {
/**
* 调度定时任务
*/
TimerFuture schedule(TimerTask task, long delay, TimeUnit unit);
/**
* 停止所有调度任务
*/
Set<TimerTask> stop();
}
复制代码
4.时间轮的实现
public class HashedWheelTimer implements Timer {
private final long tickDuration;
private final HashedWheelBucket[] wheel;
private final int mask;
private final Indicator indicator;
private final long startTime;
private final Queue<HashedWheelTimerFuture> waitingTasks = Queues.newLinkedBlockingQueue();
private final Queue<HashedWheelTimerFuture> canceledTasks = Queues.newLinkedBlockingQueue();
private final ExecutorService taskProcessPool;
public HashedWheelTimer(long tickDuration, int ticksPerWheel) {
this(tickDuration, ticksPerWheel, 0);
}
/**
* 新建时间轮定时器
* @param tickDuration 时间间隔,单位毫秒(ms)
* @param ticksPerWheel 轮盘个数
* @param processThreadNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池)
*/
public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {
this.tickDuration = tickDuration;
// 初始化轮盘,大小格式化为2的N次,可以使用 & 代替取余
int ticksNum = CommonUtils.formatSize(ticksPerWheel);
wheel = new HashedWheelBucket[ticksNum];
for (int i = 0; i < ticksNum; i++) {
wheel[i] = new HashedWheelBucket();
}
mask = wheel.length - 1;
// 初始化执行线程池
if (processThreadNum <= 0) {
taskProcessPool = null;
}else {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("HashedWheelTimer-Executor-%d").build();
// 这里需要调整一下队列大小
BlockingQueue<Runnable> queue = Queues.newLinkedBlockingQueue(8192);
int core = Math.max(Runtime.getRuntime().availableProcessors(), processThreadNum);
// 基本都是 io 密集型任务
taskProcessPool = new ThreadPoolExecutor(core, 2 * core,
60, TimeUnit.SECONDS,
queue, threadFactory, RejectedExecutionHandlerFactory.newCallerRun("PowerJobTimeWheelPool"));
}
startTime = System.currentTimeMillis();
// 启动后台线程
indicator = new Indicator();
new Thread(indicator, "HashedWheelTimer-Indicator").start();
}
@Override
public TimerFuture schedule(TimerTask task, long delay, TimeUnit unit) {
long targetTime = System.currentTimeMillis() + unit.toMillis(delay);
HashedWheelTimerFuture timerFuture = new HashedWheelTimerFuture(task, targetTime);
// 直接运行到期、过期任务
if (delay <= 0) {
runTask(timerFuture);
return timerFuture;
}
// 写入阻塞队列,保证并发安全(性能进一步优化可以考虑 Netty 的 Multi-Producer-Single-Consumer队列)
waitingTasks.add(timerFuture);
return timerFuture;
}
@Override
public Set<TimerTask> stop() {
indicator.stop.set(true);
taskProcessPool.shutdown();
while (!taskProcessPool.isTerminated()) {
try {
Thread.sleep(100);
}catch (Exception ignore) {
}
}
return indicator.getUnprocessedTasks();
}
/**
* 包装 TimerTask,维护预期执行时间、总圈数等数据
*/
private final class HashedWheelTimerFuture implements TimerFuture {
// 预期执行时间
private final long targetTime;
private final TimerTask timerTask;
// 所属的时间格,用于快速删除该任务
private HashedWheelBucket bucket;
// 总圈数
private long totalTicks;
// 当前状态 0 - 初始化等待中,1 - 运行中,2 - 完成,3 - 已取消
private int status;
// 状态枚举值
private static final int WAITING = 0;
private static final int RUNNING = 1;
private static final int FINISHED = 2;
private static final int CANCELED = 3;
public HashedWheelTimerFuture(TimerTask timerTask, long targetTime) {
this.targetTime = targetTime;
this.timerTask = timerTask;
this.status = WAITING;
}
@Override
public TimerTask getTask() {
return timerTask;
}
@Override
public boolean cancel() {
if (status == WAITING) {
status = CANCELED;
canceledTasks.add(this);
return true;
}
return false;
}
@Override
public boolean isCancelled() {
return status == CANCELED;
}
@Override
public boolean isDone() {
return status == FINISHED;
}
}
/**
* 时间格(本质就是链表,维护了这个时刻可能需要执行的所有任务)
*/
private final class HashedWheelBucket extends LinkedList<HashedWheelTimerFuture> {
public void expireTimerTasks(long currentTick) {
removeIf(timerFuture -> {
// processCanceledTasks 后外部操作取消任务会导致 BUCKET 中仍存在 CANCELED 任务的情况
if (timerFuture.status == HashedWheelTimerFuture.CANCELED) {
return true;
}
if (timerFuture.status != HashedWheelTimerFuture.WAITING) {
log.warn("[HashedWheelTimer] impossible, please fix the bug");
return true;
}
// 本轮直接调度
if (timerFuture.totalTicks <= currentTick) {
if (timerFuture.totalTicks < currentTick) {
log.warn("[HashedWheelTimer] timerFuture.totalTicks < currentTick, please fix the bug");
}
try {
// 提交执行
runTask(timerFuture);
}catch (Exception ignore) {
} finally {
timerFuture.status = HashedWheelTimerFuture.FINISHED;
}
return true;
}
return false;
});
}
}
private void runTask(HashedWheelTimerFuture timerFuture) {
timerFuture.status = HashedWheelTimerFuture.RUNNING;
if (taskProcessPool == null) {
timerFuture.timerTask.run();
}else {
taskProcessPool.submit(timerFuture.timerTask);
}
}
/**
* 模拟时针转动的线程
*/
private class Indicator implements Runnable {
private long tick = 0;
private final AtomicBoolean stop = new AtomicBoolean(false);
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public void run() {
while (!stop.get()) {
// 1. 将任务从队列推入时间轮
pushTaskToBucket();
// 2. 处理取消的任务
processCanceledTasks();
// 3. 等待指针跳向下一刻
tickTack();
// 4. 执行定时任务
int currentIndex = (int) (tick & mask);
HashedWheelBucket bucket = wheel[currentIndex];
bucket.expireTimerTasks(tick);
tick ++;
}
latch.countDown();
}
/**
* 模拟指针转动,当返回时指针已经转到了下一个刻度
*/
private void tickTack() {
// 下一次调度的绝对时间
long nextTime = startTime + (tick + 1) * tickDuration;
long sleepTime = nextTime - System.currentTimeMillis();
if (sleepTime > 0) {
try {
Thread.sleep(sleepTime);
}catch (Exception ignore) {
}
}
}
/**
* 处理被取消的任务
*/
private void processCanceledTasks() {
while (true) {
HashedWheelTimerFuture canceledTask = canceledTasks.poll();
if (canceledTask == null) {
return;
}
// 从链表中删除该任务(bucket为null说明还没被正式推入时间格中,不需要处理)
if (canceledTask.bucket != null) {
canceledTask.bucket.remove(canceledTask);
}
}
}
/**
* 将队列中的任务推入时间轮中
*/
private void pushTaskToBucket() {
while (true) {
HashedWheelTimerFuture timerTask = waitingTasks.poll();
if (timerTask == null) {
return;
}
// 总共的偏移量
long offset = timerTask.targetTime - startTime;
// 总共需要走的指针步数
timerTask.totalTicks = offset / tickDuration;
// 取余计算 bucket index
int index = (int) (timerTask.totalTicks & mask);
HashedWheelBucket bucket = wheel[index];
// TimerTask 维护 Bucket 引用,用于删除该任务
timerTask.bucket = bucket;
if (timerTask.status == HashedWheelTimerFuture.WAITING) {
bucket.add(timerTask);
}
}
}
public Set<TimerTask> getUnprocessedTasks() {
try {
latch.await();
}catch (Exception ignore) {
}
Set<TimerTask> tasks = Sets.newHashSet();
Consumer<HashedWheelTimerFuture> consumer = timerFuture -> {
if (timerFuture.status == HashedWheelTimerFuture.WAITING) {
tasks.add(timerFuture.timerTask);
}
};
waitingTasks.forEach(consumer);
for (HashedWheelBucket bucket : wheel) {
bucket.forEach(consumer);
}
return tasks;
}
}
}
复制代码
作者:京东保险 陈建华
来源:京东云开发者社区
评论