写点什么

【Flutter 专题】80 初识 Flutter Stream (一)

发布于: 3 小时前
【Flutter 专题】80 初识 Flutter Stream (一)

      小菜在之前尝试 EventChannel 时曾经用到过 Stream 流数据,现在准备学习一下 flutter_bloc 时也主要用到 Stream 来做异步处理,于是简单学习一下何为 Stream

A source of asynchronous data events.

      Stream 主要应用于 Flutter 的异步操作,在其他编程语言中也存在;Stream 提供了一种接受事件队列的方法,可通过 listen 进行数据监听,通过 error 接收失败状态,通过 done 来接收结束状态;


1. Stream 创建

      Flutter 提供了多种创建 Stream 的方式;

1.1 Stream.fromFuture(Future<T> future)

      Stream 通过 Future 创建新的单订阅流,当 Future 完成时会触发 data / error,然后以 done 事件结束;


Future<String> getData() async {  await Future.delayed(Duration(seconds: 3));  return '当前时间为:${DateTime.now()}';}
_streamFromFuture() { Stream.fromFuture(getData()) .listen((event) => print('Stream.fromFuture -> $event')) .onDone(() => print('Stream.fromFuture -> done 结束'));}
复制代码


1.2 Stream.fromFutures(Iterable<Future<T>> futures)

      Stream 通过一系列的 Future 创建新的单订阅流,每个 Future 都会有自身的 data / error 事件,当这一系列的 Future 均完成时,Streamdone 事件结束;若 Futures 为空,则 Stream 会立刻关闭;其分析源码,很直接的看到是将每一个 Future 事件监听完之后才会执行的微事件结束;


====================================== 源码 ======================================factory Stream.fromFutures(Iterable<Future<T>> futures) {    _StreamController<T> controller =        new _SyncStreamController<T>(null, null, null, null);    int count = 0;    var onValue = (T value) {      if (!controller.isClosed) {        controller._add(value);        if (--count == 0) controller._closeUnchecked();      }    };    var onError = (error, StackTrace stack) {      if (!controller.isClosed) {        controller._addError(error, stack);        if (--count == 0) controller._closeUnchecked();      }    };    for (var future in futures) {      count++;      future.then(onValue, onError: onError);    }    // Use schedule microtask since controller is sync.    if (count == 0) scheduleMicrotask(controller.close);    return controller.stream;}====================================== 测试 ======================================_streamFromFutures() {  var data = [getData(), getData(), getData()];  Stream.fromFutures(data)      .listen((event) => print('Stream.fromFutures -> $event'))      .onDone(() => print('Stream.fromFutures -> done 结束'));}
复制代码


1.3 Stream.fromIterable(Iterable<T> elements)

      Stream 通过数据集合中获取并创建单订阅流,通过 listen 监听迭代器中每一个子 element,当 Stream 监听到取消订阅或 Iterator.moveNext 返回 false / throw 异常 时停止迭代;


_streamFromIterable() {  var data = [1, 2, '3.toString()', true, false, 6];  Stream.fromIterable(data)      .listen((event) => print('Stream.fromIterable -> $event'))      .onDone(() => print('Stream.fromIterable -> done 结束'));}
复制代码


1.4 Stream.periodic(Duration period, [T computation(int computationCount)])

      Stream 通过 Duration 对象作为参数创建一个周期性事件流,其中若不设置 computationonData 获取数据为 null;若没有事件结束则会一直周期性执行;


_streamFromPeriodic() {  Duration interval = Duration(seconds: 1);  // onData 获取数据为 null  Stream<int> stream = Stream<int>.periodic(interval);  stream.listen((event) {  print('Stream.periodic -> $event');  }).onDone(() {  print('Stream.periodic -> done 结束');  });
// onData 获取数据为 int 类型 data Stream<int> streamData = Stream<int>.periodic(interval, (data) => data); streamData.listen((event) { print('Stream.periodic -> $event'); if (event >= 10) {} }).onDone(() { print('Stream.periodic -> done 结束'); });}
复制代码



2. Stream 基本操作

2.1 Stream<T> take(int count)

      take() 对于单订阅方式,可以提供 take 设置之前的 Stream 订阅数据,例如设置中断 Stream.periodic 周期展示次数;小菜粗略理解为 take 可以作为中断订阅,如果 take 设置次数大于 onDone 之前的订阅数据次数,Stream 依旧获取所有 onDone 之前的订阅数据;


_streamFromPeriodic() {  Duration interval = Duration(seconds: 1);  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);  streamData.take(5).listen((event) {    print('Stream.periodic -> $event');  }).onDone(() {    print('Stream.periodic -> done 结束');  });}
复制代码



_streamFromIterable() {  var data = [1, 2, '3.toString()', true, false, 6];  Stream.fromIterable(data)      .take(8)      .listen((event) => print('Stream.fromIterable -> $event'))      .onDone(() => print('Stream.fromIterable -> done 结束'));}
复制代码


2.2 Stream<T> takeWhile(bool test(T element))

      takeWhile 也可以实现上述相同效果,通过 test 返回一个 boolean 类型,如果为 false 则中断订阅;


_streamFromPeriodic() {  Duration interval = Duration(seconds: 1);  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);  streamData.takeWhile((element) {    print('Stream.periodic.takeWhile -> $element');    return element <= 5;  }).listen((event) {    print('Stream.periodic -> $event');  }).onDone(() {    print('Stream.periodic -> done 结束');  });}
复制代码


2.3 Stream<T> where(bool test(T event))

      where 用于在当前 Stream 中创建一个新的 Stream 用来丢弃不符合 test 的数据;小菜简单理解为类似数据库查询一样,仅过滤符合需求的数据流;且 where 可以设置多次;


Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);streamData.takeWhile((element) {  print('Stream.periodic.takeWhile -> $element');  return element <= 5;}).where((event) {  print('Stream.periodic.where -> $event');  return event > 3;}).listen((event) {  print('Stream.periodic -> $event');}).onDone(() {  print('Stream.periodic -> done 结束');});
复制代码


2.4 Stream<T> distinct([bool equals(T previous, T next)])

      distinct 小菜理解为相邻两个数据滤重;


var data = [1, 2, '3.toString()', true, true, false, true, 6];Stream.fromIterable(data)    .distinct()    .listen((event) => print('Stream.fromIterable -> $event'))    .onDone(() => print('Stream.fromIterable -> done 结束'));
复制代码


2.5 Stream<T> skip(int count)

      skip 用于跳过符合条件的订阅数据次数;


Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);streamData.takeWhile((element) {      print('Stream.periodic.takeWhile -> $element');      return element <= 6;    }).where((event) {      print('Stream.periodic.where -> $event');      return event > 2;    })    .skip(2).listen((event) {      print('Stream.periodic -> $event');    }).onDone(() {      print('Stream.periodic -> done 结束');    });
复制代码


2.6 Stream<T> skipWhile(bool test(T element))

      skipWhile 用于跳过在 where 符合条件下满足设置 test 条件的订阅数据;即当 testtrue 时跳过当前订阅数据监听;


Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);streamData.takeWhile((element) {  print('Stream.periodic.takeWhile -> $element');  return element <= 6;}).where((event) {  print('Stream.periodic.where -> $event');  return event > 2;}).skipWhile((element) {  print('Stream.periodic.skipWhile -> $element');  return element <= 4;}).listen((event) {  print('Stream.periodic -> $event');}).onDone(() {  print('Stream.periodic -> done 结束');});
复制代码


2.7 Stream<S> map<S>(S convert(T event))

      在当前 Stream 基础上创建一个新的 Stream 并对当前 Stream 进行数据操作,onData 监听到的是 map 变更后的新的数据流;


Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);streamData.takeWhile((element) {  print('Stream.periodic.takeWhile -> $element');  return element <= 6;}).where((event) {  print('Stream.periodic.where -> $event');  return event > 2;}).skipWhile((element) {  print('Stream.periodic.skipWhile -> $element');  return element <= 4;}).map((event) {  print('Stream.periodic.map -> $event -> ${event * 100}');  return event * 100;}).listen((event) {  print('Stream.periodic -> $event');}).onDone(() {  print('Stream.periodic -> done 结束');});
复制代码


2.8 Stream<S> expand<S>(Iterable<S> convert(T element))

      在当前 Stream 基础上创建新的 Stream 并将当前订阅数据转为新的订阅数据组onData 监听 数据组 中每个新的订阅数据元素;


Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);streamData.takeWhile((element) {  print('Stream.periodic.takeWhile -> $element');  return element <= 6;}).where((event) {  print('Stream.periodic.where -> $event');  return event > 2;}).skipWhile((element) {  print('Stream.periodic.skipWhile -> $element');  return element <= 4;}).expand((element) {  print('Stream.periodic.expand -> $element -> ${element * 10} -> ${element * 100}');  return [element, element * 10, element * 100];}).listen((event) {  print('Stream.periodic -> $event');}).onDone(() {  print('Stream.periodic -> done 结束');});
复制代码


2.9 Future<int> get length

      Stream 监听订阅事件结束后,符合 where 条件的数量;


_streamLength(index) async {  Duration interval = Duration(seconds: 1);  Stream<int> streamData = Stream<int>.periodic(interval, (data) => data + 1);  streamData = streamData.takeWhile((element) {    print('Stream.periodic.takeWhile -> $element');    return element <= 6;  }).where((event) {    print('Stream.periodic.where -> $event');    return event > 2;  }).skipWhile((element) {    print('Stream.periodic.skipWhile -> $element');    return element <= 4;  });  switch (index) {    case 1:      var length = await streamData.length;      print('Stream.length -> $length');      break;    case 2:      var isEmpty = await streamData.isEmpty;      print('Stream.isEmpty -> $isEmpty');      break;    case 3:      var isBroadcast = await streamData.isBroadcast;      print('Stream.isBroadcast -> $isBroadcast');      break;    case 4:      var first = await streamData.first;      print('Stream.first -> $first');      break;    case 5:      var last = await streamData.last;      print('Stream.last -> $last');      break;  }}
复制代码


2.10 Future<bool> get isEmpty

      Stream 监听订阅事件结束后,统计是否符合 where 条件的订阅数据是否为空;


_streamLength(2);
复制代码


2.11 Future<T> get first

      获取 Stream 符合条件的第一个订阅数据;


_streamLength(4);
复制代码


2.12 Future<bool> get last

      获取 Stream 符合条件的最后一个订阅数据;


_streamLength(5);
复制代码


2.13 Future<List<T>> toList()

      在 Stream 监听结束之后,将订阅数据存储在 List 中,该操作为异步操作;


_streamToList() async {  var data = [1, 2, '3.toString()', true, true, false, true, 6];  Stream stream = Stream.fromIterable(data).distinct();  List list = await stream.toList();  if (list != null) {    print('Stream.toList -> ${list}');    for (int i = 0; i < list.length; i++) {      print('Stream.toList -> ${i + 1} -> ${list[i]}');    }  }}
复制代码


2.14 Future<Set<T>> toSet()

      在 Stream 监听结束之后,将订阅数据存储在 Set 中,Set 可以过滤重复数据;


_streamToSet() async {  var data = [1, 2, '3.toString()', true, true, false, true, 6];  Stream stream = Stream.fromIterable(data);  Set set = await stream.toSet();  if (set != null) {    print('Stream.toSet -> ${set}');  }}
复制代码



2.15 Future forEach(void action(T element))

      监听 Stream 中订阅数据,是对 listen 方式的一种监听;


_streamForEach() {  var data = [1, 2, '3.toString()', true, true, false, true, 6];  Stream stream = Stream.fromIterable(data).distinct();  stream.forEach((element) => print('Stream.forEach -> $element'));}
复制代码





      小菜对 Stream 的尝试才刚刚开始,还有众多方法未曾尝试,对 Stream 的理解还很浅显,如有错误请多多指导!


来源: 阿策小和尚

发布于: 3 小时前阅读数: 4
用户头像

还未添加个人签名 2021.05.13 加入

Android / Flutter 小菜鸟~

评论

发布
暂无评论
【Flutter 专题】80 初识 Flutter Stream (一)