大数据培训 Flink 源码解析 Async IO
以下文章来源于大数据技术与架构
Async I/O 的使用方式
在 Flink 中使用 Async I/O 的话,需要有一个支持异步请求的客户端,或者以多线程异步的方式来将同步操作转化为异步操作调用;
以官方文档给出的说明为例:
// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)
/**
* An implementation of the 'AsyncFunction' that sends requests and sets the callback.
*/
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {
/** The database specific client that can issue concurrent requests with callbacks */
private transient DatabaseClient client;
@Override
public void open(Configuration parameters) throws Exception {
client = new DatabaseClient(host, post, credentials);
}
@Override
public void close() throws Exception {
client.close();
}
@Override
public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {
// issue the asynchronous request, receive a future for result
// 发起异步请求,返回结果是一个 Future
final Future<String> result = client.query(key);
// set the callback to be executed once the request by the client is complete
// the callback simply forwards the result to the result future
// 请求完成时的回调,将结果交给 ResultFuture
CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
return result.get();
} catch (InterruptedException | ExecutionException e) {
// Normally handled explicitly.
return null;
}
}
}).thenAccept( (String dbResult) -> {
resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
});
}
}
// create the original stream
DataStream<String> stream = ...;
// apply the async I/O transformation
// 应用 async I/O 转换,设置等待模式、超时时间、以及进行中的异步请求的最大数量
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
AsyncDataStream 提供了两种调用方法,分别是 orderedWait 和 unorderedWait,这分别对应了有序和无序两种输出模式。
之所以会提供两种输出模式,是因为异步请求的完成时间是不确定的,先发出的请求的完成时间可能会晚于后发出的请求。
在“有序”的输出模式下,所有计算结果的提交完全和消息的到达顺序一致;
而在“无序”的输出模式下,计算结果的提交则是和请求的完成顺序相关的,先处理完成的请求的计算结果会先提交。
值得注意的是,在使用“事件时间”的情况下,“无序”输出模式仍然可以保证 watermark 的正常处理,即在两个 watermark 之间的消息的异步请求结果可能是异步提交的,但在 watermark 之后的消息不能先于该 watermark 之前的消息提交_大数据培训。
由于异步请求的完成时间不确定,需要设置请求的超时时间,并配置同时进行中的异步请求的最大数量。
Async I/O 的实现
AsyncDataStream 在运行时被转换为 AsyncWaitOperator 算子,它是 AbstractUdfStreamOperator 的子类。其 AsyncWaitOperator 的基本实现原理如下:
基本原理
AsyncWaitOperator 算子相比于其它算子的最大不同在于,它的输入和输出并不是同步的。
因此,在 AsyncWaitOperator 内部采用了一种“生产者-消费者”模型,基于一个队列解耦异步计算和计算结果的提交。StreamElementQueue 提供了一种队列的抽象,一个“消费者”线程 Emitter 从中取出已完成的计算结果,并提交给下游算子,而异步请求则充当了队列“生产者”的角色;
如图所示,AsyncWaitOperator 主要由两部分组成:StreamElementQueue 和 Emitter。
StreamElementQueue 是一个 Promise 队列,所谓 Promise 是一种异步抽象表示将来会有一个值,这个队列是未完成的 Promise 队列,也就是进行中的请求队列。Emitter 是一个单独的线程,负责发送消息(收到的异步回复)给下游。
图中 E5 表示进入该算子的第五个元素(”Element-5”),在执行过程中首先会将其包装成一个“Promise” P5,然后将 P5 放入队列。最后调用 AsyncFunction 的 asyncInvoke 方法,该方法会向外部服务发起一个异步的请求,并注册回调。
该回调会在异步请求成功返回时调用 AsyncCollector.collect 方法将返回的结果交给框架处理。
实际上 AsyncCollector 是一个 Promise,也就是 P5,在调用 collect 的时候会标记 Promise 为完成状态,并通知 Emitter 线程有完成的消息可以发送了。Emitter 就会从队列中拉取完成的 Promise,并从 Promise 中取出消息发送给下游。
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, OperatorActions {
/** Queue to store the currently in-flight stream elements into. */
private transient StreamElementQueue queue; // 存储带有异步返回值的请求队列
/** Pending stream element which could not yet added to the queue. */
private transient StreamElementQueueEntry<?> pendingStreamElementQueueEntry;
private transient ExecutorService executor;
/** Emitter for the completed stream element queue entries. */
private transient Emitter<OUT> emitter; // 异步返回后的消费线程
/** Thread running the emitter. */
private transient Thread emitterThread;
@Override
public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
super.setup(containingTask, config, output);
this.checkpointingLock = getContainingTask().getCheckpointLock();
this.inStreamElementSerializer = new StreamElementSerializer<>(getOperatorConfig().<IN>getTypeSerializerIn1(getUserCodeClassloader()));
// create the operators executor for the complete operations of the queue entries
this.executor = Executors.newSingleThreadExecutor();
// 根据不同的数据输出模式 有序、无序;选择构建不同的 StreamElementQueue queue
switch (outputMode) {
case ORDERED:
queue = new OrderedStreamElementQueue(
capacity,
executor,
this);
break;
case UNORDERED:
queue = new UnorderedStreamElementQueue(
capacity,
executor,
this);
break;
default:
throw new IllegalStateException("Unknown async mode: " + outputMode + '.');
}
}
@Override
public void open() throws Exception {
super.open();
// create the emitter
this.emitter = new Emitter<>(checkpointingLock, output, queue, this);
// start the emitter thread
// 构建 消费者线程 emitter Thread
this.emitterThread = new Thread(emitter, "AsyncIO-Emitter-Thread (" + getOperatorName() + ')');
emitterThread.setDaemon(true);
emitterThread.start();
// .........
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
final StreamRecordQueueEntry<OUT> streamRecordBufferEntry = new StreamRecordQueueEntry<>(element);
// 注册一个定时器,在超时时调用 timeout 方法
if (timeout > 0L) {
// register a timeout for this AsyncStreamRecordBufferEntry
long timeoutTimestamp = timeout + getProcessingTimeService().getCurrentProcessingTime();
final ScheduledFuture<?> timerFuture = getProcessingTimeService().registerTimer(
timeoutTimestamp,
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
userFunction.timeout(element.getValue(), streamRecordBufferEntry);
}
});
// Cancel the timer once we've completed the stream record buffer entry. This will remove
// the register trigger task
streamRecordBufferEntry.onComplete(
(StreamElementQueueEntry<Collection<OUT>> value) -> {
timerFuture.cancel(true);
},
executor);
}
// 加入队列
addAsyncBufferEntry(streamRecordBufferEntry);
// 发送异步请求
userFunction.asyncInvoke(element.getValue(), streamRecordBufferEntry);
}
//尝试将待完成的请求加入队列,如果队列已满(到达异步请求的上限),会阻塞
private <T> void addAsyncBufferEntry(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
assert(Thread.holdsLock(checkpointingLock));
pendingStreamElementQueueEntry = streamElementQueueEntry;
while (!queue.tryPut(streamElementQueueEntry)) { // 将该请求加入队列;如果队列已满(到达异步请求的上限),会阻塞
// we wait for the emitter to notify us if the queue has space left again
checkpointingLock.wait();
}
pendingStreamElementQueueEntry = null;
}
}
public class Emitter<OUT> implements Runnable {
@Override
public void run() {
try {
while (running) {
LOG.debug("Wait for next completed async stream element result.");
// 从队列阻塞地获取元素,之后再向下游传递
AsyncResult streamElementEntry = streamElementQueue.peekBlockingly();
output(streamElementEntry);
}
} catch (InterruptedException e) {
// .........
}
}
}
有序模式
在“有序”模式下,所有异步请求的结果必须按照消息的到达顺序提交到下游算子。在这种模式下,StreamElementQueue 的具体是实现是 OrderedStreamElementQueue。
OrderedStreamElementQueue 的底层是一个有界的队列,异步请求的计算结果按顺序加入到队列中,只有队列头部的异步请求完成后才可以从队列中获取计算结果。
有序模式比较简单,使用一个队列就能实现。所有新进入该算子的元素(包括 watermark),都会包装成 Promise 并按到达顺序放入该队列。
如下图所示,尽管 P4 的结果先返回,但并不会发送,只有 P1(队首)的结果返回了才会触发 Emitter 拉取队首元素进行发送。如下图所示:
public class OrderedStreamElementQueue implements StreamElementQueue {
/** Capacity of this queue. */
private final int capacity;
/** Queue for the inserted StreamElementQueueEntries. */
private final ArrayDeque<StreamElementQueueEntry<?>> queue;
@Override
public AsyncResult peekBlockingly() throws InterruptedException { // 从队列中阻塞地获取已异步完成的元素
lock.lockInterruptibly();
try {
while (queue.isEmpty() || !queue.peek().isDone()) {
headIsCompleted.await();
}
// 只有队列头部的请求完成后才解除阻塞状态
LOG.debug("Peeked head element from ordered stream element queue with filling degree " + "({}/{}).", queue.size(), capacity);
return queue.peek();
} finally {
lock.unlock();
}
}
@Override
public AsyncResult poll() throws InterruptedException {
lock.lockInterruptibly();
try {
while (queue.isEmpty() || !queue.peek().isDone()) {
headIsCompleted.await();
}
notFull.signalAll();
LOG.debug("Polled head element from ordered stream element queue. New filling degree " + "({}/{}).", queue.size() - 1, capacity);
return queue.poll();
} finally {
lock.unlock();
}
}
@Override
public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly(); // 将该请求加入队列;如果队列已满(到达异步请求的上限),返回 false,其外部会阻塞
try {
if (queue.size() < capacity) { // 未达容量上限
addEntry(streamElementQueueEntry);
LOG.debug("Put element into ordered stream element queue. New filling degree " + "({}/{}).", queue.size(), capacity);
return true;
} else {
LOG.debug("Failed to put element into ordered stream element queue because it " + "was full ({}/{}).", queue.size(), capacity);
return false;
}
} finally {
lock.unlock();
}
}
}
无序模式
在“无序”模式下,异步计算结果的提交不是由消息到达的顺序确定的,而是取决于异步请求的完成顺序。
当然,在使用“事件时间”的情况下,要保证 watermark 语义的正确性。
在使用“处理时间”的情况下,由于不存在 Watermark,因此可以看作一种特殊的情况。
在 UnorderedStreamElementQueue 中巧妙地实现了这两种情况。
ProcessingTime 无序
ProcessingTime 无序也比较简单,因为没有 watermark,不需要协调 watermark 与消息的顺序性,所以使用两个队列就能实现,一个 uncompletedQueue、一个 completedQueue。所有新进入该算子的元素,同样的包装成 Promise 并放入 uncompletedQueue 队列,当 uncompletedQueue 队列中任意的 Promise 返回了数据,则将该 Promise 移到 completedQueue 队列中,并通知 Emitter 消费。如下图所示:
EventTime 无序
EventTime 无序类似于有序与 ProcessingTime 无序的结合体。因为有 watermark,需要协调 watermark 与消息之间的顺序性,所以 uncompletedQueue 中存放的元素从原先的 Promise 变成了 Promise 集合。
如果进入算子的是消息元素,则会包装成 Promise 放入队尾的集合中。
如果进入算子的是 watermark,也会包装成 Promise 并放到一个独立的集合中,再将该集合加入到 uncompletedQueue 队尾,最后再创建一个空集合加到 uncompletedQueue 队尾。
这样,watermark 就成了消息顺序的边界。
只有处在队首的集合中的 Promise 返回了数据,才能将该 Promise 移到 completedQueue 队列中,由 Emitter 消费发往下游。
只有队首集合空了,才能处理第二个集合。这样就保证了当且仅当某个 watermark 之前所有的消息都已经被发送了,该 watermark 才能被发送。
过程如下图所示:
public class UnorderedStreamElementQueue implements StreamElementQueue {
/** Queue of uncompleted stream element queue entries segmented by watermarks. */
private final ArrayDeque<Set<StreamElementQueueEntry<?>>> uncompletedQueue;
/** Queue of completed stream element queue entries. */
private final ArrayDeque<StreamElementQueueEntry<?>> completedQueue;
/** First (chronologically oldest) uncompleted set of stream element queue entries. */
private Set<StreamElementQueueEntry<?>> firstSet;
// Last (chronologically youngest) uncompleted set of stream element queue entries. New
// stream element queue entries are inserted into this set.
private Set<StreamElementQueueEntry<?>> lastSet;
@Override
public <T> boolean tryPut(StreamElementQueueEntry<T> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
if (numberEntries < capacity) {
addEntry(streamElementQueueEntry);
LOG.debug("Put element into unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity);
return true;
} else {
LOG.debug("Failed to put element into unordered stream element queue because it " + "was full ({}/{}).", numberEntries, capacity);
return false;
}
} finally {
lock.unlock();
}
}
private <T> void addEntry(StreamElementQueueEntry<T> streamElementQueueEntry) {
assert(lock.isHeldByCurrentThread());
if (streamElementQueueEntry.isWatermark()) {
// 如果是 watermark,就要构造一个只包含这个 watermark 的 set 加入到 uncompletedQueue 队列中
lastSet = new HashSet<>(capacity);
if (firstSet.isEmpty()) {
firstSet.add(streamElementQueueEntry);
} else {
Set<StreamElementQueueEntry<?>> watermarkSet = new HashSet<>(1);
watermarkSet.add(streamElementQueueEntry);
uncompletedQueue.offer(watermarkSet);
}
uncompletedQueue.offer(lastSet);
} else {
lastSet.add(streamElementQueueEntry); // 正常记录,加入 lastSet 中
}
streamElementQueueEntry.onComplete( // 设置异步请求完成后的回调
(StreamElementQueueEntry<T> value) -> {
try {
onCompleteHandler(value);
} catch (InterruptedException e) {
// ......
}
}, executor);
numberEntries++;
}
// 异步请求完成的回调
public void onCompleteHandler(StreamElementQueueEntry<?> streamElementQueueEntry) throws InterruptedException {
lock.lockInterruptibly();
try {
// 如果完成的异步请求在 firstSet 中,那么就将 firstSet 中已完成的异步请求转移到 completedQueue 中
if (firstSet.remove(streamElementQueueEntry)) {
completedQueue.offer(streamElementQueueEntry);
while (firstSet.isEmpty() && firstSet != lastSet) {
// 如果 firset 中所有的异步请求都完成了,那么就从 uncompletedQueue 获取下一个集合作为 firstSet
firstSet = uncompletedQueue.poll();
Iterator<StreamElementQueueEntry<?>> it = firstSet.iterator();
while (it.hasNext()) {
StreamElementQueueEntry<?> bufferEntry = it.next();
if (bufferEntry.isDone()) {
completedQueue.offer(bufferEntry);
it.remove();
}
}
}
LOG.debug("Signal unordered stream element queue has completed entries.");
hasCompletedEntries.signalAll();
}
} finally {
lock.unlock();
}
}
@Override
public AsyncResult poll() throws InterruptedException {
lock.lockInterruptibly();
try {
// 等待 completedQueue 中的元素
while (completedQueue.isEmpty()) {
hasCompletedEntries.await();
}
numberEntries--;
notFull.signalAll();
LOG.debug("Polled element from unordered stream element queue. New filling degree " + "({}/{}).", numberEntries, capacity);
return completedQueue.poll();
} finally {
lock.unlock();
}
}
}
容错
在异步调用模式下,可能会同时有很多个请求正在处理中。因而在进行快照的时候,需要将异步调用尚未完成,www.atguigu.com 以及结果尚未提交给下游的消息加入到状态中。在恢复的时候,从状态中取出这些消息,再重新处理一遍。为了保证 exactly-once 特性,对于异步调用已经完成,且结果已经由 emitter 提交给下游的消息就无需保存在快照中。
public class AsyncWaitOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, AsyncFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, OperatorActions {
/** Recovered input stream elements. */
private transient ListState<StreamElement> recoveredStreamElements;
@Override
public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);
recoveredStreamElements = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
}
@Override
public void open() throws Exception {
super.open();
// create the emitter
// 创建 emitter 消费线程
// process stream elements from state, since the Emit thread will start as soon as all
// elements from previous state are in the StreamElementQueue, we have to make sure that the
// order to open all operators in the operator chain proceeds from the tail operator to the
// head operator.
// 状态恢复的时候,从状态中取出所有未完成的消息,重新处理一遍
if (recoveredStreamElements != null) {
for (StreamElement element : recoveredStreamElements.get()) {
if (element.isRecord()) {
processElement(element.<IN>asRecord());
}
else if (element.isWatermark()) {
processWatermark(element.asWatermark());
}
else if (element.isLatencyMarker()) {
processLatencyMarker(element.asLatencyMarker());
}
else {
throw new IllegalStateException("Unknown record type " + element.getClass() + " encountered while opening the operator.");
}
}
recoveredStreamElements = null;
}
}
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
// 先清除状态
ListState<StreamElement> partitionableState =
getOperatorStateBackend().getListState(new ListStateDescriptor<>(STATE_NAME, inStreamElementSerializer));
partitionableState.clear();
// 将所有未完成处理请求对应的消息加入状态中
Collection<StreamElementQueueEntry<?>> values = queue.values();
try {
for (StreamElementQueueEntry<?> value : values) {
partitionableState.add(value.getStreamElement());
}
// add the pending stream element queue entry if the stream element queue is currently full
if (pendingStreamElementQueueEntry != null) {
partitionableState.add(pendingStreamElementQueueEntry.getStreamElement());
}
} catch (Exception e) {
partitionableState.clear();
throw new Exception("Could not add stream element queue entries to operator state " + "backend of operator " + getOperatorName() + '.', e);
}
}
}
评论