写点什么

JAVA 语言异步非阻塞设计模式(原理篇)

发布于: 18 小时前
JAVA语言异步非阻塞设计模式(原理篇)

本系列文章共 2 篇,对 Java 语言的异步非阻塞模式进行科普。《原理篇》讲解异步非阻塞模型的原理,以及核心设计模式“Promise”的基本特性。《应用篇》会展示更加丰富的应用场景,介绍 Promise 的变体,如异常处理、调度策略等,并将 Promise 和现有工具进行对比。


限于个人水平和篇幅,本系列以科普为主,内容更偏重于原理、API 设计、应用实践,但是不会深入讲解并发优化的具体细节。

1.概述

异步非阻塞[A]是一种高性能的线程模型,在 IO 密集型系统中得到广泛应用。在该模型下,系统发起耗时请求后不需要等待响应,期间可以执行其他操作;当收到响应后,系统收到通知并执行后续处理。由于消除了不必要的等待,这种模型能够充分利用 cpu、线程等资源,提高资源利用率。


然而,异步非阻塞模式在提升性能的同时,也带来了编码实现上的复杂性。请求和响应可能分离到不同线程中,需要编写额外代码完成响应结果的传递。Promise 设计模式可以降低这种复杂性,封装数据传递、时序控制、线程安全等实现细节,从而提供简洁的 API 形式。


本文首先介绍异步非阻塞模式,从线程模型的角度分析阻塞和非阻塞模式的区别。之后介绍 Promise 设计模式的应用场景及工作流程。最后,提供一种简易的 Java 实 现,能够实现基本的功能需求,并做到线程安全。


在正式探索技术问题之前,我们先来看看什么是异步非阻塞模型。如图 1-1 所示,展示了两个小人通信的场景:


  1. 两个小人代表互相通信的两个线程,如数据库的客户端和服务端;他们可以部署在不同的机器上。

  2. 小人之间互相投递苹果,代表要传递的消息。根据具体业务场景,这些消息可能会称为 request、response、packet、document、record 等。

  3. 小人之间需要建立信道,消息才得以传递。根据场景,信道称为 channel、connection 等。


假设左侧小人发起请求,而右侧小人处理请求并发送响应:左侧小人先投出一个苹果 request,被右侧小人接收到;右侧小人进行处理后,再投出苹果 response,被左侧小人接收到。我们考察左侧小人在等待响应期间的行为,根据他在等待 response 期间是否能处理其他工作,将其归纳为“同步阻塞”和“异步非阻塞”两种模式。



图 1-1 两个小人通信


首先我们看看同步阻塞式通信的流程,如图 1-2a 所示。


  1. 投递。左侧小人投递 request,并等待接收 response。

  2. 等待。在等待接收 response 期间,左侧小人休息。不论是否还有其他 request 需要投递、是否还有其他工作需要处理,他都视若无睹,绝对不会因此打断休息。

  3. 响应。在收到 response 后,小人从休息中唤醒并处理 response。



图 1-2a 同步阻塞式通信


接下来我们看看异步非阻塞式通信的流程,如图 1-2b 所示。


  1. 缓存。左侧小人投递 request,并等待接收 response。和同步阻塞模式不同,小人并不需要亲手接住苹果 response,而是在地上放置一个盘子称为“buffer”;如果小人暂时不在场,那么所收到的苹果可以先存在盘子里,稍后再处理。

  2. 暂离。由于有盘子 buffer 的存在,小人投递 request 后就可以暂时离开,去处理其他工作,当然也可以去投递下一个 request;如果需要向不同的 channel 投递 request,那么小人可以多摆放几个盘子,和 channel 一一对应。

  3. 响应。小人离开后,一旦某个盘子收到了 response,一只“大喇叭”就会响起,发出“channelRead”通知,呼唤小人回来处理 response。如果要处理多个 response 或多个 channel,那么 channelRead 通知还需要携带参数,以说明从哪个 channel 上收到了哪个 response。


这里的大喇叭可以用 NIO 或 AIO 来实现。简单来说,NIO 是指不停地轮询每个盘子,一旦看到苹果就发出通知;AIO 是指在收到苹果时直接触发通知,而没有轮询的过程。当然,本系列文章的读者并不需要了解更多实现细节,只需知道异步非阻塞模式依赖于“大喇叭”来实现,它替代小人等待接收 response,从而解放小人去处理其他工作。



图 1-2b 异步非阻塞式通信


根据上面的分析,同步模式具有下列严重缺点


  1. 同步阻塞模式的工作效率十分低下。小人大部分时间都在休息,仅当投递请求、处理响应时,才偶尔醒来工作一小会;而在异步非阻塞模式下,小人从不休息,马不停蹄地投递请求、处理响应,或处理其他工作。

  2. 同步阻塞模式会带来延迟


我们考虑下面两种情况,如图 1-3 所示。


  • channel 复用,即左侧小人在一个 channel 上连续发送多条消息。在同步阻塞模式下,一轮(即请求+响应)只能投递一个请求(苹果 1),而后续请求(苹果 2-4)都只能排队等待,右侧小人需要等待很多轮才能收到所期望的全部消息。此外,左侧小人在等待接收某个 response 期间,没有机会处理收到的其他消息,造成了数据处理的延迟。不得不感慨,左侧小人太懒惰了!

  • 线程复用,即一个线程(小人)向多个 channel 发送消息(苹果 1-3,分别发向不同 channel)。左侧小人同一时刻只能做一件事,要么在工作,要么在休息;他投递了苹果 1 后就躺下休息,等待响应,全然不顾右侧小人 2、3 还在等待他们想要的苹果 2、3。



图 1-3a channel 复用



图 1-3b 线程复用


在这一章里我们用漫画的形式,初步体验了同步阻塞模式与异步非阻塞模式,并分析了两种模式的区别。接下来我们从 Java 线程入手,对两种模式进行更加正式、更加贴近实际的分析。

2.异步非阻塞模型

2.1 Java 线程状态

在 Java 程序中,线程是调度执行的单元。线程可以获得 CPU 使用权来执行代码,从而完成有意义的工作。工作进行期间,有时会因为等待获取锁、等待网络 IO 等原因而暂停,通称“同步”或“阻塞”;如果多项工作能够同时进行,之间不存在约束、不需要互相等待,这种情况就称为“异步”或“非阻塞”。受限于内存、系统线程数、上下文切换开销,Java 程序并不能无限创建线程;因此,我们只能创建有限个线程,并尽量提高线程的利用率,即增加其工作时长、降低阻塞时长。异步非阻塞模型是减少阻塞、提高线程利用率的有效手段。当然,这种模型并不能消除所有的阻塞。我们首先来看看 Java 线程有哪些状态,其中哪些阻塞是必要的,哪些阻塞可以避免。


Java 线程状态包括:


  • RUNNABLE:线程在执行有意义的工作如图 2-1a,线程如果在执行纯内存运算,那么处于 RUNNABLE 状态根据是否获得 cpu 使用权,又分为两个子状态:READY、RUNNING

  • BLOCKED/WAITING/TIMED_WAITING:线程正在阻塞如图 2-1b、2-1c、2-1d,根据阻塞原因,线程处于下列状态之一 BLOCKED:synchronized 等待获取锁 WAITING/TIMED_WAITING:Lock 等待获取锁。两种状态的区别为是否设置超时时长



图 2-1 Java 线程状态


此外,如果 Java 线程正在进行网络 IO,则线程状态为 RUNNABLE,但是实际上也发生了阻塞。以 socket 编程为例,如图 2-2 所示,在收到数据之前 InputStream.read() 会阻塞,此时线程状态为 RUNNABLE。



图 2-2 网络 IO


综上,Java 线程状态包括:RUNNABLE、BLOCKED、WAITING、TIMED_WAITING。其中,RUNNABLE 状态又分为内存计算(非阻塞)、网络 IO(阻塞)两种情况,而其余状态都是阻塞的。根据阻塞原因,本文将 Java 线程状态归纳为以下 3 类:RUNNABLE、IO、BLOCKED


  1. RUNNABLE:Java 线程状态为 RUNNABLE,并且在执行有用的内存计算,无阻塞

  2. IO:Java 线程状态为 RUNNABLE,但是正在进行网络 IO,发生阻塞

  3. BLOCKED:Java 线程状态为 BLOCKED/WAITING/TIMED_WAITING,在并发工具的控制下,线程等待获取某一种锁,发生阻塞


要提高线程利用率,就要增加线程处于 RUNNABLE 状态的时长,降低处于 IO 和 BLOCKED 状态的时长。BLOCKED 状态一般是不可避免的,因为线程间需要通信,需要对临界区进行并发控制;但是,如果采用适当的线程模型,那么 IO 状态的时长就可以得到降低,而这就是异步非阻塞模型。

2.2 线程模型:阻塞 vs 非阻塞

异步非阻塞模型能够降低 IO 阻塞时长,提高线程利用率。下面以数据库访问为例,分析同步和异步 API 的线程模型。如图 3 所示,过程中涉及 3 个函数:


  1. writeSync()或 writeAsync():数据库访问,发送请求

  2. process(result):处理服务器响应(以 result 表示)

  3. doOtherThings():任意其他操作,逻辑上不依赖服务器响应


同步 API 如图 3-a 所示:调用者首先发送请求,然后在网络连接上等待来自服务器的响应数据。API 会一直阻塞,直至收到响应才返回;期间调用者线程无法执行其他操作,即使该操作并不依赖服务器响应。实际的执行顺序为:


  1. writeSync()

  2. process(result)

  3. doOtherThings() // 直至收到结果,当前线程才能执行其他操作


异步 API 如图 2-3b 所示:调用者发送请求并注册回调,然后 API 立刻返回,接下来调用者可以执行任意操作。稍后底层网络连接收到响应数据,触发调用者所注册的回调。实际的执行顺序为:


  1. writeAsync()

  2. doOtherThings() // 已经可以执行其他操作,并不需要等待响应

  3. process(result)



图 2-3 同步 API & 异步 API


在上述过程中,函数 doOtherThings() 并不依赖服务器响应,原则上可以和数据库访问同时执行。然而对于同步 API,调用者被迫等待服务器响应,然后才可以执行 doOtherThings();即数据库访问期间线程阻塞于 IO 状态,无法执行其他有用的操作,利用率十分低下。而异步 API 就没有这个限制,显得更加紧凑、高效。


在 IO 密集型系统中,适当使用异步非阻塞模型,可以提升数据库访问吞吐量。考虑这样一个场景:需要执行多条数据库访问请求,且请求之间互相独立,无依赖关系。使用同步 API 和异步 API,线程状态随时间变化的过程如图 2-4 所示。线程交替处于 RUNNABLE 和 IO 状态。在 RUNNABLE 状态下,线程执行内存计算,如提交请求、处理响应。在 IO 状态下,线程在网络连接上等待响应数据。在实际系统中,内存计算的速度非常快,RUNNABLE 状态的时长基本可忽略;而网络传输的耗时会相对更长(几十到几百毫秒),IO 状态的时长更加可观。


a.同步 API:调用者线程一次只能提交一个请求;直到请求返回后,才能再提交下一个请求。线程利用率很低,大部分时间消耗在 IO 状态上。


b.异步 API:调用者线程可以连续提交多个请求,而之前提交的请求都还没有收到响应。调用者线程会注册一些回调,这些回调存储在内存中;稍后网络连接上收到响应数据,某个接收线程被通知处理响应数据,从内存中取出所注册的回调,并触发回调。这种模型下,请求可以连续地提交、连续的响应,从而节约 IO 状态的耗时。



图 2-4 线程时间线:数据库访问


异步非阻塞模式在 IO 密集型系统中应用非常广泛。常用的中间件,如 http 请求[D]、redis[E]、mongo DB[F]、elasticsearch[G]、influx DB[H],都支持异步 API。各位读者可以在参考文献中,查阅这些异步 API 的样例代码。关于中间件的异步 API,下面有几个注意事项:


  1. redis 的常见客户端有 jedis 和 lettuce [E]。其中 lettuce 提供了异步 API,而 jedis 只能提供同步 API;二者对比参见文章[I]。

  2. kafka producer[J]的 send()方法也支持异步 API,但是该 API 实际上不是纯异步的[K]:当底层缓存满,或者无法获取服务器(broker)信息时,send()方法会发生阻塞。个人认为这是一个非常严重的设计缺陷。kafka 常用于低延迟日志采集场景,系统会将日志通过网络写入到 kafka 服务器,以减少线程内的阻塞,提升线程吞吐量;稍后其他进程会从 kafka 消费所写入的日志,进行持久存储。设想一个实时通信系统,单条线程每秒需要处理几万到几十万条消息,响应时间一般为几毫秒到几十毫秒。系统在处理期间需要经常调用 send() 来上报日志,如果每次调用都发生哪怕 1 秒的延迟(实际有可能达几十秒),延迟积累起来也会严重劣化吞吐量和延迟。


最后,异步 API 有多种实现,包括线程池、select(如 netty 4.x[L])、epoll 等。其共同点是调用者不需要在某一条网络连接上阻塞,以等待接收数据;相反,API 底层常驻有限数目的线程,当收到数据后,某一线程得到通知并触发回调。这种模型也称为“响应式”模型,非常贴切。限于篇幅原因,本文主要关注异步 API 设计,而不深入讲解异步 API 的实现原理。

3.Promise 设计模式

3.1 API 形式:同步、异步 listener、异步 Promise

上一章介绍了异步非阻塞模式和异步 API 的函数形式。异步 API 具有以下特征:


  1. 在提交请求时注册回调;

  2. 提交请求后,函数立刻返回,不需要等待收到响应;

  3. 收到响应后,触发所注册的回调;根据底层实现,可以利用有限数目的线程来接收响应数据,并在这些线程中执行回调。


在保留异步特性的基础上,异步 API 的形式可以进一步优化。上一章图 2-3b 展示了异步 API 的 listener 版本,特点是在提交请求时必须注册恰好一个回调;因而在下列场景下,listener API 会难以满足功能需求,需要调用者做进一步处理:


  1. 多个对象都关注响应数据,即需要注册多个回调;但是 listener 只支持注册一个回调。

  2. 需要将异步调用转为同步调用。例如某些框架(如 spring)需要同步返回,或者我们希望主线程阻塞直至操作完成,然后主线程结束、进程退出;但是 listener 只支持纯异步,调用者需要重复编写异步转同步的代码。


为了应对上述场景,我们可以使用 Promise 设计模式来重构异步 API,以支持多个回调和同步调用。下面对同步 API、异步 listener API、异步 Promise API 的函数形式进行对比,如图 3-1 所示:


  • a.同步:调用 writeSync() 方法并阻塞;收到响应后函数停止阻塞,并返回响应数据

  • b.异步 listener:调用 writeAsync() 方法并注册 listener,函数立刻返回;收到响应后,在其他线程触发所注册的 listener;

  • c.异步 Promise:调用 writeAsync(),但不需要在函数中注册 listener,函数立刻返回 Promise 对象。调用者可以调用异步的 Promise.await(listener),注册任意数目的 listener,收到响应后会按顺序触发;此外,也可以调用同步的 Promise.await(),阻塞直至收到响应。



图 3-1 API 形式:同步、异步 listener、异步 Promise


综上,Promise API 在保持异步特性的前提下,提供了更高的灵活性。调用者可以自由选择函数是否阻塞,以及注册任意数目的回调。


3.2 Promise 的特性与实现

上一节介绍了 Promise API 的使用样例,其核心是一个 Promise 对象,支持注册 listener,以及同步获取响应 result;而本节将对 Promise 的功能进行更加详细的定义。注意,本节并不限定 Promise 的某一具体实现(例:jdk CompletableFuture、netty DefaultPromise),只展示共有的、必须具备的特性;缺少这些特性,Promise 将无法完成异步传递响应数据的工作。

3.2.1 功能特性

  • Promise 的基本方法


Promise 的基本功能是传递响应数据,需要支持下列方法,如表 3-1 所示:



下面以上一小节的数据库访问 API 为例,演示 Promise 的工作流程,如图 3-2 所示:


  • a.调用者调用 writeAsync() API,提交数据库访问请求并获取 Promise 对象;然后调用 Promise.await(listener),注册对响应数据的 listener。Promise 对象也可以传递给程序中其他地方,使得关心响应数据的其他代码,各自注册更多 listener。

  • b.writeAsync()内部,创建 Promise 对象并和这次请求关联起来,假设以 requestId 标识。

  • c.writeAsync()底层常驻有限数目的线程,用于发送请求和接收响应。以 netty 为例,当从网络上收到响应据后,其中一个线程得到通知,执行 channelRead() 函数进行处理;函数取出响应数据和对应的 Promise 对象,并调用 Promise.signalAll() 进行通知。注意这里是伪代码,和 netty 中回调函数的实际签名略有区别。



图 3-2a 提交数据库访问请求



图 3-2b 创建 Promise 对象



图 3-2c 通知 Promise 对象


- Promise 的时序


Promise 的方法需要保证以下时序。此处以“A 对 B 可见”来描述时序,即:如果先执行操作 A(注册 listener)就会产生某种永久效应(永久记录这个 listener),之后再执行操作 B(通知 result)就必须考虑到这种效应,执行相应的处理(触发之前记录的 listener)。


  1. await(listener)对 signalAll(result)可见:注册若干 listener 后,通知 result 时必须触发每一个 listener,不允许遗漏。

  2. signalAll(result)对 await(listener)可见:通知 result 后,再注册 listener 就会立刻触发。

  3. 首次 signalAll(result)对后续 signalAll(result)可见。首次通知 result 后,result 即唯一确定,永不改变。之后再通知 result 就会忽略,不产生任何副作用。请求超时是该特性一种典型应用:在提交请求的同时创建一个定时任务;如果能在超时时长内正确收到响应数据,则通知 Promise 正常结束;否则定时任务超时,通知 Promise 异常结束。不论上述事件哪个先发生,都保证只采纳首次通知,使得请求结果唯一确定。


此外,某次 await(listener) 最好对后续 await(listener) 可见,以保证 listener 严格按照注册顺序来触发。


- Promise 的非线程安全实现


如不考虑线程安全,那么下列代码清单可以实现 Promise 的基本特性;线程安全的实现见下一小节。代码清单依次展示了 await(listener): void、signalAll(result)、await(): result 的实现。这里有几个注意事项


  1. 字段 listeners 存储 await(listener) 所注册的 listener。字段类型为 LinkedList,以存储任意数目的 listener,同时维护 listener 的触发顺序。

  2. 字段 isSignaled 记录是否通知过 result。如果 isSignaled=true,则后续调用 await(listener) 时立刻触发 listener,且后续调用 signalAll(result) 时直接忽略。此外,我们以 isSignaled=true 而不是 result=null 来判断是否通知过 result,因为某些情况下 null 本身也可以作为响应数据。例如,我们以 Promise<Exception>表示数据库写入的结果,通知 null 表示写入成功,通知 Exception 对象(或某一子类)表示失败原因。

  3. signalAll(T result)在末尾处调用 listeners.clear() 以释放内存,因为 listeners 已经触发过,不再需要在内存中存储。


public class Promise<T> {
private boolean isSignaled = false; private T result;
private final List<Consumer<T>> listeners = new LinkedList<>();
public void await(Consumer<T> listener) { if (isSignaled) { listener.accept(result); return; }
listeners.add(listener); }
public void signalAll(T result) { if (isSignaled) { return; }
this.result = result; isSignaled = true; for (Consumer<T> listener : listeners) { listener.accept(result); } listeners.clear(); }
public T await() { // 适当阻塞,直至signalAll()被调用;实际实现见3.3节 return result; }}
复制代码

3.2.2 线程安全特性

上一章 3.2.1 节讲解了 Promise 的功能,并提供了非线程安全的实现。本节展示如何使用并发工具,实现线程安全的 Promise,如下所示。有下列几个注意事项:


  1. 线程安全。各个字段均被多个线程访问,因此都属于临界区,需要使用适当的线程安全工具进行上锁,如 synchronized、Lock。一种最简单的实现,是将全部代码纳入临界区内,进入方法时上锁,离开方法时放锁。注意在使用 return 进行提前返回时,不要忘记放锁。

  2. 在临界区外触发 listener,以减少在临界区内停留的时长,并减少潜在的死锁风险。

  3. 同步 await()。可以使用任何一种同步等待的工具来实现,如 CountDownLatch、Condition。此处使用 Condition 实现,注意根据 java 语法,操作 Condition 时必须先获取 Condition 所关联的锁。


public class Promise<T> {
private final ReentrantLock lock = new ReentrantLock(); private final Condition resultCondition = lock.newCondition();
private boolean isSignaled = false; private T result;
private final List<Consumer<T>> listeners = new LinkedList<>();
public void await(Consumer<T> listener) { lock.lock(); if (isSignaled) { lock.unlock(); // 不要忘记放锁 listener.accept(result); // 在临界区外触发listener return; }
listeners.add(listener); lock.unlock(); }
public void signalAll(T result) { lock.lock(); if (isSignaled) { lock.unlock(); // 不要忘记放锁 return; }
this.result = result; isSignaled = true;
// this.listeners的副本 List<Consumer<T>> listeners = new ArrayList<>(this.listeners); this.listeners.clear(); lock.unlock();
for (Consumer<T> listener : listeners) { listener.accept(result); // 在临界区外触发listener }
/* 操作Condition时须上锁*/ lock.lock(); resultCondition.signalAll(); lock.unlock(); }
public T await() { lock.lock(); if (isSignaled) { lock.unlock(); // 不要忘记放锁 return result; }
while (!isSignaled) { resultCondition.awaitUninterruptibly(); } lock.unlock();
return result; }}
复制代码


上述实现仅做演示使用,仍有较大的改进空间。生产环境的实现原理,读者可以参考 jdk CompletableFutre、netty DefaultPromise。可以改进的地方包括:


  1. 使用 CAS 设置响应数据。字段 isSignaled、result 可以合并为一个数据对象,然后使用 CAS 进行设值,从而进一步降低阻塞时长。

  2. 触发 listener 的时序。在上述代码中,Promise.signalAll() 会依次触发 listener;在此期间,如果其他线程调用了异步 await(listener),由于 Promise 的响应数据已通知,该线程也会触发 listener。上述过程中,两个线程同时触发 listener,因此没有严格保证触发顺序。作为改进,类似于 netty DefaultPromise,Promise.signalAll() 内部可以设置一个循环,不断触发 listener 直至 listeners 排空,以防期间注册了新的 listener;在此期间,新注册的 listene r 可以直接加入到 listeners 中,而不是立刻触发。

  3. listener 的移除。在通知响应数据之前,Promise 长期持有 listener 的引用,导致 listener 对象无法被 gc。可以添加 remove(listener) 方法,或者允许仅持有 listener 的弱引用。

3.2.3 须避免的特性

前面的小节展示了 Promise 的特性与实现原理。纯正的 Promise 是异步传递响应数据的工具,其应当只实现必要的数据传递特性,而不应当夹杂请求提交、数据处理等逻辑。接下来我们来看一看,Promise 在实现时应避免哪些特性,以防限制调用者所能做出的决策。


1.异步 await() 发生阻塞;该规则不仅适用于 Promise,也适用于任何异步 API ·。异步 API 常用于实时通信等延时敏感的场景,作用是减少线程阻塞,避免推迟后续其他操作。一旦发生阻塞,系统的响应速度和吞吐量就会受到严重冲击。


以连续提交数据库请求为例。如图 3-3a 所示,调用者调用了一个异步 API,连续提交 3 次写入请求,并在所返回的 Promise 上注册回调。


我们考察 writeAsync()与 await() 如发生阻塞阻塞,将会对调用者造成什么影响,如图 3-3b 所示。提交请求是纯内存操作,线程处于 RUNNABLE 状态;writeAsync() 或 await() 如果发生阻塞,则线程处于 BLOCKED 状态,暂停工作而无法执行后续操作。当发生阻塞时,调用者每提交一个请求就不得不等待一段时间,从而降低了提交请求的频率,进而推迟了服务器对这些请求的响应,使得系统的吞吐量降低、延迟上升。特别地,如果系统采用了多路复用机制,即一个线程可以处理多个网络连接或多个请求,那么线程阻塞将会严重拖慢后续请求的处理,造成比较难排查的故障。


常见的阻塞原因包括:


  • Thread.sleep()

  • 向队列提交任务,调用了 BlockingQueue.put()和 take();应改为非阻塞的 offer()和 poll()

  • 向线程池提交任务,ExecutorService.submit(),如果线程池拒绝策略为 CallerRunsPolicy,而任务本身又是耗时的。

  • 调用了阻塞的函数,包括:InputStream.read()、同步的 Promise.await()、KafkaProducer.send()。注意 KafkaProducer.send() 虽然形式上是异步 API,但是在底层缓存满或者无法获取服务器(broker)信息时,send()方法仍会发生阻塞。



图 3-3a 连续提交请求



图 3-3b 请求处理时间线


2.绑定线程池(ExecutorService),用于执行请求。如图 3-4 所示,线程池是异步 API 的一种可选模型,但并不是唯一实现。


  • 线程池模型。为了不阻塞调用者,API 内置了线程池来提交请求、处理响应;调用者可以向线程池连续提交多个请求,但是不需要等待响应。调用者提交一条请求后,线程池中的某条线程就会被独占,等待接收响应并进行处理,但在此之前无法再处理其他请求;完成处理后,该条线程重新变为空闲,可以继续处理后续请求。

  • 响应式模型。类似地,API 内置了发送和接收线程来提交请求、处理响应,调用者也不需要同步等待。调用者提交一条请求后,发送线程向网络发送请求;完成发送后,线程立刻变为空闲,可以发送后续请求。当收到响应数据时,接收线程得到通知以处理响应;完成处理后,线程立刻变为空闲,可以处理后续响应数据。上述过程中,任何一条线程都不会被某一请求独占,即线程随时都可以处理请求,而不需要等待之前的请求被响应。


综上,如果绑定了线程池,Promise 就实现了对其他模型(如响应式模型)的兼容性。



图 3-4 线程时间线:线程池 vs select


3.在构造方法创建 Promise 对象时,定义如何提交请求。这种方式只能定义如何处理单条请求,而无法实现请求的批量处理。


以数据库访问为例,现代数据库一般支持批量读写,以略微提升单次访问的延迟为代价,换来吞吐量显著提升;如果吞吐量得到提升,那么平均延迟反而会下降。下面的代码片段展示了一个批量请求 API:数据对象 BulkRequest 可以携带多条普通请求 Request,从而实现批量提交。


/* 提交单条请求*/client.submit(new Request(1));client.submit(new Request(2));client.submit(new Request(3));
/* 提交批量请求*/client.submit(new BulkRequest( new Request(1), new Request(2), new Request(3)));
复制代码


为了充分利用“批量请求”的特性,调用者需要进行跨越多条请求的“宏观调控”。请求产生后可以先缓存起来;等待一段时间后,取出所缓存的多条请求,组装一个批量请求来一并提交。因此,如下面的代码片段所示,在构造 Promise 时指定如何提交单条请求是没有意义的,这部分代码(client.submit(new Request(...)))并不会被执行;而实际希望执行的代码,其实是提交批量请求(client.submit(new BulkRequest(...)))。


/* Promise:提交单条请求*/new Promise<>(() -> client.submit(new Request(1)));new Promise<>(() -> client.submit(new Request(2)));new Promise<>(() -> client.submit(new Request(3)));
复制代码


  1. 在构造方法创建 Promise 对象时,定义如何处理响应数据,而不允许后续对响应数据注册回调。如下面的代码片段所示,在构造 Promise 对象时,注册了对响应数据的处理 process(result);但是除此以外,其他代码也有可能关心响应数据,需要注册回调 process1(result)、process2(result)。如果 Promise 只能在构造时注册唯一回调,那么其他关注者就无法注册所需回调函数,即 Promise API 退化回 listener API。


/* 定义如何处理响应数据*/Promise<String> promise = new Promise<>(result -> process(result));
/* 其他代码也关心响应数据*/promise.await(result -> process1(result));promise.await(result -> process2(result));
复制代码


综上,Promise 应该是一个纯粹的数据对象,其职责是存储回调函数、存储响应数据;同时做好时序控制,保证触发回调函数无遗漏、保证触发顺序。除此以外,Promise 不应该和任何实现策略相耦合,不应该杂糅提交请求、处理响应的逻辑。

4.总结

本文讲解了异步非阻塞设计模式,并对同步 API、异步 listener API、异步 Promise API 进行了对比。相比于其他两种 API,Promise API 具有无可比拟的灵活性,调用者可以自由决定同步返回还是异步返回,并允许对响应数据注册多个回调函数。最后,本文讲解了 Promise 基本功能的实现,并初步实现了线程安全特性。


本系列共 2 篇文章,本文为第 1 篇《原理篇》。在下一篇《应用篇》中,我们将看到 Promise 设计模式丰富的应用场景,将其和现有工具进行结合或对比,以及对 Promise API 进行进一步变形和封装,提供异常处理、调度策略等特性。

参考文献

[A] 异步非阻塞 IO

https://en.wikipedia.org/wiki/Asynchronous_I/O


[B] Promise

https://en.wikipedia.org/wiki/Futures_and_promises


[C] java 线程状态

https://segmentfault.com/a/1190000038392244


[D] http 异步 API 样例:apache HttpAsyncClient

https://hc.apache.org/httpcomponents-asyncclient-4.1.x/quickstart.html


[E] redis 异步 API 样例:lettuce

https://github.com/lettuce-io/lettuce-core/wiki/Asynchronous-API


[F] mongo DB 异步 API 样例:AsyncMongoClient

https://mongodb.github.io/mongo-java-driver/3.0/driver-async/getting-started/quick-tour/


[G] elasticsearch 异步 API 样例:RestHighLevelClient

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high-document-index.html


[H] influx DB 异步 API 样例:influxdb-java

https://github.com/influxdata/influxdb-java/blob/master/MANUAL.md


[I] jedis vs lettuce

https://redislabs.com/blog/jedis-vs-lettuce-an-exploration/


[J] kafka

http://cloudurable.com/blog/kafka-tutorial-kafka-producer/index.html


[K] KafkaProducer.send()阻塞

https://stackoverflow.com/questions/57140680/kafka-asynchronous-send-not-really-asynchronous


[L] netty

https://netty.io/wiki/user-guide-for-4.x.html

发布于: 18 小时前阅读数: 13
用户头像

高效学习,从有道开始 2021.03.10 加入

分享有道人的技术思考与实践。

评论

发布
暂无评论
JAVA语言异步非阻塞设计模式(原理篇)