写点什么

详解 -RxJava2- 的线程切换原理 (1)

用户头像
Android架构
关注
发布于: 8 小时前

public void onNext(String s) {Log.i(TAG, s);}


@Overridepublic void onError(Throwable e) {


}


@Overridepublic void onComplete() {


}});}



RxJava2 基本的运行流程

根据上述源码分析出流程图,这里颜色相同的代表同一对象。根据流程图看一遍源码基本流程就能理通


RxJava2 线程切换原理

一、observeOn() 的线程切换原理

根据运行流程来看 observeOn() 执行后是得到 ObservableObserveOn 对象,那么当 ObservableObserveOn 绑定监听者的时候要运行 subscribe() 方法


public final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");//调用 subscribeActual()subscribeActual(observer);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {...}}


接下来我们看一下 subscribeActual() 方法


protected void subscribeActual(Observer<? super T> observer) {if (scheduler instanceof TrampolineScheduler) {source.subscribe(observer);} else {//scheduler 是传进来的线程调度对象,如 Schedulers.io() 、AndroidSchedulers.mainThread() 等,这里调用了 createWorker() 方法暂时看一下就好稍后分析 RxAndroid 会说明 Scheduler.Worker w = scheduler.createWorker();//我们看到他把 w 参数传进去了 source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));}}


从上述代码我们可以看到 ObservableObserveOn 是被 ObserveOnObserver 监听的,所以收到通知也是由 ObserveOnObserver 作出响应,接下来我们假设当 Rxjava 发送 onNext 通知时会调用 ObserveOnObserver 的 onNext() 方法 ( PS:当然如果是 onComplete()、onError() 等也是一样的逻辑 ),然后我们来看一看 ObserveOnObserver 的 onNext() 方法,


@Overridepublic void onNext(T t) {if (done) {return;}if (sourceMode != QueueDisposable.ASYNC) {queue.offer(t);}//切换线程 schedule();}


void schedule() {if (getAndIncrement() == 0) {//直接调用了 worker 的 schedule 方法,需要注意的是这里他把自己传了进去 worker.schedule(this);}}


现在我先把把 schedule(Runnable run) 贴出来


public Disposable schedule(@NonNull Runnable run) {return schedule(run, 0L, TimeUnit.NANOSECONDS);}


  1. 我们看到这个他接收的参数是一个 Runnable,这是怎么回事呢,我们看一下 ObserveOnObserver 对象,他不但实现了 Observer 接口并且也实现了 Runnable 接口

  2. 接下看,继续调用 schedule( Runnable action, long delayTime, TimeUnit unit) 方法,但是这个方法是个抽象方法,这里我们就假设这里这个 worker 是 IO 线程,所以我直接贴 IoS


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


cheduler 的代码了


public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {if (tasks.isDisposed()) {// don't schedule, we are unsubscribedreturn EmptyDisposable.INSTANCE;}return threadWorker.scheduleActual(action, delayTime, unit, tasks);}


然后再贴一下 scheduleActual 的方法


public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//就是个 RunnableScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);


if (parent != null) {if (!parent.add(sr)) {return sr;}}


Future<?> f;try {//判断延迟时间,然后使用线程池运行 Runnableif (delayTime <= 0) {f = executor.submit((Callable<Object>)sr);} else {f = executor.schedule((Callable<Object>)sr, delayTime, unit);}sr.setFuture(f);} catch (RejectedExecutionException ex) {if (parent != null) {parent.remove(sr);}RxJavaPlugins.onError(ex);}return sr;}


这样一来就会在相应的线程中运行 ObserveOnObserver 的 run 方法


public void run() {//这个地方具体的我还没有搞明白,大概就是在这个方法里调用 onNext() ,然后 observeOn() 操作符之后的监听者的运行线程就变了 if (outputFused) {drainFused();} else {drainNormal();}}

二、subscribeOn() 的线程切换原理

PS:这个切换原理其实和 observeOn() 原理很像


跟 observeOn() 一样,只不过这个操作的对象是 ObservableSubscribeOn, 这个对象也是同样的代码逻辑,运行 subscribe() 方法,然后调用 subscribeActual() 方法,所以就直接贴 subscribeActual() 的代码


public void subscribeActual(final Observer<? super T> s) {//创建与之绑定的 SubscribeOnObserverfinal SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);s.onSubscribe(parent);//1. 创建 SubscribeTask 实际上就是个 Runnable//2. 然后调用 scheduler.scheduleDirect 方法 parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));}


我们看一下 scheduleDirect 的方法


public Disposable scheduleDirect(@NonNull Runnable run) {return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);}


public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {final Worker w = createWorker();final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//一个 Runnable 具体作用没分析 DisposeTask task = new DisposeTask(decoratedRun, w);//这个代码看着熟悉吗 没错上面 observeOn 提到过,知道它是运行 Runnable 我们就直接看 Runnable 里面的 run() 了 w.schedule(task, delay, unit);return task;}


我们看一下 DisposeTask 的 run()


public void run() {runner = Thread.currentThread();try {decoratedRun.run();} finally {dispose();runner = null;}}


调来调去我们又回到了 SubscribeTask 的 run()


public void run() {source.subscribe(parent);}


这个地方的运行线程已经被切换了,他又开始往上一层层的去订阅,所以 create(new ObservableOnSubscribe<String>(){})这个匿名实现接口运行 subscribe 的线程运行环境都被改变了,再去调用 onNext() 等方法线程环境也是被改变的

为什么 subscribeOn() 只有第一次切换有效

写到这里我们这个问题也就能回答了因为 RxJava 最终能影响 ObservableOnSubscribe 这个匿名实现接口的运行环境的只能是最后一次运行的 subscribeOn() ,又因为 RxJava 订阅的时候是从下往上订阅,所以从上往下第一个 subscribeOn() 就是最后运行的,这就造成了写多个 subscribeOn() 并没有什么乱用的现象。



分析一下 RxAndroid


其实 RxAndroid 里面并没有什么复杂的代码,他其实只是提供一个能切换到 Android 主线程线程调度器。


其实它的原理和 RxJava 自带的那些线程调度器一样,如果你想了解 RxJava 的 IO 线程池,什么的可以自己看一看,我这里分析 RxAndroid 主要有以下几点原因


  1. 弄清楚 RxAndroid 这个库的具体作用

  2. 弄清楚他是怎么就能把线程切换到主线程(他是怎么提供的主线程环境)

  3. 弄清楚线程调度器的运行原理

  4. 最重要的是它相对于 RxJava 自带的那些调度器,他比较简单容易分析


###正文开始


首先我们找一下入口 AndroidSchedulers.mainThread() 这个地方应该是就是入口了,我们看一下 AndroidSchedulers 这个类的源码吧,总共也没几行


private static final class MainHolder {static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));}


private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(new Callable<Scheduler>() {@Override public Scheduler call() throws Exception {return MainHolder.DEFAULT;}});


public static Scheduler mainThread() {return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);}


public static Scheduler from(Looper looper) {if (looper == null) throw new NullPointerException("looper == null");return new HandlerScheduler(new Handler(looper));}

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
详解-RxJava2-的线程切换原理(1)