写点什么

从观察者模式出发,聊聊 RxJava,flutter 开发实战详解 pdf

用户头像
Android架构
关注
发布于: 刚刚

具体的观察者是如何实例化的

我们看一下这段代码:


Observable mObservable = Observable.create(new ObservableOnSubscribe() {@Overridepublic void subscribe(ObservableEmitter e) throws Exception {}});


public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));}


public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {Function<? super Observable, ? extends Observable> f = onObservableAssembly;// 是否有别的其他操作符运算,有的话,在此 Observable 上执行一遍 if (f != null) {return apply(f, source);}return source;}


RxJava 的代码里,很多时候会有 ObjectHelper.requireNonNull 这种空检查的地方,一律都是为了最大程度的防止 NPE 的出现,后面出现就不再赘述了.


我们使用 create 操作符创建 Observable 的过程中,看似经历了很多方法,在不考虑任何其他操作符的前提下,整个过程简化一下的话就这么一句代码


Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())


从之前的分析,我们也看到了 ObservableCreate 就是 Observeable 抽象类的一个子类。我们简单看一下他的实现。


public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;


public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}


@Overrideprotected void subscribeActual(Observer<? super T> observer) {……}}


可以看到,他唯一的构造函数需要一个 ObservableOnSubscribe 实例,同时他实现 subscribeActual 方法,说明他真正处理主题观察者之间实现订阅的逻辑。


看了半天,你可能一直很好奇,这个 ObservableOnSubscribe 是个什么东西呢?他其实很简单。


/**


  • A functional interface that has a {@code subscribe()} method that receives

  • an instance of an {@link ObservableEmitter} instance that allows pushing

  • events in a cancellation-safe manner.

  • @param <T> the value type pushed*/public interface ObservableOnSubscribe<T> {


/**


  • Called for each Observer that subscribes.

  • @param e the safe emitter instance, never null

  • @throws Exception on error*/void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;}


ε=(′ο`*)))唉,怎么又一个 subscribe,这又是啥?不要慌,看注释。意思是说,这里的 subscribe 接收到一个 ObservableEmitter 实例后,就会允许他以一种可以安全取消(也就是一定能取消)的形式发送事件。


就是说会有某个对象,给他一个 ObservableEmitte 的实例,没给他之前他是不会主动发送事件的,会一直憋着。,到这里,你是不是想到了什么,我们知道在 RxJava 中只有观察者(下游)订阅(subscribe)了主题(上游),主题才会发送事件。这就是和普通的观察者模式有区别的地方之一。


好了,最后再来看看这个神秘的 ObservableEmitter 是个什么鬼?


public interface ObservableEmitte


《Android学习笔记总结+最新移动架构视频+大厂安卓面试真题+项目实战源码讲义》
浏览器打开:qq.cn.hn/FTe 免费领取
复制代码


r<T> extends Emitter<T> {


void setDisposable(@Nullable Disposable d);


void setCancellable(@Nullable Cancellable c);


boolean isDisposed();


ObservableEmitter<T> serialize();


/**


  • Attempts to emit the specified {@code Throwable} error if the downstream

  • hasn't cancelled the sequence or is otherwise terminated, returning false

  • if the emission is not allowed to happen due to lifecycle restrictions.

  • <p>

  • Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called

  • if the error could not be delivered.

  • @param t the throwable error to signal if possible

  • @return true if successful, false if the downstream is not able to accept further

  • events

  • @since 2.1.1 - experimental*/boolean tryOnError(@NonNull Throwable t);}


这里可以关注一下 tryOnError 这个方法,可以看到他会把某些类型的 error 传递到下游。


o(╥﹏╥)o,又是一个接口,而且还继承了另一个接口,什么情况?继续看


public interface Emitter<T> {


void onNext(@NonNull T value);


void onError(@NonNull Throwable error);


void onComplete();}


惊不惊喜,意不意外? 哈哈,终于找到你了,熟悉的 onNext,onError,onComplete.原来在这里。


这里有个问题可以思考一下,在抽象观察者中,定义了四个处理事件的方法,这里只有三个,按照对应关系来说似乎缺了一个 onSubscribe,这又是怎么回事呢?后面会有分析,可以自己先想想


这两个接口的含义很明显了,总结一下:


  • Emitter 定义了可以发送的事件的三种机制

  • ObservableEmitter 在 Emitter 做了扩展,添加了 Disposable 相关的方法,可以用来取消事件的发送。


好了,绕了一大圈,就为了一行代码:


Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())


总结一下具体主题(上游)的到底干了啥:


  • 创建了一个 ObservableCreate 的实例对象

  • ObservableCreate 内持有 ObservableOnSubscribe 对象的引用

  • ObservableOnSubscribe 是一个接口,内部有一个 subscribe 方法,调用他之后,会用其 ObservableEmitter 实例开始发送事件。

  • ObservableEmitter 继承自 Emitte。

如何实现订阅、发送事件和接收事件

为了方便叙述,把问题 3 和 4 连在一起说了。


通过上面的叙述,现在具体主题和具体的观察者都创建好了,接下来就是实现二者的订阅关系。


mObservable.subscribe(mObserver);


这里需要明确的一点是,是观察者(下游)订阅了主题(上游),虽然从代码上看好像了前者订阅了后者,不要搞混了。


我们看 Observable 的 subscribe() 方法:


public final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);


ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");


subscribeActual(observer);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {……}}


这个前面已经提到过了,Observable 并没有真正的去实现 subscribe,而是把他转接给了 subscribeActual()方法。


前面已经说过,Observable 的实例是一个 ObservableCreate 对象,那么我们就到这个类里去看看 subscribeActual()的实现。


// 为了方便,顺便再看一眼构造函数 public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);


try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}


CreateEmitter 实现了之前提到的 ObservableEmitter 接口。这里有一句关键的代码:


observer.onSubscribe(parent);


之前在看到 Emitter 的定义时,我们说缺少了 onSubscribe 方法,到这里就明白了。onSubscribe 并不是由主题(上游)主动发送的事件,而是有观察者(下游)自己调用的一个事件,只是为了方便获取 Emitter 的实例对象,准确的说应该是 Disposable 的实例对象,这样下游就可以控制上游了。


接下来就更简单了,source 是 ObservableOnSubscribe,按照之前的逻辑,调用其 subscribe 方法,给他一个 ObservableEmitter 对象实例,ObservableEmitter 就会开始发送事件序列。这样,一旦开始订阅了,主题(上游)就开始发送事件了。也就是我们熟悉的 onNext,onComplete,onError 方法真正的开始执行了。


接着看看 CreateEmitter 的实现。


public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;


public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}


@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);……}


static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {


private static final long serialVersionUID = -3434801548987643227L;


final Observer<? super T> observer;


CreateEmitter(Observer<? super T> observer) {this.observer = observer;}


@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}}


@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}


@Overridepublic boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (!isDisposed()) {try {observer.onError(t);} finally {dispose();}return true;}return false;}


@Overridepublic void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}}


@Overridepublic void setDisposable(Disposable d) {DisposableHelper.set(this, d);}


@Overridepublic void setCancellable(Cancellable c) {setDisposable(new CancellableDisposable(c));}


@Overridepublic ObservableEmitter<T> serialize() {return new SerializedEmitter<T>(this);}


@Overridepublic void dispose() {DisposableHelper.dispose(this);}


@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());

用户头像

Android架构

关注

还未添加个人签名 2021.10.31 加入

还未添加个人简介

评论

发布
暂无评论
从观察者模式出发,聊聊RxJava,flutter开发实战详解pdf