写点什么

探索 Flutter 异步消息的实现

用户头像
Android架构
关注
发布于: 5 小时前

timer._enqueue();


return timer;


}


_Timer._internal(


this._callback, this._wakeupTime, this._milliSeconds, this._repeating)


: _id = _nextId();


// 这里 _milliSeconds == 0,会向 ZeroTimer 队列插入消息,然后调用 _notifyZeroHandler


void _enqueue() {


if (_milliSeconds == 0) {


if (_firstZeroTimer == null) {


_lastZeroTimer = this;


_firstZeroTimer = this;


} else {


_lastZeroTimer._indexOrNext = this;


_lastZeroTimer = this;


}


// Every zero timer gets its own event.


_notifyZeroHandler();


} else {


......


// 延迟消息这里先不分析


}


}


折腾了一大圈,最后只是构造了一个 _Timer 实例并把其加入到 ZeroTimer 队列中,如果是延迟消息则会加入到 TimeoutTimerHeap 中,最后调用 _notifyZeroHandler 方法, 其主要做如下操作:


  • 创建 RawReceivePort 并设置一个叫 _handleMessage 方法做为引擎层的回调方法

  • 向引擎层 Event queue 发送一个普通优先级的 ?_ZERO_EVENT ,引擎层处理该消息的时候会最终回调到上面设置的 _handleMessage 方法。


具体代码如下:


/src/third_party/dart/runtime/lib/timer_impl.dart


static void _notifyZeroHandler() {


if (_sendPort == null) {


_createTimerHandler();


}


// 底层会调到 PortMap 的 PostMessage 方法,进而唤醒消息处理,后面会分析这个流程


_sendPort.send(_ZERO_EVENT);


}


// 创建和引擎层通信的 RawReceivePort,并设置引擎层的回调方法 _handleMessage


static void _createTimerHandler() {


assert(_receivePort == null);


assert(_sendPort == null);


_receivePort = new RawReceivePort(_handleMessage);


_sendPort = _receivePort.sendPort;


_scheduledWakeupTime = null;


}


/src/third_party/dart/runtime/lib/isolate_patch.dart


@patch


class RawReceivePort {


@patch


factory RawReceivePort([Function handler]) {


_RawReceivePortImpl result = new _RawReceivePortImpl();


result.handler = handler;


return result;


}


}


// 最终将回调设置到 _RawReceivePortImpl 的 _handlerMap 中,引擎层会从这个 map 寻找消息的 handler


@pragma("vm:entry-point")


class _RawReceivePortImpl implements RawReceivePort {


void set handler(Function value) {


_handlerMap[this._get_id()] = value;


}


}


_handleMessage 回调方法会收集 Timer 并执行,具体代码实现如下:


/src/third_party/dart/runtime/lib/timer_impl.dart


static void _handleMessage(msg) {


var pendingTimers;


if (msg == _ZERO_EVENT) {


// 找到所有的待处理 Timers


pendingTimers = _queueFromZeroEvent();


assert(pendingTimers.length > 0);


} else {


......


// 延时消息这里不分析


}


// 处理 Timer,即调用设置的 callback


_runTimers(pendingTimers);


......


}

2. ?向 Event Queue 发送消息

前面说到 RawReceiverPort 会向引擎层 Event queue 发送一个 _ZERO_EVENT ?,其内部是通过调用 PortMap 的 PostMessage 方法将消息发送到 Event queue,该方法首先会根据接收方的 port id 找到对应的 message_handler,然后将消息根据优先级保存到相应的 queue 中,最后唤醒 message_notify_callback 回调函数 ,具体代码如下:


/src/third_party/dart/runtime/vm/port.cc


bool PortMap::PostMessage(Message* message, bool before_events) {


......


intptr_t index = FindPort(message->dest_port());


......


MessageHandler* handler = map_[index].handler;


......


handler->PostMessage(message, before_events);


return true;


}


/src/third_party/dart/runtime/vm/message_handler.cc


void MessageHandler::PostMessage(Message* message, bool before_events) {


Message::Priority saved_priority;


bool task_running = true;


......


// 根据消息优先级进入不同的队列


if (message->IsOOB()) {


oob_queue_->Enqueue(message, before_events);


} else {


queue_->Enqueue(message, before_events);


}


......


//唤醒并处理消息


MessageNotify(saved_priority);


}


/src/third_party/dart/runtime/vm/isolate.cc


void IsolateMessageHandler::MessageNotify(Message::Priority priority) {


if (priority >= Message::kOOBPriority) {


I->ScheduleInterrupts(Thread::kMessageInterrupt);


}


// 最后调用的 message_notify_callback 所指向的函数


Dart_MessageNotifyCallback callback = I->message_notify_callback();


if (callback) {


(*callback)(Api::CastIsolate(I));


}


}

3. ? Event Queue 消息处理

前面消息已经发送成功并调用了消息处理唤醒的操作,下面我们需要知道 message_notify_callback 所指向的函数的实现, root isolate 在初始化时会设置该变量,具体代码如下:


/src/flutter/runtime/dart_isolate.cc


bool DartIsolate::Initialize(Dart_Isolate dart_isolate, bool is_root_isolate) {


......


// 设置 message handler 的 task runner 为 UI Task Runner


SetMessageHandlingTaskRunner(GetTaskRunners().GetUITaskRunner(),


is_root_isolate);


......


return true;


}


void DartIsolate::SetMessageHandlingTaskRunner(


fml::RefPtrfml::TaskRunner runner,


bool is_root_isolate) {


......


message_handler().Initialize(


[runner](std::function<void()> task) { runner->PostTask(task); });


}


进一步跟进分析发现通过 Dart_SetMessageNotifyCallback 将 ?root isolate 的 message_notify_callback 设置为 MessageNotifyCallback 方法,具体代码如下:


/src/third_party/tonic/dart_message_handler.cc


void DartMessageHandler::Initialize(TaskDispatcher dispatcher) {


TONIC_CHECK(!task_dispatcher_ && dispatcher);


task_dispatcher_ = dispatcher;


Dart_SetMessageNotifyCallback(MessageNotifyCallback);


}


MessageNotifyCallback 会在 Event queue 收到消息后执行,但其执行过程中并没有拿到 Event queue 中的消息,而是往 UI Thread 的 MessageLoop Post 了一个 Task 闭包,这个 Task 闭包会通过调用 Dart_HandleMessage 来处理 Event queue 中的消息,具体代码流程如下:


/src/third_party/tonic/dart_message_handler.cc


void DartMessageHandler::MessageNotifyCallback(Dart_Isolate dest_isolate) {


auto dart_state = DartState::From(dest_isolate);


TONIC_CHECK(dart_state);


dart_state->message_handler().OnMessage(dart_state);


}


void DartMessageHandler::OnMessage(DartState* dart_state) {


auto task_dispatcher_ = dart_state->message_handler().task_dispatcher_;


// 往 ui 线程 MessageLoop Post 了一个 Task


auto weak_dart_state = dart_state->GetWeakPtr();


task_dispatcher_(weak_dart_state {


if (auto dart_state = weak_dart_state.lock()) {


dart_state->message_handler().OnHandleMessage(dart_state.get());


}


});


}


void DartMessageHandler::OnHandleMessage(DartState* dart_state) {


......


if (Dart_IsPausedOnStart()) {


......


} else if (Dart_IsPausedOnExit()) {


......


} else {


// 调用 Dart_HandleMessage 方法处理消息


result = Dart_HandleMessage();


......


}


......


}


Dart_HandleMessage 的实现很简单,只是调用 message_handler 的 HandleNextM


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


essage 方法,具体代码实现如下:


/src/third_party/dart/runtime/vm/dart_api_impl.cc


DART_EXPORT Dart_Handle Dart_HandleMessage() {


......


if (I->message_handler()->HandleNextMessage() != MessageHandler::kOK) {


return Api::NewHandle(T, T->StealStickyError());


}


return Api::Success();


}


我们进一步跟进 ?HandleNextMessage 方法的实现,最终来到如下代码:


/src/third_party/dart/runtime/vm/message_handler.cc


// 依次遍历 message_handler 的消息队列,对每个消息进程处理


MessageHandler::MessageStatus MessageHandler::HandleMessages(


MonitorLocker* ml,


bool allow_normal_messages,


bool allow_multiple_normal_messages) {


......


Message* message = DequeueMessage(min_priority);


while (message != NULL) {


......


MessageStatus status = HandleMessage(message);


......


message = DequeueMessage(min_priority);


}


return max_status;


}


// 取消息的时候会优先处理 OOB Message


Message* MessageHandler::DequeueMessage(Message::Priority min_priority) {


Message* message = oob_queue_->Dequeue();


if ((message == NULL) && (min_priority < Message::kOOBPriority)) {


message = queue_->Dequeue();


}


return message;


}


每个消息的处理都是在 HandleMessage 方法中,该方法会根据不同的消息优先级做相应的处理,具体代码如下:


/src/third_party/dart/runtime/vm/isolate.cc


MessageHandler::MessageStatus IsolateMessageHandler::HandleMessage(


Message* message) {


......


Object& msg_handler = Object::Handle(zone);


// 非 OOB 消息,需要获取 dart 层的 handler 函数


if (!message->IsOOB() && (message->dest_port() != Message::kIllegalPort)) {


msg_handler = DartLibraryCalls::LookupHandler(message->dest_port());


......


}


......


MessageStatus status = kOK;


if (message->IsOOB()) {


// 处理 OOB 消息,详细实现可自己看代码,这里不分析 OOB 消息


......


} else if (message->dest_port() == Message::kIllegalPort) {


......


} else {


......


// 调用前面找到的 msg_handler 来处理普通消息


const Object& result =


Object::Handle(zone, DartLibraryCalls::HandleMessage(msg_handler, msg));


......


}


delete message;


return status;


}


这里我们主要看普通消息的处理逻辑,首先会通过调用 DartLibraryCalls::LookupHandler 方法来从 dart 层寻找相应的 handler 函数,然后通过 DartLibraryCalls::HandleMessage 执行相应的处理函数,具体实现代码如下:


/src/third_party/dart/runtime/vm/dart_entry.cc


RawObject* DartLibraryCalls::LookupHandler(Dart_Port port_id) {


Thread* thread = Thread::Current();


Zone* zone = thread->zone();


Function& function = Function::Handle(


zone, thread->isolate()->object_store()->lookup_port_handler());


const int kTypeArgsLen = 0;


const int kNumArguments = 1;


// 如果没有消息处理方法,则进行查找,最终找到的是 RawReceivePortImpl 的 _lookupHandler 方法。


if (function.IsNull()) {


Library& isolate_lib = Library::Handle(zone, Library::IsolateLibrary());


ASSERT(!isolate_lib.IsNull());


const String& class_name = String::Handle(


zone, isolate_lib.PrivateName(Symbols::_RawReceivePortImpl()));


const String& function_name = String::Handle(


zone, isolate_lib.PrivateName(Symbols::_lookupHandler()));


function = Resolver::ResolveStatic(isolate_lib, class_name, function_name,


kTypeArgsLen, kNumArguments,


Object::empty_array());


ASSERT(!function.IsNull());


thread->isolate()->object_store()->set_lookup_port_handler(function);


}


// 执行消息处理函数


const Array& args = Array::Handle(zone, Array::New(kNumArguments));


args.SetAt(0, Integer::Handle(zone, Integer::New(port_id)));


const Object& result =


Object::Handle(zone, DartEntry::InvokeFunction(function, args));


return result.raw();


}


最终执行的是 RawReceivePortImpl 的 _lookupHandler 方法,在前面在创建 Future 的时候我们已经设置 _handleMessage 到 _handlerMap 中,_lookupHandler 方法会从 _handlerMap 中找到设置的回调方法,最后执行回调方法。具体代码如下:


/src/third_party/dart/runtime/lib/isolate_patch.dart


@pragma("vm:entry-point")


class _RawReceivePortImpl implements RawReceivePort {


......


// Called from the VM to retrieve the handler for a message.


@pragma("vm:entry-point", "call")


static _lookupHandler(int id) {


var result = _handlerMap[id];


return result;


}


......


}


Future 的创建到这就分析完了,整个过程涉及到了 EventQueue 的消息收发。


上面主要分析了非延迟消息的处理,如果是延迟的 Timer 会怎么处理呢?这里简单提一下,在 dart vm 内部还有个 EventHandler 线程,如果是延迟消息则会通过管道向这个线程写入延迟数据,这个线程会负责延迟计数,到时间了就往引擎层 Post 唤醒消息,具体代码可参见/src/third_party/dart/runtime/bin/eventhandler.cc,这里就不再赘述,感兴趣的可以自行分析。


至此,通过分析 Future 我们已经把 Event Queue 的消息处理流程了解了。


四、Microtask


=====================================================================

1. ?向 ?Microtask queue 发送消息

假设 Zone.?current 为 rootZone,?直接看 scheduleMicrotask 方法的实现:


/src/third_party/dart/sdk/lib/async/schedule_microtask.dart


void scheduleMicrotask(void callback()) {


_Zone currentZone = Zone.current;


if (identical(_rootZone, currentZone)) {


_rootScheduleMicrotask(null, null, _rootZone, callback);


return;


}


......


}


跟进 _rootScheduleMicrotask 方法的实现,最终来到 _scheduleAsyncCallback 方法,该方法做了两件事情:


  • 将传入的 callback 加入 callback 队列

  • 将 _startMicrotaskLoop 作为闭包参数调用 ?_AsyncRun._scheduleImmediate 方法,_startMicrotaskLoop 中会依次执行 callback 队列保存的回调。


具体代码如下:


/src/third_party/dart/sdk/lib/async/schedule_microtask.dart


void _scheduleAsyncCallback(_AsyncCallback callback) {


_AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);


if (_nextCallback == null) {


_nextCallback = _lastCallback = newEntry;


if (!_isInCallbackLoop) {


_AsyncRun._scheduleImmediate(_startMicrotaskLoop);


}


} else {


_lastCallback.next = newEntry;


_lastCallback = newEntry;


}


}


// 该方法被作为回调设置到引擎,会在处理所有的 Microtask 的时候执行


void _startMicrotaskLoop() {


_isInCallbackLoop = true;


try {


_microtaskLoop();


} finally {


_lastPriorityCallback = null;


_isInCallbackLoop = false;


if (_nextCallback != null) {


_AsyncRun._scheduleImmediate(_startMicrotaskLoop);


}


}


}


class _AsyncRun {


external static void _scheduleImmediate(void callback());


}


根据前面的经验,会在对应的 patch 文件中找到 _AsyncRun._scheduleImmediate 的实现,其内部调用了 _ScheduleImmediate._closure 指向的方法。


具体代码如下:


/src/third_party/dart/runtime/lib/schedule_microtask_patch.dart


@patch


class _AsyncRun {


@patch


static void _scheduleImmediate(void callback()) {


if (_ScheduleImmediate._closure == null) {


throw new UnsupportedError("Microtasks are not supported");


}


_ScheduleImmediate._closure(callback);


}


}


// 通过该方法设置 _ScheduleImmediate._closure


@pragma("vm:entry-point", "call")


void _setScheduleImmediateClosure(_ScheduleImmediateClosure closure) {


_ScheduleImmediate._closure = closure;


}


那么 _ScheduleImmediate._closure 指向的是什么呢?我们需要找到 _setScheduleImmediateClosure 的调用方。root isolate 初始化时会执行一系列的 vm hook 调用,我们从中找到了 _setScheduleImmediateClosure 的调用,具体代码如下:


/src/flutter/lib/ui/dart_runtime_hooks.cc


static void InitDartAsync(Dart_Handle builtin_library, bool is_ui_isolate) {


Dart_Handle schedule_microtask;


if (is_ui_isolate) {


// 这里的 builtin_library 是 Flutter 扩展的 ui library


schedule_microtask =


GetFunction(builtin_library, "_getScheduleMicrotaskClosure");


} else {


......


}


Dart_Handle async_library = Dart_LookupLibrary(ToDart("dart:async"));


Dart_Handle set_schedule_microtask = ToDart("_setScheduleImmediateClosure");


Dart_Handle result = Dart_Invoke(async_library, set_schedule_microtask, 1,


&schedule_microtask);


PropagateIfError(result);


}


进一步跟进,最终找到了 _ScheduleImmediate._closure 指向的方法,是一个 native 实现的函数,具体代码如下:


/src/flutter/lib/ui/natives.dart


Function _getScheduleMicrotaskClosure() => _scheduleMicrotask;


void _scheduleMicrotask(void callback()) native 'ScheduleMicrotask';


跟进 _scheduleMicrotask 的 native 实现,发现其会把传入的 _startMicrotaskLoop 方法加入到底层的 Microtask queue,具体代码如下:


/src/flutter/lib/ui/dart_runtime_hooks.cc


void ScheduleMicrotask(Dart_NativeArguments args) {


Dart_Handle closure = Dart_GetNativeArgument(args, 0);


UIDartState::Current()->ScheduleMicrotask(closure);


}


/src/flutter/lib/ui/ui_dart_state.cc


void UIDartState::ScheduleMicrotask(Dart_Handle closure) { if (tonic::LogIfError(closure) || !Dart_IsClosure(closure)) { return; } microtask_queue_.ScheduleMicrotask(closure);}

2. ?Microtask queue 消息处理

前面已经将 _startMicrotaskLoop 方法加入到了 Microtask queue ,那么 Microtask queue 内的方法何时执行呢?我们通过跟进 Microtask queue ?的 RunMicrotasks 方法的调用方,最终找到 Microtask queue 内方法的执行时机 FlushMicrotasksNow,具体代码如下:


/src/flutter/lib/ui/ui_dart_state.cc


void UIDartState::ScheduleMicrotask(Dart_Handle closure) {


if (tonic::LogIfError(closure) || !Dart_IsClosure(closure)) {


return;


}


microtask_queue_.ScheduleMicrotask(closure);


}


再跟进 FlushMicrotasksNow 方法的调用方,发现有两处调用:


  • 这里是在每一帧开始的时候去执行 Microtask


/src/flutter/lib/ui/window/window.cc


void UIDartState::FlushMicrotasksNow() {


microtask_queue_.RunMicrotasks();


}


  • 另外一处调用是通过 TaskObserve 的形式,具体代码如下:


/src/flutter/lib/ui/ui_dart_state.cc


void UIDartState::AddOrRemoveTaskObserver(bool add) {


......


if (add) {


// 这个 add_callback_ 是啥呢?


add_callback_(reinterpret_cast<intptr_t>(this),


this { this->FlushMicrotasksNow(); });


} else {


remove_callback_(reinterpret_cast<intptr_t>(this));


}


}


跟进 add_callback_ 的赋值,这里是 android 的实现


/src/flutter/shell/platform/android/flutter_main.cc


void FlutterMain::Init(JNIEnv* env,


jclass clazz,


jobject context,


jobjectArray jargs,


jstring bundlePath,


jstring appStoragePath,


jstring engineCachesPath) {


......


settings.task_observer_add = [](intptr_t key, fml::closure callback) {


fml::MessageLoop::GetCurrent().AddTaskObserver(key, std::move(callback));


};


......


}


FlushMicrotasksNow() 是作为 MessageLoop 的 TaskObserver 来执行的, TaskObserver 会在处理完 task 之后把该 Task 创建的 MicroTask 全部执行,也就是说在下一个 Task 运行前执行。代码如下:


/src/flutter/fml/message_loop_impl.cc


void MessageLoopImpl::FlushTasks(FlushType type) {


......


for (const auto& invocation : invocations) {


invocation();


for (const auto& observer : task_observers_) {


observer.second();


}


}


}


五、结束


==============================================================


通过前面的分析,Flutter 的异步消息处理流程还是挺复杂的,主要是代码写的比较乱,跳转层次太多,可以通过对整个流程的掌控来寻找 UI 线程的优化及监控点,进而降低 UI 线程的处理时间,希望本篇文章让大家对 Flutter 的异步消息的整体处理流程有更深的理解。

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
探索 Flutter 异步消息的实现