前言
异步化设计在多核时代、多线程大行其道的今天使用场景十分广泛,几乎所以高性能框架都在Task设计中花费了大量精力,Task设计门槛相对较高,高内聚的同时还需要给用户提供简单易懂的对接接口,JDK1.5、1.8、9都分别针对Task相关领域提供了很多简单方便的API,使得Task编程接口变得越来越丰富,然而在业务中面向越来越多的API时,不同的业务还是需要将这些API二次封装,本篇站在中介的角度,根据Socket中台及在线课堂设计过程中所碰到的问题,思考如何优雅地为业务提供可高度扩展的Task设计。
低延迟是中台或是在线课堂通信核心技术指标,通信涉及到的链路长,环境不确定性强,常用的优化手段不仅限于数据批处理。数据包结构优化、序列化改进、网络加密、服务器改进。异步化设计是服务器低延迟技术改进方向之一。但要强调的是,低延迟指标往往是针对整体而非单个链路,异步思路朝着无阻塞方向在高并发情况下,提升整体的延迟指标数据。
目标
在讨论Task异步模型时,我们要讨论什么呢?
异步模型运行核心三要素:
执行者(executer)
任务(task)
预期(future可选)
Task异步模型在Socket中台使用场景:
AsyncEvent:异步事件
ActorBox:Actor模型(单线程执行器)
TimeoutFuture:带超时检测的异步Task(Heartbeat:心跳检测)
RPC:网关跨服务RPC、跨线程RPC
目标功能阶梯
将上述的目标进行分解得出一个目标实现的阶梯:
提交一个普通的异步任务
提交一个线程绑定的异步任务
提交一个带超时检测的异步任务
提交一个带超时且回执监听的异步任务
目标监控需求
无监控不生产,面向监控设计是一个硬性的上线标准而非可选标准,面向Task异步模型,在讨论监控时我们希望了解什么?
让我们回到三要素中看看如何设定监控指标,我们大致需要这些维度的数据。
执行者(executer)模型指标、线程池指标
任务(task)等待时间、执行时间、最大等待时间、最大执行时间、执行时间99线、95线
预期(future)成功次数、失败次数
目标设计要求
执行者是Task异步模型中的核心,不同的执行者有着其自身的工作方式,包括他擅长的和不擅长的,框架解决标准情况下的低门槛性问题,面对高性能场景,高度可扩展性是不可或缺的。
内聚
解耦
隔离性
扩展性
设计与实现
异步模型的设计最终要回到Executor这个接口,而整个java体系的异步化构建都由此展开。
public interface Executor { void execute(Runnable command);}
在设计与实现的道路上,异步化过3次重构和演进,接下来我们按照每一次的演进过程来介绍演进的设计思路。
初代异步化设计:原味线程池
在介绍初代异步化线程池设计时,先看看两个最常用的线程池,分别对应特定的使用场景。
固定核心线程池
Executors.newFixedThreadPool(int nThreads)
我们再看看默认的实现。
new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
自定义线程池
根据业务情况设定不同的参数是自定义线程池的标准玩法,也是Executor的基本实现ThreadPoolExecutor。
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
进阶传统玩法
异步线程池配合EventBus使用事件驱动来完成异步化调用,比如Guava的AsyncEventBus。
@Slf4j
public class DefaultAsyncEventBus {
private final AsyncEventBus eventBus;
private final BlockingQueue blockingQueue;
private final ThreadPoolExecutor executor;
public DefaultAsyncEventBus(int corePoolSize, int maxPoolSize) {
this.blockingQueue = new LinkedBlockingQueue<>(xxx);
this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, xxx, TimeUnit.SECONDS, blockingQueue, ThreadPoolUtil.buildThreadFactory("DefaultAsyncEventBus", true), new ThreadPoolExecutor.AbortPolicy());
this.eventBus = new AsyncEventBus(executor, new SubscriberExceptionHandler() {
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
log.error("异步消息队列异常: [subscribeMethod={}, event={} ]",context.getSubscriberMethod(), context.getEvent().toString(),exception);
}
});
}
* 注册事件
*/
public void register(Object object){
eventBus.register(object);
}
* 执行事件
* @param object
*/
public void post(Object object){
eventBus.post(object);
}
* 卸载事件
* @param object
*/
public void unRegister(Object object){
eventBus.unregister(object);
}
@PreDestroy
public void destroy() throws Exception {
ThreadPoolUtil.gracefulShutdown(executor, 3000);
}
}
线程池工厂
第一代线程池在玩法上不拘泥于某种固定参数的设定,加入了线程池工厂(Factory)用于多个种类线程池的创建,同时通过Manager针对所有线程池进行管理。
protected Executor get(ThreadPoolConfig config) {
String name = config.getName();
int corePoolSize = config.getCorePoolSize();
int maxPoolSize = config.getMaxPoolSize();
int keepAliveSeconds = config.getKeepAliveSeconds();
BlockingQueue<Runnable> queue = config.getQueue();
ThreadPoolExecutor executor = new DefaultExecutor(corePoolSize
, maxPoolSize
, keepAliveSeconds
, TimeUnit.SECONDS
, queue
, new NamedPoolThreadFactory(name)
, new DumpThreadRejectedHandler(config));
return executor;
}
标准的线程池管理器
如何对生产的线程池统一管理,将线程池注册到管理器统一交给线程池管理器完成生命周期和未来监控需求等维护入口。
public final class ThreadPoolManager {
private static final CommonExecutorFactory executorFactory = CommonExecutorFactory.getInstance();
private ThreadPoolManager() {
}
private static final Map<String, Executor> pools = new ConcurrentHashMap<>();
public static Executor getAsyncExecutor() {
return pools.computeIfAbsent(CommonExecutorFactory.MQ, s -> executorFactory.get(CommonExecutorFactory.MQ));
}
...
public static void register(String name, Executor executor) {
Objects.requireNonNull(name);
Objects.requireNonNull(executor);
pools.put(name, executor);
}
public static Map<String, Executor> getActivePools() {
return pools;
}
}
第一代线程池可以说是互联网的通用玩法,配合Eventbus在无状态服务场景下能够轻松完成整个系统同步转异步的解耦动作,但在有状态服务中却有着诸多的问题,有状态服务对于时序和状态的强一致性让整个异步的过程都需要考虑Full thread-safe问题,顺序性可以通过引入所谓的有序队列进行,线程安全也可以通过并发安全类进行控制,回到低延迟的目标,消除锁竞争是高性能并发的关键,于是异步化进入第二代设计。
一个最简单的有序的线程队列池
public DefaultPriorityExecutor() {
PriorityBlockingQueue queue = new PriorityBlockingQueue(xxx);
RejectedExecutionHandler reh = new ThreadPoolExecutor.AbortPolicy();
executor = new ThreadPoolExecutor(1, 1, xxx, TimeUnit.SECONDS, queue, ThreadPoolUtil.buildThreadFactory("DefaultPriorityExecutor", true), reh);
}
第二代异步化设计:单线程绑定
所谓单线程绑定,一个具备共同特征的对象对1个线程绑定,该共同特征往往是一个房间的对象,比如上课的用户,单线程绑定可以天然在模型设计上完成任务的线程安全和顺序性要求,业务透明也就意味着业务能够无需考虑并发方面问题。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
(图2-1)1v1线程模型图
Queue + FixThread
(图2-2)Nv1线程模型图
Netty EventLoop线程在JDK实现上事件模型就是用单线程进行事件监听。
第二代异步化设计解决了线程模型Full thread-safe问题,让业务在处理任务时无需考虑有序和线程安全问题。
第三代异步化设计:多线程绑定
Actor线程盒子模型(ActorBox)
Actor模型由Scale提出
private class Worker implements Runnable {
private Worker() {
}
@Override
public void run() {
for (Runnable work = ThreadBasedActor.this.getTask(); work != null; work = ThreadBasedActor.this.getTask()) {
try {
work.run();
} catch (Exception e) {
}
if (ThreadBasedActor.this.lock.decrementAndGet() == 0) break;
}
}
}
public static class ThreadBasedActor {
private final int threadHashed;
private final ExecutorService executorService;
@Getter
private final BlockingQueue<Runnable> queues;
private final AtomicInteger lock = new AtomicInteger();
private Worker worker = new Worker();
public ThreadBasedActor(ExecutorService executorService, final int threadHashed) {
this.executorService = executorService;
this.threadHashed = threadHashed;
this.queues = new LinkedTransferQueue<>();
}
... more constructor
public AtomicInteger getLock() {
return lock;
}
private Runnable getTask() {
return queues.poll();
}
* 提交任务到队列
*
* @return true, false
*/
public boolean submit(Runnable task) {
boolean success = this.queues.offer(task);
if (this.lock.getAndIncrement() == 0) {
executorService.execute(this.worker);
}
return success;
}
...
}
(图2-3)NvN线程模型图
进阶混合线程绑定(MixedActorBox)
一个房间多个线程池绑定:
* 动态绑定,设置一个开始索引,索引之前的特别兼容静态模型关系
*
* @param beginIndex
* @return
*/
public EventBusManager initWithDynamicActor(int beginIndex) {
if (beginIndex == 0) {
beginIndex = 1;
}
this.beginIndex = beginIndex;
threadBasedActors = Maps.newConcurrentMap();
for (int i = 0; i < beginIndex; i++) {
threadBasedActors.put(i, new ThreadBasedActor((ExecutorService) executor, i));
}
return this;
}
* 添加新的模型
*
* @param hash
* @return
*/
public EventBusManager addDynamicActor(int hash) {
threadBasedActors.put(this.beginIndex + hash, new ThreadBasedActor((ExecutorService) executor, hash));
return this;
}
3个补充
第三阶段异步化模型依旧在部分特殊化场景中无法很好的构建更加丰富、安全的框架模型,所以在第三代模型基础上提出了3个补充设计:
超时设计
异步化模型耗时公式:总时间=等待阶段时间+执行阶段时间
在不同的阶段对异步任务的Timeout监控和中断并不是一件凑效、简单的事情。
先来看看JDK9新增的CompletableFuture API Improvements改进点:
支持 delays 和 timeouts
提升了对子类化的支持
新的工厂方法
public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit)
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit)
JDK8及之前的版本看来需要自己进行一次Timeout封装。
public class TimeoutFutureTask<T> extends FutureTask {
@Getter
private final Callback callback;
@Getter
private final long futureId;
private final TimeoutFutureBus futureBus;
private AtomicBoolean CANCEL_STATE = new AtomicBoolean(Boolean.FALSE);
public TimeoutFutureTask(BusTask task, Callback callback, long futureId, TimeoutFutureBus futureBus) {
super(task, null);
this.futureId = futureId;
this.futureBus = futureBus;
this.callback = callback;
}
public void submitOnTimeout(int time, TimeUnit timeUnit) {
futureBus.put(this, time, timeUnit);
}
public TimeoutFutureTask cancel() {
if (this.cancel(true)) {
CANCEL_STATE.compareAndSet(false, true);
}
return futureBus.getAndRemove(this.futureId);
}
@Override
public void run() {
super.run();
}
public void timeout() {
cancel();
callback.onTimeout(CANCEL_STATE.get());
}
}
* 链式提交(支持Timeout操作)
* @param event
* @param callback
* @return
*/
public TimeoutFutureTask newTimeoutFuture(final IEvent event, final Callback callback) {
long futureId = FUTURE_ID.incrementAndGet();
TimeoutFutureTask task = new TimeoutFutureTask(new BusTask(event.getClass().getSimpleName(), () -> {
doSubmit(event, callback);
}), callback, futureId, futureBus);
long start = System.nanoTime();
runAsync(task,
(th) -> {
logger.error("submitAsync error: {}", event, th);
return null;
}, (t, h) -> {
futureBus.getAndRemove(futureId);
eventBusStat.handle(event.getClass().getSimpleName(), System.nanoTime() - start, true);
});
return task;
}
单元测试:
EventBusManager.getInstance().newTimeoutFuture(new TestEvent(), new Callback() {
@Override
public void onSuccess() {
System.out.println("onSuccess");
}
@Override
public void onFailure(Exception e) {
System.out.println("onFailure");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onTimeout(boolean isCancelSuccess) {
System.out.println("onTimeout, cancelResult:" + isCancelSuccess);
}
}).submitOnTimeout(1, TimeUnit.SECONDS);
回调设计
回调是异步模型设计中不可忽视的需求,主线程对工作现场进行任务派发后的反馈在跨进程或者跨线程调用上尤为频繁,回调设计需要建立在指定的异步模型上,否则无法共享异步模型对时序以及线程安全设计,针对第三代的线程模型,回调设计需要对上述模型进行功能增强。
跨线程RPC
public class ActorCaller {
public static <T> ActorFuture<T> newFutureCall(String name, Supplier<T> supplier, ActorBox actorBox) {
assert Objects.nonNull(actorBox) : "actorBox can not be null";
ActorFuture<T> request = new ActorFuture<T>(new Callable<T>() {
@Override
public T call() {
return supplier.get();
}
});
ActorTask task = new ActorTask(name, request);
actorBox.dispatch(task);
return request;
}
}
(图2-4)跨线程RPC时序图
跨进程RPC
跨进程RPC=进程RPC+TimeoutFuture+跨线程RPC
一个健壮的RPC框架在每个RPC阶段都需要经得起各种非预期情况下的考验。
(图2-5)跨服务时序图(省略通信部分)
优雅停机
不同的异步模型只需要考虑一个统一的优雅停机方案,在处理线程池优雅停机问题上使用JVM钩子进行shutdown的调用。
Runtime.getRuntime().addShutdownHock(new Thread(){xxx.destroy()})
public static void destroy() {
for (Executor executor : pools.values()) {
ThreadPoolUtil.gracefulShutdown(((ExecutorService)executor), 1, TimeUnit.MINUTES);
}
}
监控
监控是异步化设计不可或缺的一环,既是验证高性能成果的一次展示,也是发现问题的一个平台。
监控指标设定是监控环节的关键点,相对常规的异步监控,在第三代异步模型中,异步监控主要包括3个大类。
Task监控
* 可被拦截的BusTask,用于扩展监控
*/
public class InterceptableTask implements Runnable {
private final static Logger logger = LoggerFactory.getLogger(InterceptableTask.class);
private final Runnable runnable;
private final long bornTime;
private final CombineTaskInterceptor interceptor;
public InterceptableTask(Runnable runnable, CombineTaskInterceptor interceptor) {
this.runnable = runnable;
this.bornTime = System.nanoTime();
this.interceptor = interceptor;
}
@Override
public void run() {
Exception exception = null;
long start = System.nanoTime();
try {
if (Objects.nonNull(interceptor)) {
interceptor.preHandle(this);
}
runnable.run();
if (Objects.nonNull(interceptor)) {
interceptor.postHandle(this);
}
BusTaskStat.getInstance().addRecord(((BusTask)runnable).getName(), System.nanoTime() - start, true,start - bornTime);
} catch (Exception e) {
exception = e;
logger.error("InterceptableTask:{} run error!", runnable, e);
BusTaskStat.getInstance().addRecord(((BusTask)runnable).getName(), System.nanoTime() - start, false,start - bornTime);
} finally {
if (Objects.nonNull(interceptor)) {
try {
interceptor.afterCompletion(this, exception);
} catch (Exception e) {
}
}
}
}
}
Actor监控
private final String name;
private final int arraySize;
private AtomicInteger activeActor = new AtomicInteger(0);
private AtomicLong totalLockedNum = new AtomicLong(0);
private AtomicLong maxLockNum = new AtomicLong(0);
private AtomicLong maxLoopTime = new AtomicLong(0);
private AtomicLong maxSingleLockNum = new AtomicLong(0);
ExecutorService监控
for(Map.Entry<String, ThreadPoolExecutor> c: children.entrySet()) {
List<String> poolName = Arrays.asList(c.getKey());
ThreadPoolExecutor cValue = c.getValue();
activeCount.addMetric(poolName, cValue.getActiveCount());
queueSize.addMetric(poolName, cValue.getQueue().size());
completedTaskCount.addMetric(poolName, cValue.getCompletedTaskCount());
poolSize.addMetric(poolName, cValue.getPoolSize());
largestPoolSize.addMetric(poolName, cValue.getLargestPoolSize());
}
监控面板
(图3-1)Actor监控面板
(图3-2)Task监控面板
(图3-3)Executor监控面板
场景与使用
如何在高内聚的异步模型中为不同的业务提供足够优雅的API,回到最初在Socket中台中碰到的各种场景,如何快速构建异步线程模型并在运行时可以按需调用。
结合Event事件驱动类【默认】
异步Task派发类
TimeoutFuture类
RPC类
整个异步化设计模型在实际业务中的核心流程之一大致如下,更多情况下,异步化模型和事件驱动总是关联在一起:
SocketThread -> Dispatch -> Filter -> BizRoomThread
这其中的思路和Akka模型有着极大的相似之处:
有限状态机模式在Erlang design principles里面被很好描述出来.简而言之,它可以被视为一组关系:
State(S) x Event(E) -> Actions (A), State(S’)
这些关系描述为:
如果我们在状态S 和 时间E 发生,我们应该执行动作A 与转换到状态S’.
所以,在线课堂整体的异步化往往是由报文、事件、异步、行为、状态这些领域组成,是一个标准的CQRS架构模型。
(图4-1)CQPS架构模型
细节决定未来
关于下一代异步模型的思考随着业务类型的多样化,无锁设计和减少线程切换提升整体吞吐量的同时,对低延迟敏感型任务梳理是重中之重,随着越来越多的虚拟课堂的应用,未来高密度延迟类任务的提交对整个异步模型设计有着非常大的考验,如何快速、精准对延迟队列进行消费是未来的课题之一,同时,我们也会从优秀的框架中借鉴更好的无锁化设计,成为低延迟交互利器中的重要一员。
阻塞需要仔细的管理
在某些情况下阻塞操作不可避免,也就是令一个线程进入不定时间的休眠,等待一个外部事件发生。传统的RDBMS驱动或消息API就是例子,深层的原因通常是幕后发生了(网络)I/O。面对这一点,你可能倾向于仅仅用Future对象包装这个阻塞调用,并用跟此对象代替直接与IO之间的交互,但是这个策略实在是太简单了:当应用的负载增加时,你很可能会发现瓶颈所在,或耗尽内存,或线程过多。
引用下Akka的优秀设计思路
下面是“阻塞问题”恰当方案的不完全清单:
在一个角色(或由路由器管理的角色组)内部执行阻塞调用,确保配置一个足够大的线程池专门用于这一目的。
在一个Future对象内执行阻塞调用,确保此类调用在任意时间点内的上限(无限制的提交任务会耗尽你的内存或线程数)。
在一个Future对象内执行阻塞调用,提供一个线程池,这个线程池的线程上限要与运行应用程序的硬件相符。
专门用一个线程管理一组阻塞资源(比如说NIO选择器驱动多个通道),以角色消息的形式调度事件。
第一种方案可能尤其适用于单线程模型,比如数据库句柄传统上一次只能执行一个查询,并使用内部同步方式确保这一点。一个常见的模式是为N个角色创建一个路由器,每个角色包装一个数据库连接,而查询是发送到路由器的。数字N必须是最大吞吐量,而数字大小取决于在什么样的硬件上部署了哪种数据库管理系统(DBMS)。
其他优秀异步框架
在整个交互中台设计中,异步框架的建设不仅限于上述异步化模型,根据实际的具体场景,其他优秀的异步框架同样在高性能领域起到了至关重要的作用,比如:
本地MQ场景利器:Disruptor
业务逻辑处理器完全是运行在内存中(in-memory),使用事件源驱动方式(event sourcing). 业务逻辑处理器的核心是Disruptors。
Disruptor在多生产者多消费者模式下,业务和线程是M:N的多对多乱序关系,但能确保每个sequence只被一个processor消费,在同一个WorkPool中,确保多个WorkProcessor不会消费同样的sequence,这是一个典型的MQ模式,在该模式下无法完成业务和线程之间的绑定关系。
Disruptor线程模型核心代码
WorkerPool.java
for (WorkProcessor<?> processor : workProcessors)
{
processor.getSequence().set(cursor);
executor.execute(processor);
}
BasicExecutor.java
public void execute(Runnable command)
{
final Thread thread = factory.newThread(command);
if (null == thread)
{
throw new RuntimeException("Failed to create thread to run: " + command);
}
thread.start();
threads.add(thread);
}
Disruptor天然支持简单任务编排
本地延迟队列利器:Netty HashedWheelTimer
HashedWheelTimer是一个可扩展的,可用于替代java.util.Timer 或者java.util.concurrent.ScheduledThreadPoolExecutor。他可以处理许多计划的任务,并且他很容易取消任务。
HashedWheelTimer直接使用的场景非常之多,包含Pingpong心跳任务、RPC超时等等,在高并发高吞吐量环节功不可没。
在它的内部,使用一个Hash Table,大多数timer的操作,Hash Table的主键存放调度任务到yield的常数时间(it uses a hash table whose key is a task’s timing to yield constant time for most timer operations)。(java.util.Timer 使用二进制堆。)
分布式异步模型利器:Akka
Akka定义的Actor
Actors are reactive and message driven. An Actor doesn’t do anything until it receives a message.
在线程模型中,本篇讲到的线程模型和Akka对应的派发器有着非常高的相似度,如:
Dispatcher
PinnedDispatcher
CallingThreadDispatcher
在异步化进化中,异步模型的设计越来越趋向于Akka的异步模型,包括角色、派发器、FSM等概念设计,Akka是一个完整的服务框架,在异步之外,还有非常多的优秀的设计,在未来的高性能服务设计中,Akka将会是重要的一员,比如RemoteActor的设计。
以下内容公开资料比较丰富,不重复分析。
- Netty EventLoop/Netty Promise
- Guava ListenerFuture
- TimeoutFutureAPI JDK9+
参考资料:
http://ifeve.com/axon/
https://www.cnblogs.com/daoqidelv/p/7043696.html
评论