1. TCP 数据传输粘包简介
在本系列的第 6 篇文章《鸿蒙网络编程系列 6-TCP 数据粘包表现及原因分析》中,我们演示了 TCP 数据粘包的表现,如图所示:
随后解释了粘包背后的可能原因,并给出了解决 TCP 传输粘包问题的两种思路,第一种是指定数据包结束标志,在本系列第 35 篇《鸿蒙网络编程系列 35-通过数据包结束标志解决 TCP 粘包问题》中给出了具体的实现,第二种是通过固定包头指定包的长度,本文将通过一个示例演示这种思路的实现。
2. 固定包头可变包体解决 TCP 粘包问题演示
本示例运行后的界面如图所示:
和上一篇文章类似,输入服务端的地址,这里可以使用本系列第 25 篇文章《鸿蒙网络编程系列 25-TCP 回声服务器的实现》中创建的 TCP 回声服务器,也可以使用其他类似的回声服务器;然后输入服务器端口,最后单击"测试"按钮循环发送 0 到 99 的数字字符串到服务端,服务端会回传收到的信息,本示例在收到服务器信息后在日志区域输出,如图所示:
从中可以看出,这次也彻底解决了数据粘包问题,收到的信息和发送时保持一致。
3. 固定包头可变包体解决 TCP 粘包问题示例编写
下面详细介绍创建该示例的步骤。
步骤 1:创建 Empty Ability 项目。
步骤 2:在 module.json5 配置文件加上对权限的声明:
"requestPermissions": [
{
"name": "ohos.permission.INTERNET"
}
]
复制代码
这里添加了访问互联网的权限。
步骤 3:在 Index.ets 文件里添加如下的代码:
import { socket } from '@kit.NetworkKit';
import { Decimal, util, buffer } from '@kit.ArkTS';
import { BusinessError } from '@kit.BasicServicesKit';
@Entry
@Component
struct Index {
@State title: string = '固定包头可变包体演示示例';
//服务端端口号
@State port: number = 9990
//服务端IP地址
@State serverIp: string = ""
//操作日志
@State msgHistory: string = ''
//最大缓存长度
maxBufSize: number = 1024 * 8
//接收数据缓冲区
receivedDataBuf: buffer.Buffer = buffer.alloc(this.maxBufSize)
//缓冲区已使用长度
receivedDataLen: number = 0
//日志显示区域的滚动容器
scroller: Scroller = new Scroller()
build() {
Row() {
Column() {
Text(this.title)
.fontSize(14)
.fontWeight(FontWeight.Bold)
.width('100%')
.textAlign(TextAlign.Center)
.padding(10)
Flex({ justifyContent: FlexAlign.Start, alignItems: ItemAlign.Center }) {
Text("服务端地址:")
.fontSize(14)
.width(90)
TextInput({ text: this.serverIp })
.onChange((value) => {
this.serverIp = value
})
.height(40)
.width(80)
.fontSize(14)
.flexGrow(1)
Text(":")
.fontSize(14)
TextInput({ text: this.port.toString() })
.onChange((value) => {
this.port = parseInt(value)
})
.height(40)
.width(70)
.fontSize(14)
Button("测试")
.onClick(() => {
this.test()
})
.height(40)
.width(60)
.fontSize(14)
}
.width('100%')
.padding(10)
Scroll(this.scroller) {
Text(this.msgHistory)
.textAlign(TextAlign.Start)
.padding(10)
.width('100%')
.backgroundColor(0xeeeeee)
}
.align(Alignment.Top)
.backgroundColor(0xeeeeee)
.height(300)
.flexGrow(1)
.scrollable(ScrollDirection.Vertical)
.scrollBar(BarState.On)
.scrollBarWidth(20)
}
.width('100%')
.justifyContent(FlexAlign.Start)
.height('100%')
}
.height('100%')
}
//测试
async test() {
//服务端地址
let serverAddress: socket.NetAddress = { address: this.serverIp, port: this.port, family: 1 }
//执行TCP通讯的对象
let tcpSocket: socket.TCPSocket = socket.constructTCPSocketInstance()
//收到消息时的处理
tcpSocket.on("message", (value: socket.SocketMessageInfo) => {
this.receiveMsgFromServer(value)
})
await tcpSocket.connect({ address: serverAddress })
.then(() => {
this.msgHistory += "连接成功\r\n";
})
.catch((e: BusinessError) => {
this.msgHistory += `连接失败 ${e.message} \r\n`;
})
//循环发送0到99的数字字符串到服务端
for (let i = 0; i < 100; i++) {
let msg = i.toString()
await this.sendMsg2Server(tcpSocket, msg)
let sleepTime = Decimal.random().toNumber() + 0.5
//休眠sleepTime时间,大概0.5毫秒到1.5毫秒
await sleep(sleepTime)
}
}
//发送数据到服务端
async sendMsg2Server(tcpSocket: socket.TCPSocket, msg: string) {
let textEncoder = new util.TextEncoder();
let encodeValue = textEncoder.encodeInto(msg)
let sendBuf = buffer.alloc(2 + encodeValue.byteLength)
//写入固定包头中的长度信息
sendBuf.writeUInt16LE(encodeValue.byteLength)
//写入可变包体信息
sendBuf.write(msg, 2)
await tcpSocket.send({ data: sendBuf.buffer })
}
//读取服务端发送过来的数据
receiveMsgFromServer(value: socket.SocketMessageInfo) {
//把接收到的数据复制到缓冲区有效数据尾部
let copyCount = buffer.from(value.message).copy(this.receivedDataBuf, this.receivedDataLen)
this.receivedDataLen += copyCount
//至少写入了3个字节才需要解析
if (this.receivedDataLen < 3) {
return;
}
//当前数据包长度
let packLen = this.receivedDataBuf.readUInt16LE()
let textDecoder = util.TextDecoder.create("utf-8");
//当前数据包长度加上固定包体的2字节,如果小于等于缓冲区已使用长度,就可以解析
while ((packLen + 2) <= this.receivedDataLen) {
//把可变包体中的数据转换为字符串
let msgArray = new Uint8Array(this.receivedDataBuf.subarray(2, packLen + 2).buffer);
let msg = textDecoder.decodeToString(msgArray)
//剩余的未解析数据
let leaveBufData = this.receivedDataBuf.subarray(packLen + 2, this.receivedDataLen)
//剩余的未解析数据移动到缓冲区头部
for (let pos = 0; pos < leaveBufData.length; pos++) {
this.receivedDataBuf.writeUInt8(leaveBufData.readUInt8(pos), pos)
}
//重新设置缓冲区已使用长度
this.receivedDataLen = leaveBufData.length
//输出接收的数据到日志
this.msgHistory += "S:" + msg + "\r\n"
//至少写入了3个字节才需要解析,否则跳出循环
if (this.receivedDataLen < 3) {
break;
}
//开始查找下一个固定包头中的可变包体长度
packLen = this.receivedDataBuf.readUInt16LE()
}
this.scroller.scrollEdge(Edge.Bottom)
}
}
//休眠指定的毫秒数
function sleep(time: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, time));
}
复制代码
步骤 4:编译运行,可以使用模拟器或者真机。
步骤 5:按照本文第 2 部分“数据包结束标志解决 TCP 粘包问题演示”操作即可。
4. 代码分析
本示例的关键点在于构造数据包的格式,具体数据包的格式是这样的,前两个字节为固定的包长度,使用小端的 16 位无符号整数表示,后面是包内容。以发送数据包为例,代码如下所示:
async sendMsg2Server(tcpSocket: socket.TCPSocket, msg: string) {
let textEncoder = new util.TextEncoder();
let encodeValue = textEncoder.encodeInto(msg)
let sendBuf = buffer.alloc(2 + encodeValue.byteLength)
//写入固定包头中的长度信息
sendBuf.writeUInt16LE(encodeValue.byteLength)
//写入可变包体信息
sendBuf.write(msg, 2)
await tcpSocket.send({ data: sendBuf.buffer })
}
复制代码
这里首先把要发送的内容编码为 Uint8Array 类型,然后为缓冲区分配长度,长度为内容编码后的长度加上 2,随后把内容长度作为无符号数写入缓冲区,然后把发送的内容也写入缓冲区,最后使用 TCP 客户端发送缓冲区到服务端。
接收时,首先把所有收到的数据都复制到接收缓冲区中,然后从缓冲区头部取两个字节作为数据包内容长度,然后判断接收缓冲区中已接收的数据是不是大于等于数据包内容长度加 2,如果是,说明接收到了完整的数据包,就可以从中提取内容了,提取完毕把剩下的缓冲区数据移动到缓冲区头部,继续下一次循环,从缓冲区中提取完整数据包的数据,知道已接收的缓冲区小于数据包长度加 2 为止。相关代码位于方法 receiveMsgFromServer 中,源码包含了详细的注释,这里就不再赘述了。
(本文作者原创,除非明确授权禁止转载)
本文源码地址:
https://gitee.com/zl3624/harmonyos_network_samples/tree/master/code/tcp/PacketHeadWithLen
本系列源码地址:
https://gitee.com/zl3624/harmonyos_network_samples
评论