从观察者模式出发,聊聊 RxJava,flutter 开发实战详解 pdf
具体的观察者是如何实例化的
我们看一下这段代码:
Observable mObservable = Observable.create(new ObservableOnSubscribe() {@Overridepublic void subscribe(ObservableEmitter e) throws Exception {}});
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {Function<? super Observable, ? extends Observable> f = onObservableAssembly;// 是否有别的其他操作符运算,有的话,在此 Observable 上执行一遍 if (f != null) {return apply(f, source);}return source;}
RxJava 的代码里,很多时候会有 ObjectHelper.requireNonNull 这种空检查的地方,一律都是为了最大程度的防止 NPE 的出现,后面出现就不再赘述了.
我们使用 create 操作符创建 Observable 的过程中,看似经历了很多方法,在不考虑任何其他操作符的前提下,整个过程简化一下的话就这么一句代码
Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())
从之前的分析,我们也看到了 ObservableCreate 就是 Observeable 抽象类的一个子类。我们简单看一下他的实现。
public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}
@Overrideprotected void subscribeActual(Observer<? super T> observer) {……}}
可以看到,他唯一的构造函数需要一个 ObservableOnSubscribe 实例,同时他实现 subscribeActual 方法,说明他真正处理主题和观察者之间实现订阅的逻辑。
看了半天,你可能一直很好奇,这个 ObservableOnSubscribe 是个什么东西呢?他其实很简单。
/**
A functional interface that has a {@code subscribe()} method that receives
an instance of an {@link ObservableEmitter} instance that allows pushing
events in a cancellation-safe manner.
@param <T> the value type pushed*/public interface ObservableOnSubscribe<T> {
/**
Called for each Observer that subscribes.
@param e the safe emitter instance, never null
@throws Exception on error*/void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;}
ε=(′ο`*)))唉,怎么又一个 subscribe,这又是啥?不要慌,看注释。意思是说,这里的 subscribe 接收到一个 ObservableEmitter 实例后,就会允许他以一种可以安全取消(也就是一定能取消)的形式发送事件。
就是说会有某个对象,给他一个 ObservableEmitte 的实例,没给他之前他是不会主动发送事件的,会一直憋着。,到这里,你是不是想到了什么,我们知道在 RxJava 中只有观察者(下游)订阅(subscribe)了主题(上游),主题才会发送事件。这就是和普通的观察者模式有区别的地方之一。
好了,最后再来看看这个神秘的 ObservableEmitter 是个什么鬼?
public interface ObservableEmitte
r<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
/**
Attempts to emit the specified {@code Throwable} error if the downstream
hasn't cancelled the sequence or is otherwise terminated, returning false
if the emission is not allowed to happen due to lifecycle restrictions.
<p>
Unlike {@link #onError(Throwable)}, the {@code RxJavaPlugins.onError} is not called
if the error could not be delivered.
@param t the throwable error to signal if possible
@return true if successful, false if the downstream is not able to accept further
events
@since 2.1.1 - experimental*/boolean tryOnError(@NonNull Throwable t);}
这里可以关注一下 tryOnError 这个方法,可以看到他会把某些类型的 error 传递到下游。
o(╥﹏╥)o,又是一个接口,而且还继承了另一个接口,什么情况?继续看
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();}
惊不惊喜,意不意外? 哈哈,终于找到你了,熟悉的 onNext,onError,onComplete.原来在这里。
这里有个问题可以思考一下,在抽象观察者中,定义了四个处理事件的方法,这里只有三个,按照对应关系来说似乎缺了一个 onSubscribe,这又是怎么回事呢?后面会有分析,可以自己先想想
这两个接口的含义很明显了,总结一下:
Emitter 定义了可以发送的事件的三种机制
ObservableEmitter 在 Emitter 做了扩展,添加了 Disposable 相关的方法,可以用来取消事件的发送。
好了,绕了一大圈,就为了一行代码:
Observable mObservable=new ObservableCreate(new ObservableOnSubscribe())
总结一下具体主题(上游)的到底干了啥:
创建了一个 ObservableCreate 的实例对象
ObservableCreate 内持有 ObservableOnSubscribe 对象的引用
ObservableOnSubscribe 是一个接口,内部有一个 subscribe 方法,调用他之后,会用其 ObservableEmitter 实例开始发送事件。
ObservableEmitter 继承自 Emitte。
如何实现订阅、发送事件和接收事件
为了方便叙述,把问题 3 和 4 连在一起说了。
通过上面的叙述,现在具体主题和具体的观察者都创建好了,接下来就是实现二者的订阅关系。
mObservable.subscribe(mObserver);
这里需要明确的一点是,是观察者(下游)订阅了主题(上游),虽然从代码上看好像了前者订阅了后者,不要搞混了。
我们看 Observable 的 subscribe() 方法:
public final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);} catch (NullPointerException e) { // NOPMDthrow e;} catch (Throwable e) {……}}
这个前面已经提到过了,Observable 并没有真正的去实现 subscribe,而是把他转接给了 subscribeActual()方法。
前面已经说过,Observable 的实例是一个 ObservableCreate 对象,那么我们就到这个类里去看看 subscribeActual()的实现。
// 为了方便,顺便再看一眼构造函数 public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);
try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}}
CreateEmitter 实现了之前提到的 ObservableEmitter 接口。这里有一句关键的代码:
observer.onSubscribe(parent);
之前在看到 Emitter 的定义时,我们说缺少了 onSubscribe 方法,到这里就明白了。onSubscribe 并不是由主题(上游)主动发送的事件,而是有观察者(下游)自己调用的一个事件,只是为了方便获取 Emitter 的实例对象,准确的说应该是 Disposable 的实例对象,这样下游就可以控制上游了。
接下来就更简单了,source 是 ObservableOnSubscribe,按照之前的逻辑,调用其 subscribe 方法,给他一个 ObservableEmitter 对象实例,ObservableEmitter 就会开始发送事件序列。这样,一旦开始订阅了,主题(上游)就开始发送事件了。也就是我们熟悉的 onNext,onComplete,onError 方法真正的开始执行了。
接着看看 CreateEmitter 的实现。
public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}
@Overrideprotected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);……}
static final class CreateEmitter<T>extends AtomicReference<Disposable>implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {this.observer = observer;}
@Overridepublic void onNext(T t) {if (t == null) {onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));return;}if (!isDisposed()) {observer.onNext(t);}}
@Overridepublic void onError(Throwable t) {if (!tryOnError(t)) {RxJavaPlugins.onError(t);}}
@Overridepublic boolean tryOnError(Throwable t) {if (t == null) {t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");}if (!isDisposed()) {try {observer.onError(t);} finally {dispose();}return true;}return false;}
@Overridepublic void onComplete() {if (!isDisposed()) {try {observer.onComplete();} finally {dispose();}}}
@Overridepublic void setDisposable(Disposable d) {DisposableHelper.set(this, d);}
@Overridepublic void setCancellable(Cancellable c) {setDisposable(new CancellableDisposable(c));}
@Overridepublic ObservableEmitter<T> serialize() {return new SerializedEmitter<T>(this);}
@Overridepublic void dispose() {DisposableHelper.dispose(this);}
@Overridepublic boolean isDisposed() {return DisposableHelper.isDisposed(get());
评论