RxJava 沉思录(三),微信小程序的事件处理
虽然这个例子比较简单,但是它很好的表达了 Observable 可以在时间维度上对其发射的事件进行重新组织 , 从而做到之前 Callback 形式不容易做到的事情。
社交软件上消息的点赞与取消点赞
点赞与取消点赞是社交软件上经常出现的需求,假设我们目前有下面这样的点赞与取消点赞的代码:
boolean like;
likeBtn.setOnClickListener(v -> {if (like) {// 取消点赞 sendCancelLikeRequest(postId);} else {// 点赞 sendLikeRequest(postId);}like = !like;});
以下图片素材资源来自 Dribbble
如果你碰巧实现了一个非常酷炫的点赞动画,用户可能会玩得不亦乐乎,这个时候可能会对后端服务器造成一定的压力,因为每次点赞与取消点赞都会发起网络请求,假如很多用户同时在玩这个点赞动画,服务器可能会不堪重负。
和前一个例子的防抖动思路差不多,我们首先想到需要防抖动:
boolean like;PublishSubject<Boolean> likeAction = PublishSubject.create();
likeBtn.setOnClickListener(v -> {likeAction.onNext(like);like = !like;});
likeAction.debounce(1000, TimeUnit.MILLISECONDS).observerOn(AndroidSchedulers.mainThread()).subscribe(like -> {if (like) {sendCancelLikeRequest(postId);} else {sendLikeRequest(postId);}});
写到这个份上,其实已经可以解决服务器压力过大的问题了,但是还是有优化空间,假设当前是已赞状态,用户快速点击 2 下,按照上面的代码,还是会发送一次点赞的请求,由于当前是已赞状态,再发送一次点赞请求是没有意义的,所以我们优化的目标就是将这一类事件过滤掉:
Observable<Boolean> debounced = likeAction.debounce(1000, TimeUnit.MILLISECONDS);debounced.zipWith(debounced.startWith(like),(last, current) -> last == current ? new Pair<>(false, false) : new Pair<>(true, current)).flatMap(pair -> pair.first ? Observable.just(pair.second) : Observable.empty()).subscribe(like -> {if (like) {sendCancelLikeRequest(postId);} else {sendLikeRequest(postId);}});
zipWith
操作符可以把两个Observable
发射的相同序号(同为第 x 个)的元素,进行运算转换,得到的新元素作为新的Observable
对应序号所发射的元素。参考资料:ZipWith
上面的代码,我们可以看到,首先我们对事件流做了一次 debounce
操作,得到 debounced
事件流,然后我们把 debounced
事件流和 debounced.startWith(like)
事件流做了一次 zipWith
操作。相当于新的这个 Observable
中发射的第 n 个元素(n >= 2)是由 debounced
事件流中的第 n 和 第 n-1 个元素运算得到的(新的这个 Observable
中发射的第 1 个元素是由 debounced
事件流中的第 1 个元素和原始点赞状态 like
运算而来)。
运算的结果是得到一个 Pair
对象,它是一个双布尔类型二元组,二元组第一个元素为 true 代表这个事件不该被忽略,应该被观察者观察到;若为 false 则应该被忽略。二元组的第二个元素仅在第一个元素为 true 的情况下才有意义,true 表示应该发起一次点赞操作,而 false 表示应该发起一次取消点赞操作。上面提到的“运算”具体运算的规则是,比较两个元素,若相等,则把二元组的第一个元素置为 false,若不相等,则把二元组的第一个元素置为 true, 同时把二元组的第二个元素置为 debounced
事件流发射的那个元素。
随后的 flatMap
操作符完成了两个逻辑,一是过滤掉二元组第一个元素为 false 的二元组,二是把二元组转化回最初的 Boolean
事件流。其实这个逻辑也可由 filter
和 map
两个操作符配合完成,这里为了简单用了一个操作符。
虽然上面用了不少篇幅解释了每个操作符的意义,但其实核心思想是简单的,就是在原先 debounce
操作符的基础上,把得到的事件流里每个元素和它的上一个元素做比较,如果这个元素和上个元素相同(例如在已赞状态下再次发起点赞操作), 就把这个元素过滤掉,这样最终的观察者里只会在在真正需要改变点赞状态的时候才会发起网络请求了。
我们考虑用 Callback 实现相同逻辑,虽然比较本次操作与上次操作这样的逻辑通过 Callback 也可以做到,但是 debounce
这个操作符完成的任务,如果要使用 Callback 来实现就非常复杂了,我们需要定义一个计时器,还要负责启动与关闭这个计时器,我们的 Callback 内部会掺杂进很多和观察者本身无关的逻辑,相比 RxJava 版本的纯粹相去甚远。
检测双击事件
首先,我们需要定义双击事件,不妨先规定两次点击小于 500 毫秒则为一次双击事件。我们先使用 Callback 的方式实现:
long lastClickTimeStamp;
btn.setOnClickListener(v -> {if (System.currentTimeMillis() - lastClickTimeStamp < 500) {// handle double click}});
上面的代码很容易理解,我们引入一个中间变量 lastClickTimeStamp
, 通过比较点击事件发生时和上一次点击事件的时间差是否小于 500 毫秒,来确认是否发生了一次双击事件。那么如何通过 RxJava 来实现呢?就和上一个例子一样,我们可以在时间维度对 Observable
发射的事件进行重新组织,只过滤出与上次点击事件间隔小于 500 毫秒的点击事件,代码如下:
Observable<Long> clicks = RxView.clicks(btn).map(o -> System.currentTimeMillis()).share();
clicks.zipWith(clicks.skip(1), (t1, t2) -> t2 - t1).filter(interval -> interval < 500).subscribe(o -> {// handle double click});
我们再一次用到了 zipWith
操作符来对事件流自身相邻的两个元素做比较,另外这次代码中使用了 share
操作符,
用来保证点击事件的 Observable
被转为 Hot Observable。
在
RxJava
中,Observable
可以被分为Hot Observable
与Cold Observable
,引用《Learning Reactive Programming with Java 8》中一个形象的比喻(翻译后的意思):我们可以这样认为,Cold Observable
在每次被订阅的时候为每一个Subscriber
单独发送可供使用的所有元素,而Hot Observable
始终处于运行状态当中,在它运行的过程中,向它的订阅者发射元素(发送广播、事件),我们可以把Hot Observable
比喻成一个电台,听众从某个时刻收听这个电台开始就可以听到此时播放的节目以及之后的节目,但是无法听到电台此前播放的节目,而Cold Observable
就像音乐 CD ,人们购买 CD 的时间可能前后有差距,但是收听 CD 时都是从第一个曲目开始播放的。也就是说同一张 CD ,每个人收听到的内容都是一样的, 无论收听时间早或晚。
仅仅是上面这个双击检测的例子,还不能体现 RxJava 的优越性,我们把需求改得更复杂一点:如果用户在“短时间”内连续多次点击,只能算一次双击操作。这个需求是合理的,因为如果按照上面 Callback 的写法,虽然可以检测出双击操作,但是如果用户快速点击 n 次(间隔均小于 500 毫秒,n >= 2), 就会触发 n - 1 次双击事件,假设双击处理函数里需要发起网络请求,会对服务器造成压力。要实现这个需求其实也简单,和上一个例子类似,我们用到了 debounce
操作符:
Observable<Object> clicks = RxView.clicks(btn).share()
clicks.buffer(clicks.debounce(500, TimeUnit.MILLISECONDS)).filter(events -> events.size >= 2).observeOn(AndroidSchedulers.mainThread()).subscribe(o -> {// handle double click});
评论