The Most Detailed Yet: WorkManager Flow Analysis and Source Code Walkthrough You Didn't Know About
.then(workC1, workC2).enqueue()
3.4 Observing a Worker's progress or state
WorkManager.getInstance(myContext).getWorkInfoByIdLiveData(uploadWorkRequest.id).observe(lifecycleOwner, Observer { workInfo ->})
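04 WorkManager flow analysis and source code walkthrough
This chapter walks through WorkManager's flow and source code from the following angles:
Creation: (a) WorkManager initialization; (b) WorkRequest creation
Execution of tasks without constraints
Execution of tasks with constraints
We start from the most basic flow: create a one-time task without any constraints, whose doWork() makes the thread sleep for 5 s.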
val work1Request = OneTimeWorkRequestBuilder<Worker1>().build()
WorkManager.getInstance(this).enqueue(work1Request)

class Worker1(appContext: Context, workerParams: WorkerParameters) :
    Worker(appContext, workerParams) {

    override fun doWork(): Result {
        Thread.sleep(5000)
        return Result.success()
    }
}
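4.1 Creation
First, let's walk through how WorkManager is initialized.
4.1.1 WorkManager initialization
By default, WorkManager is not created when we call WorkManager.getInstance(). Decompiling the APK shows that a ContentProvider named WorkManagerInitializer is registered in the AndroidManifest file. As a result, WorkManager has already been created by the time the app cold-starts.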
<!-- AndroidManifest.xml -->
<provider
    android:name="androidx.work.impl.WorkManagerInitializer"
    android:exported="false"
    android:multiprocess="true"
    android:authorities="com.jandroid.multivideo.workmanager-init"
    android:directBootAware="false" />
The onCreate() method of WorkManagerInitializer:
// WorkManagerInitializer
public boolean onCreate() {
    // Initialize WorkManager with the default configuration.
    WorkManager.initialize(getContext(), new Configuration.Builder().build());
    return true;
}
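Because WorkManager is a singleton, it is already initialized at this point. Before initialize() is called, a default Configuration is created. Configuration holds many properties that control how work is managed and scheduled. The default Configuration created by WorkManager is usually sufficient; to supply your own, see the official documentation, which describes the procedure clearly. Let's continue with the implementation of initialize(). Since WorkManager is an abstract class, the actual construction happens in its subclass WorkManagerImpl: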
// WorkManagerImpl
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public static void initialize(@NonNull Context context, @NonNull Configuration configuration) {
    synchronized (sLock) {
        if (sDelegatedInstance != null && sDefaultInstance != null) {
            throw new IllegalStateException("WorkManager is already initialized. Did you "
                    + "try to initialize it manually without disabling "
                    + "WorkManagerInitializer? See "
                    + "WorkManager#initialize(Context, Configuration) or the class level "
                    + "Javadoc for more information.");
        }

        if (sDelegatedInstance == null) {
            context = context.getApplicationContext();
            if (sDefaultInstance == null) {
                sDefaultInstance = new WorkManagerImpl(
                        context,
                        configuration,
                        new WorkManagerTaskExecutor(configuration.getTaskExecutor()));
            }
            sDelegatedInstance = sDefaultInstance;
        }
    }
}
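The guard at the top of initialize() is easy to miss, so here is a minimal, dependency-free sketch of the same pattern. InitGuardDemo and its String "configuration" are hypothetical stand-ins, not WorkManager classes; the sketch only illustrates why a second initialize() call (for example a manual one made without disabling WorkManagerInitializer) fails.

```java
// Simplified model of the static initialization guard shown above.
// InitGuardDemo is a hypothetical name; the String stands in for Configuration.
final class InitGuardDemo {
    private static final Object LOCK = new Object();
    private static InitGuardDemo delegatedInstance; // plays the role of sDelegatedInstance
    private static InitGuardDemo defaultInstance;   // plays the role of sDefaultInstance

    final String configuration;

    private InitGuardDemo(String configuration) {
        this.configuration = configuration;
    }

    static void initialize(String configuration) {
        synchronized (LOCK) {
            // Both references set means someone already initialized us.
            if (delegatedInstance != null && defaultInstance != null) {
                throw new IllegalStateException("already initialized");
            }
            if (delegatedInstance == null) {
                if (defaultInstance == null) {
                    defaultInstance = new InitGuardDemo(configuration);
                }
                delegatedInstance = defaultInstance;
            }
        }
    }

    static InitGuardDemo getInstance() {
        synchronized (LOCK) {
            return delegatedInstance;
        }
    }
}
```

The first call wins: the instance keeps the configuration it was created with, and any later call throws instead of silently replacing it.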
At this point sDelegatedInstance is null, so WorkManager first creates a default WorkManagerTaskExecutor, which is used to run WorkManager's tasks, and then creates a WorkManagerImpl object:
// WorkManagerImpl
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public WorkManagerImpl(
        @NonNull Context context,
        @NonNull Configuration configuration,
        @NonNull TaskExecutor workTaskExecutor) {
    this(context,
            configuration,
            workTaskExecutor,
            context.getResources().getBoolean(R.bool.workmanager_test_configuration));
}

// WorkManagerImpl
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public WorkManagerImpl(
        @NonNull Context context,
        @NonNull Configuration configuration,
        @NonNull TaskExecutor workTaskExecutor,
        boolean useTestDatabase) {
    this(context,
            configuration,
            workTaskExecutor,
            WorkDatabase.create(
                    context.getApplicationContext(),
                    workTaskExecutor.getBackgroundExecutor(),
                    useTestDatabase));
}
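WorkManager creates its database at this point. WorkDatabase.create() persists the task list locally, recording each task's properties, execution conditions, execution order, and execution state. This guarantees that after a cold start or a device reboot, tasks can continue to run according to their conditions. Next, the implementation of this():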
// WorkManagerImpl
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public WorkManagerImpl(
        @NonNull Context context,
        @NonNull Configuration configuration,
        @NonNull TaskExecutor workTaskExecutor,
        @NonNull WorkDatabase database) {
    Context applicationContext = context.getApplicationContext();
    Logger.setLogger(new Logger.LogcatLogger(configuration.getMinimumLoggingLevel()));
    List<Scheduler> schedulers = createSchedulers(applicationContext, workTaskExecutor);
    Processor processor = new Processor(
            context,
            configuration,
            workTaskExecutor,
            database,
            schedulers);
    internalInit(context, configuration, workTaskExecutor, database, schedulers, processor);
}
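There are three important initialization steps here: createSchedulers(), which creates different Schedulers depending on the build version to schedule tasks; Processor(), which manages the execution of the Schedulers; and internalInit(), which performs the actual initialization. First, the implementation of createSchedulers():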
// WorkManagerImpl
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
@NonNull
public List<Scheduler> createSchedulers(
        @NonNull Context context,
        @NonNull TaskExecutor taskExecutor) {

    return Arrays.asList(
            Schedulers.createBestAvailableBackgroundScheduler(context, this),
            // Specify the task executor directly here as this happens before internalInit.
            // GreedyScheduler creates ConstraintTrackers and controllers eagerly.
            new GreedyScheduler(context, taskExecutor, this));
}
This returns a list of Schedulers. GreedyScheduler is always present and is the one that directly runs non-periodic tasks with no constraints. Next, the implementation of createBestAvailableBackgroundScheduler():
// Schedulers
@NonNull
static Scheduler createBestAvailableBackgroundScheduler(
        @NonNull Context context,
        @NonNull WorkManagerImpl workManager) {

    Scheduler scheduler;

    if (Build.VERSION.SDK_INT >= WorkManagerImpl.MIN_JOB_SCHEDULER_API_LEVEL) {
        scheduler = new SystemJobScheduler(context, workManager);
        setComponentEnabled(context, SystemJobService.class, true);
        Logger.get().debug(TAG, "Created SystemJobScheduler and enabled SystemJobService");
    } else {
        scheduler = tryCreateGcmBasedScheduler(context);
        if (scheduler == null) {
            scheduler = new SystemAlarmScheduler(context);
            setComponentEnabled(context, SystemAlarmService.class, true);
            Logger.get().debug(TAG, "Created SystemAlarmScheduler");
        }
    }
    return scheduler;
}
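The fallback order in this method can be condensed into a few lines. The sketch below is a simplified model, not library code: SchedulerChoiceDemo is a hypothetical name, the returned strings are only labels, and the gcmAvailable flag is an assumption standing in for tryCreateGcmBasedScheduler() returning a non-null scheduler.

```java
// Simplified model of the scheduler fallback order shown above.
final class SchedulerChoiceDemo {
    // API level 23, the threshold the walkthrough gives for using JobScheduler.
    static final int MIN_JOB_SCHEDULER_API_LEVEL = 23;

    // gcmAvailable is a hypothetical flag: true when a GCM-based scheduler could be created.
    static String chooseBackgroundScheduler(int sdkInt, boolean gcmAvailable) {
        if (sdkInt >= MIN_JOB_SCHEDULER_API_LEVEL) {
            return "SystemJobScheduler";
        }
        if (gcmAvailable) {
            return "GcmBasedScheduler";
        }
        return "SystemAlarmScheduler"; // last resort, backed by AlarmManager
    }

    public static void main(String[] args) {
        System.out.println(chooseBackgroundScheduler(29, false));
        System.out.println(chooseBackgroundScheduler(21, true));
        System.out.println(chooseBackgroundScheduler(21, false));
    }
}
```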
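This code checks the build version. On API level 23 and above it returns SystemJobScheduler, which manages tasks through JobScheduler. Below 23 it first tries to use GcmScheduler; if a GcmScheduler cannot be created, it returns SystemAlarmScheduler, which manages tasks through AlarmManager. The Scheduler returned here is used to run periodic or constrained tasks. So WorkManager creates two Schedulers: GreedyScheduler, which runs unconstrained non-periodic tasks, and SystemJobScheduler/GcmBasedScheduler/SystemAlarmScheduler, which runs constrained or periodic tasks. We will analyze how these Schedulers are constructed and executed later. Next, the Processor is initialized. The Processor stores the Configuration, TaskExecutor, WorkDatabase, schedulers, and so on, and uses them to schedule tasks at the appropriate time. Now for internalInit():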
// WorkManagerImpl
private void internalInit(@NonNull Context context,
        @NonNull Configuration configuration,
        @NonNull TaskExecutor workTaskExecutor,
        @NonNull WorkDatabase workDatabase,
        @NonNull List<Scheduler> schedulers,
        @NonNull Processor processor) {

    context = context.getApplicationContext();
    mContext = context;
    mConfiguration = configuration;
    mWorkTaskExecutor = workTaskExecutor;
    mWorkDatabase = workDatabase;
    mSchedulers = schedulers;
    mProcessor = processor;
    mPreferenceUtils = new PreferenceUtils(workDatabase);
    mForceStopRunnableCompleted = false;

    // Checks for app force stops.
    mWorkTaskExecutor.executeOnBackgroundThread(new ForceStopRunnable(context, this));
}
This records the Configuration, TaskExecutor, WorkDatabase, schedulers, Processor, and so on. The last statement then starts a ForceStopRunnable. What is this Runnable for? Look at the implementation of run():
// ForceStopRunnable
@Override
public void run() {
    // Migrate the database to the no-backup directory if necessary.
    WorkDatabasePathHelper.migrateDatabase(mContext);
    // Clean invalid jobs attributed to WorkManager, and Workers that might have been
    // interrupted because the application crashed (RUNNING state).
    Logger.get().debug(TAG, "Performing cleanup operations.");
    try {
        boolean needsScheduling = cleanUp();
        if (shouldRescheduleWorkers()) {
            Logger.get().debug(TAG, "Rescheduling Workers.");
            mWorkManager.rescheduleEligibleWork();
            // Mark the jobs as migrated.
            mWorkManager.getPreferenceUtils().setNeedsReschedule(false);
        } else if (isForceStopped()) {
            Logger.get().debug(TAG, "Application was force-stopped, rescheduling.");
            mWorkManager.rescheduleEligibleWork();
        } else if (needsScheduling) {
            Logger.get().debug(TAG, "Found unfinished work, scheduling it.");
            Schedulers.schedule(
                    mWorkManager.getConfiguration(),
                    mWorkManager.getWorkDatabase(),
                    mWorkManager.getSchedulers());
        }
        mWorkManager.onForceStopRunnableCompleted();
    } catch (SQLiteCantOpenDatabaseException
            | SQLiteDatabaseCorruptException
            | SQLiteAccessPermException exception) {
        // ForceStopRunnable is usually the first thing that accesses a database (or an app's
        // internal data directory). This means that weird PackageManager bugs are attributed
        // to ForceStopRunnable, which is unfortunate. This gives the developer a better error
        // message.
        String message =
                "The file system on the device is in a bad state. WorkManager cannot access "
                        + "the app's internal data store.";
        Logger.get().error(TAG, message, exception);
        throw new IllegalStateException(message, exception);
    }
}
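We won't dig into the details of this code for now. Its role is clear, though: during WorkManager initialization, if there are unfinished tasks that need to run again, or if the app was force-stopped, it invokes the Schedulers directly. With that, WorkManager's initialization flow is complete.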
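The three branches in run() are checked in a fixed order, and only the first match is acted on. A minimal sketch of that priority follows, assuming the three checks can be reduced to plain booleans; ForceStopDecisionDemo and the Action names are hypothetical and exist only for illustration.

```java
// Simplified model of the branch order in ForceStopRunnable.run().
final class ForceStopDecisionDemo {
    enum Action {
        RESCHEDULE_ELIGIBLE_WORK, // shouldRescheduleWorkers() was true
        RESCHEDULE_AFTER_FORCE_STOP, // isForceStopped() was true
        SCHEDULE_UNFINISHED_WORK, // cleanUp() reported work that needs scheduling
        NOTHING
    }

    static Action decide(boolean shouldReschedule, boolean forceStopped, boolean needsScheduling) {
        if (shouldReschedule) {
            return Action.RESCHEDULE_ELIGIBLE_WORK;
        } else if (forceStopped) {
            return Action.RESCHEDULE_AFTER_FORCE_STOP;
        } else if (needsScheduling) {
            return Action.SCHEDULE_UNFINISHED_WORK;
        }
        return Action.NOTHING;
    }
}
```

Note that the real checks read persisted state and may have side effects; the sketch captures only which branch takes precedence.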
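Summary
WorkManager is initialized after the app cold-starts, by the WorkManagerInitializer ContentProvider.
Initialization covers the Configuration, WorkManagerTaskExecutor, WorkDatabase, Schedulers, Processor, and so on.
There are two Schedulers. (1) GreedyScheduler: runs non-periodic tasks without any constraints. (2) SystemJobScheduler/GcmBasedScheduler/SystemAlarmScheduler: runs periodic or constrained tasks. SystemJobScheduler is preferred; when the build version is below 23, GcmBasedScheduler is tried first, and if that returns null, SystemAlarmScheduler is returned.
At the end of initialization, tasks that need to run are found and scheduled, depending on the situation.
Flowchart of WorkManager initialization: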
4.1.2 WorkRequest creation
Having walked through WorkManager's initialization, we return to the sample code and create a OneTimeWorkRequest:
val work1Request = OneTimeWorkRequestBuilder<Worker1>().build()
// OneTimeWorkRequest.Builder
/**
 * Creates a {@link OneTimeWorkRequest}.
 *
 * @param workerClass The {@link ListenableWorker} class to run for this work
 */
public Builder(@NonNull Class<? extends ListenableWorker> workerClass) {
    super(workerClass);
    mWorkSpec.inputMergerClassName = OverwritingInputMerger.class.getName();
}

// WorkRequest.Builder
Builder(@NonNull Class<? extends ListenableWorker> workerClass) {
    mId = UUID.randomUUID();
    mWorkerClass = workerClass;
    mWorkSpec = new WorkSpec(mId.toString(), workerClass.getName());
    addTag(workerClass.getName());
}
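The Builder's constructor creates a WorkSpec object that stores the task id and the worker class name; the id is generated automatically from a UUID. The request's default tag is the class name, and callers can set further tags with addTag(). The constructor also sets the WorkSpec's default rule for merging task inputs: OverwritingInputMerger. Next, the implementation of build():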
// WorkRequest.Builder
public final @NonNull W build() {
    W returnValue = buildInternal();
    // Create a new id and WorkSpec so this WorkRequest.Builder can be used multiple times.
    mId = UUID.randomUUID();
    mWorkSpec = new WorkSpec(mWorkSpec);
    mWorkSpec.id = mId.toString();
    return returnValue;
}
buildInternal() returns a WorkRequest object. It is an abstract method; the subclass OneTimeWorkRequest.Builder implements it as follows:
// OneTimeWorkRequest.Builder
@Override
@NonNull OneTimeWorkRequest buildInternal() {
    if (mBackoffCriteriaSet
            && Build.VERSION.SDK_INT >= 23
            && mWorkSpec.constraints.requiresDeviceIdle()) {
        throw new IllegalArgumentException(
                "Cannot set backoff criteria on an idle mode job");
    }
    if (mWorkSpec.runInForeground
            && Build.VERSION.SDK_INT >= 23
            && mWorkSpec.constraints.requiresDeviceIdle()) {
        throw new IllegalArgumentException(
                "Cannot run in foreground with an idle mode constraint");
    }
    return new OneTimeWorkRequest(this);
}
Because we have not set any other properties on the WorkSpec and there are no constraints yet, this returns a OneTimeWorkRequest object directly.
// OneTimeWorkRequest
OneTimeWorkRequest(Builder builder) {
    super(builder.mId, builder.mWorkSpec, builder.mTags);
}
The Builder's id, WorkSpec object, and tags are handed to the OneTimeWorkRequest object. Now back to the Builder's build() method:
// WorkRequest.Builder
public final @NonNull W build() {
    W returnValue = buildInternal();
    // Create a new id and WorkSpec so this WorkRequest.Builder can be used multiple times.
    mId = UUID.randomUUID();
    mWorkSpec = new WorkSpec(mWorkSpec);
    mWorkSpec.id = mId.toString();
    return returnValue;
}
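After buildInternal() returns the OneTimeWorkRequest, build() creates a new WorkSpec for the Builder and assigns it a new UUID. Apart from the id, the new WorkSpec has the same property values as the original, but it is a distinct object in memory. This lets the same Builder be reused. With that, the WorkRequest for our task has been created.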
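To see why build() swaps in a fresh id and a copied WorkSpec, here is a minimal, dependency-free sketch of the same idea. ReusableBuilderDemo, Spec, and Request are hypothetical stand-ins for WorkRequest.Builder, WorkSpec, and WorkRequest; only the copy-then-reassign pattern mirrors the code above.

```java
import java.util.UUID;

// Simplified model of a builder that can be reused after build().
final class ReusableBuilderDemo {
    // Hypothetical stand-in for WorkSpec: an id plus the worker class name.
    static final class Spec {
        String id;
        final String workerClassName;

        Spec(String id, String workerClassName) {
            this.id = id;
            this.workerClassName = workerClassName;
        }

        // Copy constructor: same values, different object.
        Spec(Spec other) {
            this(other.id, other.workerClassName);
        }
    }

    // Hypothetical stand-in for WorkRequest.
    static final class Request {
        final UUID id;
        final Spec spec;

        Request(UUID id, Spec spec) {
            this.id = id;
            this.spec = spec;
        }
    }

    static final class Builder {
        private UUID id = UUID.randomUUID();
        private Spec spec;

        Builder(String workerClassName) {
            spec = new Spec(id.toString(), workerClassName);
        }

        Request build() {
            Request result = new Request(id, spec);
            // Give the builder a new id and a copied spec so the next build()
            // cannot share state with the request just returned.
            id = UUID.randomUUID();
            spec = new Spec(spec);
            spec.id = id.toString();
            return result;
        }
    }
}
```

Calling build() twice on one Builder therefore yields two requests with different ids and separate Spec objects, while the worker class name carries over.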
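Summary
A WorkRequest is created in order to hold three important member variables:
mId: the task id, generated from a UUID.
mWorkSpec: the properties of each task.
mTags: the tags of each task.
Flowchart of WorkRequest creation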
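4.2 Execution of a task without constraints
Executing the OneTimeWorkRequest
WorkManager.getInstance(this).enqueue(work1Request)
As analyzed in section 4.1, WorkManager is a singleton that has already been initialized when the app starts. So we go straight to the implementation of enqueue():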
// WorkManager
@NonNull
public final Operation enqueue(@NonNull WorkRequest workRequest) {
    return enqueue(Collections.singletonList(workRequest));
}

// WorkManager
@NonNull
public abstract Operation enqueue(@NonNull List<? extends WorkRequest> requests);

// WorkManagerImpl
@NonNull
public Operation enqueue(@NonNull List<? extends WorkRequest> workRequests) {

    // This error is not being propagated as part of the Operation, as we want the
    // app to crash during development. Having no workRequests is always a developer error.
    if (workRequests.isEmpty()) {
        throw new IllegalArgumentException(
                "enqueue needs at least one WorkRequest.");
    }
    return new WorkContinuationImpl(this, workRequests).enqueue();
}
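This creates a WorkContinuationImpl object and then calls its enqueue() method. WorkContinuationImpl is a subclass of WorkContinuation; it chains multiple OneTimeWorkRequests in sequence, in parallel, or combined, as needed. The familiar then(), combine(), and enqueue() are all methods of this class.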
// WorkContinuationImpl
WorkContinuationImpl(@NonNull WorkManagerImpl workManagerImpl,
        String name,
        ExistingWorkPolicy existingWorkPolicy,
        @NonNull List<? extends WorkRequest> work,
        @Nullable List<WorkContinuationImpl> parents) {
    mWorkManagerImpl = workManagerImpl;
    mName = name;
    mExistingWorkPolicy = existingWorkPolicy;
    mWork = work;
    mParents = parents;
    mIds = new ArrayList<>(mWork.size());
    mAllIds = new ArrayList<>();
    if (parents != null) {
        for (WorkContinuationImpl parent : parents) {
            mAllIds.addAll(parent.mAllIds);
        }
    }
    for (int i = 0; i < work.size(); i++) {
        String id = work.get(i).getStringId();
        mIds.add(id);
        mAllIds.add(id);
    }
}
WorkContinuation holds all the information related to the tasks, such as the WorkManager, the WorkRequests, and the parent WorkContinuations. Next, the implementation of WorkContinuationImpl's enqueue() method:
// WorkContinuationImpl
@Override
public @NonNull Operation enqueue() {
    // Only enqueue if not already enqueued.
    if (!mEnqueued) {
        // The runnable walks the hierarchy of the continuations
        // and marks them enqueued using the markEnqueued() method, parent first.
        EnqueueRunnable runnable = new EnqueueRunnable(this);
        mWorkManagerImpl.getWorkTaskExecutor().executeOnBackgroundThread(runnable);
        mOperation = runnable.getOperation();
    } else {
        Logger.get().warning(TAG,
                String.format("Already enqueued work ids (%s)", TextUtils.join(", ", mIds)));
    }
    return mOperation;
}
WorkManager's TaskExecutor runs an EnqueueRunnable. The implementation of run() in EnqueueRunnable:
// EnqueueRunnable
@Override
public void run() {
    try {
        if (mWorkContinuation.hasCycles()) {
            throw new IllegalStateException(
                    String.format("WorkContinuation has cycles (%s)", mWorkContinuation));
        }
        boolean needsScheduling = addToDatabase();
        if (needsScheduling) {
            // Enable RescheduleReceiver, only when there are Worker's that need scheduling.
            final Context context =
                    mWorkContinuation.getWorkManagerImpl().getApplicationContext();
            PackageManagerHelper.setComponentEnabled(context, RescheduleReceiver.class, true);
            scheduleWorkInBackground();
        }
        mOperation.setState(Operation.SUCCESS);
    } catch (Throwable exception) {
        mOperation.setState(new Operation.State.FAILURE(exception));
    }
}
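addToDatabase() stores the WorkSpec in the database and validates the task's state. In the current case it returns true. PackageManagerHelper.setComponentEnabled() then enables RescheduleReceiver. Decompiling shows that this receiver is registered in AndroidManifest and is disabled by default. It listens for three broadcasts: boot completed, time set, and time zone changed.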
<!-- AndroidManifest -->
<receiver
    android:directBootAware="false"
    android:enabled="false"
    android:exported="false"
    android:name="androidx.work.impl.background.systemalarm.RescheduleReceiver">
    <intent-filter>
        <action android:name="android.intent.action.BOOT_COMPLETED"/>
        <action android:name="android.intent.action.TIME_SET"/>
        <action android:name="android.intent.action.TIMEZONE_CHANGED"/>
    </intent-filter>
</receiver>
The implementation of scheduleWorkInBackground():
// EnqueueRunnable
/**
 * Schedules work on the background scheduler.
 */
@VisibleForTesting
public void scheduleWorkInBackground() {
    WorkManagerImpl workManager = mWorkContinuation.getWorkManagerImpl();
    Schedulers.schedule(
            workManager.getConfiguration(),
            workManager.getWorkDatabase(),
            workManager.getSchedulers());
}
This is where task scheduling happens. After obtaining the WorkManager object, the code calls Schedulers.schedule(), passing in three objects: the Configuration, the WorkDatabase, and the Schedulers. The schedule() method:
// Schedulers
public static void schedule(
        @NonNull Configuration configuration,
        @NonNull WorkDatabase workDatabase,
        List<Scheduler> schedulers) {
    if (schedulers == null || schedulers.size() == 0) {
        return;
    }

    WorkSpecDao workSpecDao = workDatabase.workSpecDao();
    List<WorkSpec> eligibleWorkSpecs;

    workDatabase.beginTransaction();
    try {
        eligibleWorkSpecs = workSpecDao.getEligibleWorkForScheduling(
                configuration.getMaxSchedulerLimit());
        if (eligibleWorkSpecs != null && eligibleWorkSpecs.size() > 0) {
            long now = System.currentTimeMillis();

            // Mark all the WorkSpecs as scheduled.
            // Calls to Scheduler#schedule() could potentially result in more schedules
            // on a separate thread. Therefore, this needs to be done first.
            for (WorkSpec workSpec : eligibleWorkSpecs) {
                workSpecDao.markWorkSpecScheduled(workSpec.id, now);
            }
        }
        workDatabase.setTransactionSuccessful();
    } finally {
        workDatabase.endTransaction();
    }

    if (eligibleWorkSpecs != null && eligibleWorkSpecs.size() > 0) {
        WorkSpec[] eligibleWorkSpecsArray = eligibleWorkSpecs.toArray(new WorkSpec[0]);
        // Delegate to the underlying scheduler.
        for (Scheduler scheduler : schedulers) {
            scheduler.schedule(eligibleWorkSpecsArray);
        }
    }
}
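The shape of this method (pick a bounded batch, mark it as scheduled, then hand the same batch to every Scheduler) can be sketched without a database. Everything below is an assumption made for illustration: ScheduleFanOutDemo, its Spec and Scheduler types, and the NOT_REQUESTED sentinel are hypothetical, and eligibility is reduced to "not yet marked as scheduled".

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model of the select / mark / fan-out flow shown above.
final class ScheduleFanOutDemo {
    static final long NOT_REQUESTED = -1L; // hypothetical "not yet scheduled" sentinel

    static final class Spec {
        final String id;
        long scheduleRequestedAt = NOT_REQUESTED;

        Spec(String id) {
            this.id = id;
        }
    }

    interface Scheduler {
        void schedule(List<Spec> batch);
    }

    // Returns the batch that was handed to the schedulers (possibly empty).
    static List<Spec> schedule(List<Spec> all, int limit, List<Scheduler> schedulers, long now) {
        List<Spec> eligible = new ArrayList<>();
        if (schedulers == null || schedulers.isEmpty()) {
            return eligible;
        }
        // Select at most `limit` specs that have not been scheduled yet.
        for (Spec spec : all) {
            if (eligible.size() >= limit) {
                break;
            }
            if (spec.scheduleRequestedAt == NOT_REQUESTED) {
                eligible.add(spec);
            }
        }
        // Mark first, so a re-entrant schedule() call cannot pick the same specs again.
        for (Spec spec : eligible) {
            spec.scheduleRequestedAt = now;
        }
        // Every scheduler receives the same batch and decides for itself what to run.
        if (!eligible.isEmpty()) {
            for (Scheduler scheduler : schedulers) {
                scheduler.schedule(eligible);
            }
        }
        return eligible;
    }
}
```

Marking before delegating is the point of the ordering: it keeps a second pass from selecting work that is already on its way to the schedulers.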
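A series of database operations runs first, and then each task is scheduled according to its conditions. eligibleWorkSpecs is the list of WorkSpecs in the ENQUEUED state that have not yet run and have not been cancelled; these are then marked as scheduled in the database. Finally, the code iterates over schedulers and calls scheduler.schedule() to handle each task. Because the sample code creates a one-time task without constraints, we look at GreedyScheduler's implementation of schedule():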
// GreedyScheduler
@Override
public void schedule(@NonNull WorkSpec... workSpecs) {
    if (mIsMainProcess == null) {
        // The default process name is the package name.
        mIsMainProcess = TextUtils.equals(mContext.getPackageName(), getProcessName());
    }

    if (!mIsMainProcess) {
        Logger.get().info(TAG, "Ignoring schedule request in non-main process");
        return;
    }

    registerExecutionListenerIfNeeded();

    // Keep track of the list of new WorkSpecs whose constraints need to be tracked.
    // Add them to the known list of constrained WorkSpecs and call replace() on
    // WorkConstraintsTracker. That way we only need to synchronize on the part where we
    // are updating mConstrainedWorkSpecs.
    List<WorkSpec> constrainedWorkSpecs = new ArrayList<>();
    List<String> constrainedWorkSpecIds = new ArrayList<>();
    for (WorkSpec workSpec : workSpecs) {
        if (workSpec.state == WorkInfo.State.ENQUEUED
                && !workSpec.isPeriodic()
                && workSpec.initialDelay == 0L
                && !workSpec.isBackedOff()) {
            if (workSpec.hasConstraints()) {
                if (SDK_INT >= 23 && workSpec.constraints.requiresDeviceIdle()) {
                    // Ignore requests that have an idle mode constraint.
                    Logger.get().debug(TAG,
                            String.format("Ignoring WorkSpec %s, Requires device idle.",
                                    workSpec));
                } else if (SDK_INT >= 24 && workSpec.constraints.hasContentUriTriggers()) {
                    // Ignore requests that have content uri triggers.
                    Logger.get().debug(TAG,
                            String.format("Ignoring WorkSpec %s, Requires ContentUri triggers.",
                                    workSpec));
                } else {
                    constrainedWorkSpecs.add(workSpec);
                    constrainedWorkSpecIds.add(workSpec.id);
                }
            } else {
                Logger.get().debug(TAG, String.format("Starting work for %s", workSpec.id));
                mWorkManagerImpl.startWork(workSpec.id);
            }
        }
    }

    // onExecuted() which is called on the main thread also modifies the list of mConstrained
    // WorkSpecs. Therefore we need to lock here.
    synchronized (mLock) {
        if (!constrainedWorkSpecs.isEmpty()) {
            Logger.get().debug(TAG, String.format("Starting tracking for [%s]",
                    TextUtils.join(",", constrainedWorkSpecIds)));
            mConstrainedWorkSpecs.addAll(constrainedWorkSpecs);
            mWorkConstraintsTracker.replace(mConstrainedWorkSpecs);
        }
    }
}
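The branching inside the loop can be condensed into one small decision function. This is a simplified sketch under stated assumptions: GreedyEligibilityDemo and Decision are hypothetical names, the WorkSpec checks are reduced to plain parameters, and the two special cases that are skipped above (device-idle constraints and content URI triggers) are left out.

```java
// Simplified model of how GreedyScheduler.schedule() treats a single WorkSpec.
final class GreedyEligibilityDemo {
    enum Decision {
        START_NOW,         // no constraints: handed straight to startWork()
        TRACK_CONSTRAINTS, // constrained: tracked until the constraints are met
        SKIP               // not handled by the greedy path at all
    }

    static Decision decide(
            boolean enqueued,
            boolean periodic,
            long initialDelayMillis,
            boolean backedOff,
            boolean hasConstraints) {
        if (!enqueued || periodic || initialDelayMillis != 0L || backedOff) {
            return Decision.SKIP;
        }
        return hasConstraints ? Decision.TRACK_CONSTRAINTS : Decision.START_NOW;
    }
}
```

For the sample Worker1 request (enqueued, one-time, no delay, no backoff, no constraints) the function returns START_NOW, which is the path followed in the rest of this section.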
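Once a WorkSpec meets all five of these conditions: (1) it is in the ENQUEUED state, (2) it is not periodic, (3) it has no initial delay, (4) it is not in a backoff (retry) state, and (5) it has no constraints, the scheduler directly calls:
// GreedyScheduler
mWorkManagerImpl.startWork(workSpec.id);
This lets WorkManager run the task right away. Next, the implementation of startWork():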
// WorkManagerImpl
/**
 * @param workSpecId The {@link WorkSpec} id to start
 * @hide
 */
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public void startWork(@NonNull String workSpecId) {
    startWork(workSpecId, null);
}

// WorkManagerImpl
/**
 * @param workSpecId The {@link WorkSpec} id to start
 * @param runtimeExtras The {@link WorkerParameters.RuntimeExtras} associated with this work
 * @hide
 */
@RestrictTo(RestrictTo.Scope.LIBRARY_GROUP)
public void startWork(
        @NonNull String workSpecId,
        @Nullable WorkerParameters.RuntimeExtras runtimeExtras) {
    mWorkTaskExecutor
            .executeOnBackgroundThread(
                    new StartWorkRunnable(this, workSpecId, runtimeExtras));
}
WorkTaskExecutor schedules the task. The implementation of run() in StartWorkRunnable:
// StartWorkRunnable
@Override
public void run() {
    mWorkManagerImpl.getProcessor().startWork(mWorkSpecId, mRuntimeExtras);
}
StartWorkRunnable hands the task information to the Processor, which calls startWork() to run the task:
// Processor
/**
 * Starts a given unit of work in the background.
 *
 * @param id The work id to execute.
 * @param runtimeExtras The {@link WorkerParameters.RuntimeExtras} for this work, if any.
 * @return {@code true} if the work was successfully enqueued for processing
 */
public boolean startWork(
        @NonNull String id,
        @Nullable WorkerParameters.RuntimeExtras runtimeExtras) {

    WorkerWrapper workWrapper;
    synchronized (mLock) {
        // Work may get triggered multiple times if they have passing constraints
        // and new work with those constraints are added.
        if (mEnqueuedWorkMap.containsKey(id)) {
            Logger.get().debug(
                    TAG,
                    String.format("Work %s is already enqueued for processing", id));
            return false;
        }

        workWrapper =
                new WorkerWrapper.Builder(
                        mAppContext,
                        mConfiguration,
                        mWorkTaskExecutor,
                        this,
                        mWorkDatabase,
                        id)
                        .withSchedulers(mSchedulers)
                        .withRuntimeExtras(runtimeExtras)
                        .build();
        ListenableFuture<Boolean> future = workWrapper.getFuture();
        future.addListener(
                new FutureListener(this, id, future),
                mWorkTaskExecutor.getMainThreadExecutor());
        mEnqueuedWorkMap.put(id, workWrapper);
    }
    mWorkTaskExecutor.getBackgroundExecutor().execute(workWrapper);
    Logger.get().debug(TAG, String.format("%s: processing %s", getClass().getSimpleName(), id));
    return true;
}
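The map check at the top of startWork() is what keeps the same work id from being started twice. Below is a minimal sketch of that de-duplication, assuming a plain Runnable in place of WorkerWrapper and running it inline instead of on a background executor. ProcessorDedupDemo and its markFinished() method are hypothetical names, not the library's API.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the "already enqueued" check in Processor.startWork().
final class ProcessorDedupDemo {
    private final Object lock = new Object();
    private final Map<String, Runnable> enqueuedWork = new HashMap<>();

    // Returns true if the work was accepted, false if the id is already being processed.
    boolean startWork(String id, Runnable work) {
        synchronized (lock) {
            if (enqueuedWork.containsKey(id)) {
                return false;
            }
            enqueuedWork.put(id, work);
        }
        // The real code hands the wrapper to a background executor; running it
        // inline keeps the sketch deterministic.
        work.run();
        return true;
    }

    // Hypothetical completion hook: frees the id so it can be started again later.
    void markFinished(String id) {
        synchronized (lock) {
            enqueuedWork.remove(id);
        }
    }
}
```

Only the check-and-insert is done under the lock; the work itself runs outside it, matching the structure of the method above.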
startWork() creates a WorkerWrapper, which is a Runnable, and hands it to WorkTaskExecutor for scheduling. The implementation of WorkerWrapper's run() method:
// WorkerWrapper
@WorkerThread
@Override
public void run() {
    mTags = mWorkTagDao.getTagsForWorkSpecId(mWorkSpecId);
    mWorkDescription = createWorkDescription(mTags);
    runWorker();
}

// WorkerWrapper
private void runWorker() {
    if (tryCheckForInterruptionAndResolve()) {
        return;
    }

    mWorkDatabase.beginTransaction();
    try {
        mWorkSpec = mWorkSpecDao.getWorkSpec(mWorkSpecId);
        ...
        mWorkDatabase.setTransactionSuccessful();
    } finally {
        mWorkDatabase.endTransaction();
    }

    // Merge inputs. This can be potentially expensive code, so this should not be done inside
    // a database transaction.
    ...

    WorkerParameters params = new WorkerParameters(
            UUID.fromString(mWorkSpecId),
            input,
            mTags,
            mRuntimeExtras,
            mWorkSpec.runAttemptCount,
            mConfiguration.getExecutor(),
            mWorkTaskExecutor,
            mConfiguration.getWorkerFactory(),
            new WorkProgressUpdater(mWorkDatabase, mWorkTaskExecutor),
            new WorkForegroundUpdater(mForegroundProcessor, mWorkTaskExecutor));

    // Not always creating a worker here, as the WorkerWrapper.Builder can set a worker override
    // in test mode.
    if (mWorker == null) {
        mWorker = mConfiguration.getWorkerFactory().createWorkerWithDefaultFallback(
                mAppContext,
                mWorkSpec.workerClassName,
                params);
    }
    ...

    // Try to set the work to the running state. Note that this may fail because another thread
    // may have modified the DB since we checked last at the top of this function.
    if (trySetRunning()) {
        if (tryCheckForInterruptionAndResolve()) {
            return;
        }

        final SettableFuture<ListenableWorker.Result> future = SettableFuture.create();
        // Call mWorker.startWork() on the main thread.
        mWorkTaskExecutor.getMainThreadExecutor()
                .execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Logger.get().debug(TAG, String.format("Starting work for %s",
                                    mWorkSpec.workerClassName));
                            mInnerFuture = mWorker.startWork();
                            future.setFuture(mInnerFuture);
                        } catch (Throwable e) {
                            future.setException(e);
                        }

                    }
                });

        // Avoid synthetic accessors.
        ...
}
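This code is long, so we have omitted some checks and parameter setup unrelated to the example. It first creates a WorkerParameters object, then calls mConfiguration.getWorkerFactory().createWorkerWithDefaultFallback() to create the Worker object.
We won't expand this method; what it returns is our own Worker object, that is, an instance of Worker1. The work is then handed to WorkTaskExecutor for scheduling. In the implementation of run(), we see a call to mWorker.startWork():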
// ListenableWorker
@MainThread
public abstract @NonNull ListenableFuture<Result> startWork();
ListenableWorker is an abstract class and the parent of all Workers. Worker1 extends the Worker class, and startWork() is implemented in Worker as follows:
// Worker
@Override
public final @NonNull ListenableFuture<Result> startWork() {
    mFuture = SettableFuture.create();
    getBackgroundExecutor().execute(new Runnable() {
        @Override
        public void run() {
            try {
                Result result = doWork();
                mFuture.set(result);
            } catch (Throwable throwable) {
                mFuture.setException(throwable);
            }

        }
    });
    return mFuture;
}
The run() implementation calls doWork(), which means it runs the doWork() method of our Worker1.
// Worker1
class Worker1(appContext: Context, workerParams: WorkerParameters) :
    Worker(appContext, workerParams) {

    override fun doWork(): Result {
        Thread.sleep(5000)
        return Result.success()
    }
}
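The hand-off from startWork() to doWork() is a common pattern: wrap a blocking call in a future and complete it from a background executor. The sketch below models it with the JDK's CompletableFuture rather than the SettableFuture used by the library; SketchWorker and its String result are hypothetical simplifications.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Simplified model of Worker.startWork(): run doWork() on an executor and
// expose the outcome through a future.
abstract class SketchWorker {
    private final Executor backgroundExecutor;

    SketchWorker(Executor backgroundExecutor) {
        this.backgroundExecutor = backgroundExecutor;
    }

    // Subclasses put their blocking work here, like Worker1.doWork().
    abstract String doWork() throws Exception;

    final CompletableFuture<String> startWork() {
        CompletableFuture<String> future = new CompletableFuture<>();
        backgroundExecutor.execute(() -> {
            try {
                future.complete(doWork());
            } catch (Throwable throwable) {
                // A throwing doWork() surfaces as a failed future, not a crash.
                future.completeExceptionally(throwable);
            }
        });
        return future;
    }
}
```

The caller gets the future back immediately and can attach a listener to it, which is what WorkerWrapper does with the future returned by the real startWork().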
Once the task finishes, it returns success, and the Worker is done. Back in WorkerWrapper's runWorker() method, here is what happens next:
// WorkerWrapper
private void runWorker() {
        ...
        final SettableFuture<ListenableWorker.Result> future = SettableFuture.create();
        // Call mWorker.startWork() on the main thread.
        mWorkTaskExecutor.getMainThreadExecutor()
                .execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            Logger.get().debug(TAG, String.format("Starting work for %s",
                                    mWorkSpec.workerClassName));
                            mInnerFuture = mWorker.startWork();
                            future.setFuture(mInnerFuture);
                        } catch (Throwable e) {
                            future.setException(e);
                        }

                    }
                });

        // Avoid synthetic accessors.
        final String workDescription = mWorkDescription;
        future.addListener(new Runnable() {
            @Override
            @SuppressLint("SyntheticAccessor")
            public void run() {
                try {
                    // If the ListenableWorker returns a null result treat it as a failure.
                    ListenableWorker.Result result = future.get();
                    if (result == null) {
                        Logger.get().error(TAG, String.format(
                                "%s returned a null result. Treating it as a failure.",
                                mWorkSpec.workerClassName));
                    } else {
                        Logger.get().debug(TAG, String.format("%s returned a %s result.",
                                mWorkSpec.workerClassName, result));
                        mResult = result;
                    }
                } catch (CancellationException exception) {
                    // Cancellations need to be treated with care here because innerFuture
                    // cancellations will bubble up, and we need to gracefully handle that.
                    Logger.get().info(TAG, String.format("%s was cancelled", workDescription),
                            exception);
                } catch (InterruptedException | ExecutionException exception) {
                    Logger.get().error(TAG,
                            String.format("%s failed because it threw an exception/error",
                                    workDescription), exception);
                } finally {
                    onWorkFinished();
                }
            }
        }, mWorkTaskExecutor.getBackgroundExecutor());
    } else {
        resolveIncorrectStatus();
    }
}
startWork() returns a Future object, mInnerFuture, and future.setFuture(mInnerFuture) is called to handle the result returned by doWork(). After a series of checks, onWorkFinished() is finally executed:
// WorkerWrapper
void onWorkFinished() {
    boolean isWorkFinished = false;
    if (!tryCheckForInterruptionAndResolve()) {
        mWorkDatabase.beginTransaction();
        try {
            WorkInfo.State state = mWorkSpecDao.getState(mWorkSpecId);
            mWorkDatabase.workProgressDao().delete(mWorkSpecId);
            if (state == null) {
                // state can be null here with a REPLACE on beginUniqueWork().
                // Treat it as a failure, and rescheduleAndResolve() will
                // turn into a no-op. We still need to notify potential observers
                // holding on to wake locks on our behalf.
                resolve(false);
                isWorkFinished = true;
            } else if (state == RUNNING) {
                handleResult(mResult);
                // Update state after a call to handleResult()
                state = mWorkSpecDao.getState(mWorkSpecId);
                isWorkFinished = state.isFinished();
            } else if (!state.isFinished()) {
                rescheduleAndResolve();
            }
            mWorkDatabase.setTransactionSuccessful();
        } finally {
            mWorkDatabase.endTransaction();
        }
    }

    // Try to schedule any newly-unblocked workers, and workers requiring rescheduling (such as
    // periodic work using AlarmManager). This code runs after runWorker() because it should
    // happen in its own transaction.

    // Cancel this work in other schedulers. For example, if this work was
    // completed by GreedyScheduler, we should make sure JobScheduler is informed
    // that it should remove this job and AlarmManager should remove all related alarms.
    if (mSchedulers != null) {
        if (isWorkFinished) {
            for (Scheduler scheduler : mSchedulers) {
                scheduler.cancel(mWorkSpecId);
            }
        }
        Schedulers.schedule(mConfiguration, mWorkDatabase, mSchedulers);
    }
}
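onWorkFinished() post-processes the task that has just run. It first reads the task's current state, then deletes the task's progress record from the database (workProgressDao().delete()), and then branches on the state. In our example the state should be RUNNING at this point, so let's look at the implementation of handleResult(mResult):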
// WorkerWrapper
private void handleResult(ListenableWorker.Result result) {
    if (result instanceof ListenableWorker.Result.Success) {
        Logger.get().info(
                TAG,
                String.format("Worker result SUCCESS for %s", mWorkDescription));