开源 IM 项目 OpenIM 客户端 SDK 架构剖析 - 确保消息的有序性,以及消息百分百可达
开源 IM 项目 OpenIM 第二版对于客户端架构进行了局部重构,解决了消息触发时序等 bug,也梳理了内部模块。目前已经接近尾声,本文重点讲解 SDK 架构,以便大家深入了解 OpenIM,并希望大家能深度参与开发。很多开发者有个误区,认为 IM 的挑战主要在服务端,当然服务端有其挑战,包括性能、压力、时延等,但优秀的 IM 架构需要服务端和客户端完美配合,比如消息对齐机制,本地缓存和后台数据同步,app 多端如何实时同步。
github 6.5K star 具体地址:
OpenIM Corporationgithub.com/OpenIMSDK
客户端重点问题总结:
(1)如何确保消息有序性;
(2)如何确保消息百分百可达;
(2)如果确保本地 db 和服务端数据的一致性;
(3)如何高效地实现多端同步;
(4)如果确保消息即时达到;
(5)消息发送的异步性,如何确保消息发送的一致性;
本文从架构角度重点解答第 1,2 两个问题
客户端模块划分和协程模型
WsConn:ws 连接管理器。提供函数供其他方调用,具体包括:
(1)ws 连接服务端,和 OpenIM 服务端保持长连接;
(2)关闭 ws 连接;
(3)通过 ws 发送请求;
WsRespAsyn:ws 请求-响应同步器,因为 ws 是异步处理,需要把请求和响应关联起来,提供函数供其他方调用(消息发送,心跳发送,拉取历史消息等)
(1)getCh:为每个请求生成一个 channel 和 msgIncr,使用 map 关联起来 msgIncr->channel
(2)notifyResp:对于 ws 收到的每个响应,通过 msgIncr 找到 channel,并往 channel 发送响应,通知响应到达;
Ws:模块对 WsConn 和 WsRespAsyn 功能进行整合(1)请求响应同步化,提供函数 SendReqWaitResp,调用者通过 ws 发送请求后,等待此请求的响应达到。(2)对于接收到的推送消息,把消息写入 PushMsgAndMaxSeqCh channel,触发 MsgSync 消息同步协程。
具体实现:ReadData 协程:接收服务端 ws 数据,并根据收到的数据类型(心跳、推送、踢出登录、拉取历史消息等),触发不同的逻辑处理,(1)对于主动发送请求的响应,则调用 WsRespAsyn 的 notifyResp 响应触发接口;(2)对于 push 消息,写入 PushMsgAndMaxSeqCh ,触发 MsgSync 消息同步协程。
MsgSync:消息同步器;包含 Ws 和 conversationCh 、 PushMsgAndMaxSeqCh ,启动消息同步协程,对 PushMsgAndMaxSeqCh 中的读取的数据做处理,具体包括:
(1)从 PushMsgAndMaxSeqCh 读取服务端最大 seq:SvrMaxSeq(由 heartbeat 写入的),对比本地最大 seq:LocalMaxSeq 和服务端最大 seq: SvrMaxSeq,计算出缺失的 seq,从服务器拉取历史消息,放入 conversationCh ,触发 conversation 协程处理;
(2)从 PushMsgAndMaxSeqCh 读取 ws 推送消息(由 Ws 的 ReadData 写入的推送消息),如果消息中的 seq+1==LocalMaxSeq,则写入 conversationCh,触发 conversation 处理,否则从服务端拉取消息补齐[LocalMaxSeq+1, seq],放入 conversationCh ,触发 conversation 协程处理;
heartbeat:心跳管理器,包括 MsgSync
(1)心跳协程,从服务端定时获取最大 seq:SvrMaxSeq,然后把 SvrMaxSeq 让入 PushMsgAndMaxSeqCh ,触发 MsgSync 消息同步协程。
心跳和消息同步融合
在心跳逻辑中触发消息同步
(1)心跳协程每 30 秒通过 ws 从服务端获取最大 seq:SvrMaxSeq;
(2)心跳协程把 SvrMaxSeq 写入 PushMsgAndMaxSeqCh ,触发 MsgSync 消息同步协程;
(3)MsgSync 消息同步协程从 PushMsgAndMaxSeqCh 中读取 SvrMaxSeq,
(4)MsgSync 消息同步协程对比本地最大 seq: LocalMaxSeq 和 SvrMaxSeq,如果有缺失,则通过 ws 拉取历史消息,范围为:[localMaxSeq+1,SvrMaxSeq],
(6)MsgSync 消息同步协程把拉取到的缺失的历史消息写入 conversationCh 中;
(7)msg-conversation 消息会话协程从 conversationCh 中读取缺失的历史消息,按照消息类型做业务处理,具体包括消息落地本地 db,触发新消息回调,触发会话改变回调(或新增回调)
push 消息触发同步
以 push 消息触发同步:
(1)Ws 的 ReadData 协程收到服务端的推送消息,
(2)Ws 的 ReadData 协程把推送消息写入 PushMsgAndMaxSeqCh ,触发 MsgSync 消息同步协程。
(3)MsgSync 消息同步协程从 PushMsgAndMaxSeqCh 中读取推送消息,如果 msg 中的 seq 比本地最大 seq 大 1,则跳过第 4 步,直接写入 conversationCh,触发 conversation 处理;
(4)服务端拉取消息补齐[LocalMaxSeq+1, seq],放入 conversationCh ,触发 conversation 协程处理;
(5)以下基本与以心跳触发同步过程一样。
总结
由于 seq 是按照消息的客观事件递增生成的,对于推送消息,如果比本地最大 seq 大 1,则消息可以无缝对接。否则要么推送的是过时消息,要么推送消息和本地消息有差异,需要通过 ws 拉取后写入本地,并触发相应回调。至此,客户端消息和服务端消息完全同步,并保证新消息回调的有序性。
重点参考我们开发文档:https://doc.rentsoft.cn/
github 地址:OpenIM Corporation
评论