SpringCloudRPC 调用核心原理:RxJava 响应式编程框架,聚合操作符
@Test
public void countDemo()
{
String[] items = {"one", "two", "three", "four"};
Integer count = Observable
.from(items)
.count() .toBlocking().single();
log.info("计数的结果为 {}",count);
}
}
运行以上代码,输出的结果节选如下:
[main] INFO?
c.c.d.r.basic.AggregateDemo - 计数的结果为 4 可以看出,count 操作符将一个 Observable 源流转换成一个弹射单个值的 Observable 输出流,输出流的唯一数据项的值为原始 Observable 流所弹射的数据项数量。
在上面的代码中,为了获取 count 输出流中的数据项,使用了 toBlocking()和 single()两个操作符。其中,Observable.toBlocking()操作返回了一个 BlockingObservable 阻塞型实例,该类型不是一种新的数据流,仅仅是对源 Observable 的包装,只是该类型会阻塞当前线程,一直等待直到内部的源 Observable 弹射了自己想要的数据。BlockingObservable.single()方法表示阻塞当前线程,直到从封装的源 Observable 获取到唯一的弹射数据元素项,如果 Observable 源流弹射出的数据元素不止一个,single()方法就会抛出异常。
reduce 操作符
==========
Reduce(归约)操作符对一个 Observable 流序列的每一项应用一个归约函数,最后将流的最终归约计算结果弹射出去。除了第一项之外,reduce 操作符会将上一个数据项应用归约函数的结果作为下一个数据项在应用归约函数时的输入。所以,和 scan 操作符一样,reduce 操作符也有点类似递归操作。
假定归约函数为一个简单的累加函数,然后使用 reduce 操作符对 1~5 的数据流序列进行归约,其具体的归约流程如图 4-10 所示。
图 4-10 reduce 操作符对 1~5 的数据流序列的归约流程
使用 reduce 操作符实现对 1~5 的数据流序列的归约,参考如下的实现代码:
package com.crazymaker.demo.rxJava.basic;
//省略 import
@Slf4j
public class AggregateDemo
{
/**
演示
操作符 *演示 reduce 操作符
*/
@Test
public void reduceDemo()
{
/**
*定义一个 accumulator 归约函数
*/
Func2<Integer, Integer, Integer> accumulator =
new Func2<Integer, Integer, Integer>()
{
@Override
public Integer call(Integer input1, Integer input2)
{
log.info(" {} + {} = {} ", input1, input2, input1 + input2);
return input1 + input2;
}
};
/**
*使用 reduce 进行流归约
*/
Observable.range(1, 5)
.reduce(accumulator)
.subscribe(new Action1<Integer>()
{
@Override
public void call(Integer sum)
{
log.info(" 归约的结果: {} ", sum);
}
});
}
}
评论