写点什么

Flutter 利用 StreamProvider 一起玩 WebSocket

作者:岛上码农
  • 2022 年 5 月 29 日
  • 本文字数:2985 字

    阅读完需:约 10 分钟

Flutter 利用 StreamProvider 一起玩 WebSocket

前言

在实际的应用中,我们经常会遇到数据流的处理,比如监听实时位置时会产生实时的位置信息流,比如 Socket 通讯时需要监听Socket 数据流。对于数据流,在 Flutter 中称之为 Stream,而 ProviderStream专门设计了一个 StreamProvider 来监听数据流的变化,当数据流产生新的数据时就会通知监听组件刷新。本篇我们就来介绍如何利用 StreamProvider 监听 WebSocket 数据。


本篇设计的知识点如下:


  • WebSokcet 客户端封装插件 socket_io_client的使用。

  • StreamProvider 监听 Socket 数据。


为了能够让程序正常运行,已经在后端代码增加了简单的 socket,使用的是和 socket_io_client 配套的服务端socket.ionpm 地址为socket.io,版本为 4.1.3。注意 Flutter 的插件存在版本兼容性,配套该版本的当前处于开发中的版本:^2.0.0-beta.4-nullsafety.0(稳定版的 1.0.1 插件不支持服务端的 4.1.3 版本)。后端源码地址为:后端配套源码

socket_io_client 介绍

socket_io_client 是广受欢迎的 Socket 客户端插件,javascript 也有对应的版本。socket_io_client 对 Dart 的 Socket 进行了封装,从而简化了 socket 的使用。


创建连接的方式如下,其中$host$port对应主机名和端口,后面的是配置参数。使用websocket 时,需要指定 transports(数组)的元素包括 websocketautoConnect 为创建后是否自动连接,默认会自动连接。forceNew 为是否每次创建新的 Socket 对象,如果为 false(默认) 则会使用旧的连接(没有关闭的情况下)。这里我们使用的是每次使用新的 Socket 对象 这是因为退出页面后我们要关闭连接。


_socket = SocketIO.io('ws://$host:$port', <String, dynamic>{  'transports': ['websocket'],  'autoConnect': true,  'forceNew': true});
复制代码


创建好之后就可以设置 Socket的事件响应回调方法,常用的由如下方法:


  • onConnect(EventHandler handler):连接建立成功回调;

  • onConnectTimeout(EventHandler handler):连接超时回调;

  • onConnectError(EventHandler handler):连接错误回调;

  • onError(EventHandler handler):发生错误时回调;

  • on(String event, (EventHandler handler):订阅指定事件的消息,服务端发送该事件的消息时可以在该函数接收。

  • onDisconnect(EventHandler handler):断开连接时回调;

  • connect/disconnect:主动连接/断开连接方法;

  • open/close:打开和关闭方法。


重点是创建连接后,当接收到的数据加入到一个流处理对象中。Flutter 提供了StreamController 类处理 Stream 对象。StreamController 只允许有一个订阅者,可以使用 sink 属性的 add 方法添加新的流数据,完成添加后会通知其订阅者有新的数据产生。因此我们可以使用 StreamProvider 来订阅这个流数据。


我们构建一个 Socket 管理类 StreamSocket,代码如下:


class StreamSocket {  final _socketResponse = StreamController<String>();
Stream<String> get getResponse => _socketResponse.stream;
final String host; final int port; late final Socket _socket;
StreamSocket({required this.host, required this.port}) { _socket = SocketIO.io('ws://$host:$port', <String, dynamic>{ 'transports': ['websocket'], 'autoConnect': true, 'forceNew': true }); }
void connectAndListen() { _socket.onConnect((_) { debugPrint('connected'); });
_socket.onConnectTimeout((data) => debugPrint('timeout')); _socket.onConnectError((error) => debugPrint(error.toString())); _socket.onError((error) => debugPrint(error.toString())); _socket.on('msg', (data) { _socketResponse.sink.add(data); }); _socket.onDisconnect((_) => debugPrint('disconnect')); }
void sendTextMessage(String message) { _socket.emit('msg', message); }
void close() { _socketResponse.close(); _socket.disconnect().close(); }}
复制代码

StreamProvider 应用

StreamProvider 的应用和 FutureProvider,ChangeNotifierProvider 类似,也分为 create 和 value 两种形式:


// create 形式StreamProvider({  Key? key,  required Create<Stream<T>?> create,  required T initialData,  ErrorBuilder<T>? catchError,  UpdateShouldNotify<T>? updateShouldNotify,  bool? lazy,  TransitionBuilder? builder,  Widget? child,})   // value 形式StreamProvider.value({  Key? key,  required Stream<T>? value,  required T initialData,  ErrorBuilder<T>? catchError,  UpdateShouldNotify<T>? updateShouldNotify,  bool? lazy,  TransitionBuilder? builder,  Widget? child,}) 
复制代码


这里我们首先使用一个 StatefulWidget 来管理 StreamSocket 的初始化以及 Socket 的关闭,并将实际业务组件用两个 Provider 包裹,一个是 StreamProvider,一个是 MessageModelStreamProvider用于自动监听 Socket 的数据流,并显示在界面上;MessageModel 目前仅用于发送消息。


class _SocketClientWrapperState extends State<SocketClientWrapper> {  final StreamSocket streamSocket = StreamSocket(host: '127.0.0.1', port: 3001);  @override  Widget build(BuildContext context) {    return Scaffold(      appBar: AppBar(        title: Text('Stream Provicer'),      ),      body: Stack(        alignment: Alignment.bottomCenter,        children: [          StreamProvider<String>(            create: (context) => streamSocket.getResponse,            initialData: '',            child: StreamDemo(),          ),          ChangeNotifierProvider<MessageModel>(            child: MessageReplyBar(messageSendHandler: (message) {              streamSocket.sendTextMessage(message);            }),            create: (context) => MessageModel(),          ),        ],      ),    );  }
@override void initState() { streamSocket.connectAndListen(); super.initState(); }
@override void dispose() { streamSocket.close(); super.dispose(); }}
复制代码


这里我们使用了 Stack 将消息的输入条MessageReplyBar固定在底部,需要注意的是,如果想要输入条自动随着键盘浮动,则需要将其放入到 Scaffoldbody 里,否则会一直固定在底部而被遮挡住。


StreamDemoMessageReplyBar两个组件都很简单,业务逻辑为 MessageReplyBar点击发送按钮将消息通过Socket 发出到服务端。然后 StreamDemo 回显 StreamProviderSocket 接收到的服务端消息。

运行效果

我们配合服务端的打印日志来看运行效果:可以看到连接的建立、发送消息、接收消息和断开连接的过程都是正常的。



源码已上传至:状态管理相关代码 - null safety 版本

总结

本篇介绍了ProviderStreamProvider流状态管理,同时引入了 socket_io_client插件实现了与服务端的 WebSocket 通信,通过StreamProvider订阅流数据,并通知界面刷新,实现了界面层的无侵入Socket数据展示。这一篇比较简单,更多地是介绍两个工具的使用,下一篇我们用这两个工具来实现一个即时聊天应用。



发布于: 刚刚阅读数: 5
用户头像

岛上码农

关注

用代码连接孤岛,公众号@岛上码农 2022.03.03 加入

从南漂到北,从北漂到南的业余码农

评论

发布
暂无评论
Flutter 利用 StreamProvider 一起玩 WebSocket_flutter_岛上码农_InfoQ写作社区