前面的三篇文章已经从开发环境搭建实现到了握手协议:
为了方便我们后续的分析,先了解两个概念:
1.准备知识
1.1 大小端
举个🌰说明:
假设某段内存中存放以下这样的数据低地址 高地址
------------------------------------>
+--------+--------+--------+--------+
| 11 | 66 | 85 | 27 |
+--------+--------+--------+--------+
复制代码
1.2AMF
Action Message Format: A compact binary format that is used to serialize ActionScript object graphs.有两个版本:AMF0 和 AMF3.AMF3 用作 Flash Playe 9 的 ActionScript 3.0 的默认序列化格式,而 AMF0 则用作旧版的 ActionScript 1.0 和 2.0 的序列化格式。 在网络传输数据方面,AMF3 比 AMF0 更有效率。AMF3 能将 int 和 uint 对象作为整数(integer)传输,并且能序列化 ActionScript 3.0 才支持的数据类型, 比如 ByteArray,XML 和 Iexternalizable。
对应类型有:
rtmp 协议中数据都是大端的,所以在放数据前都要将数据转成大端的形式。
1.2.1 number 类型
其实就是 double,占 8bytes。比如:00 00 00 00 00 00 00 00,第一个 byte 为 amf 类型,其后 8bytes 为 double 值 0.0。
1.2.2 boolean
就是布尔类型,占用 1byte 比如:01 00,第一个 byte 为 amf 类型,其后 1byte 是值,false。
1.2.3 string
就是字符类型,一个 byte 的 amf 类型,两个 bytes 的字符长度,和 N 个 bytes 的数据。比如:02 00 02 33 22,第一个 byte 为 amf 类型,其后两个 bytes 为长度,注意这里的 00 02 是大端模式,33 22 是字符数据。
1.2.4 null
是空,只有一个 byte,0x05。
1.2.5 object 类型
object 要复杂点,第一个 byte 是 03 表示 object,其后跟的是 N 个(key+value)。最后以 00 00 09 表示 object 结束。
key 是一个字符串组成:2bytes 的长度,N bytes 的数据,就是表示 value 的作用,相当于 value 的名字。
value 可以使 amf 任意一种类型,包括 object。格式和单独的 amf type 一样,如果是 object 的话,相当于在里面再嵌套一个 object。
level(key)后面的 status 就是 value,此 value 是 string 类型,所以格式是上面提到的 string 类型(0x02)。
1.2.6 ECMA_ARRAY(0x08)
实际上和 object 差不多,只是在 0x08 类型后面多了 4 个 bytes 的记录总共多少 items 的东西,目测全部填 00 也可以,也是以 00 00 09 结束。
0x11 类型是 amf3 的类型,amf3 实际上外层是封了一层 amf0,为了与 amf0 兼容
1.3 rtmp url 说明
下面四种都是合法的 rtmp url:
url_schema_normal: rtmp://vhost:port/app/stream, the vhost put in host field, using DNS to resolve the server ip.
url_schema_via : rtmp://ip:port/vhost/app/stream, VIA(vhost in app), the vhost put in app field.
url_schema_vis : rtmp://ip:port/app/stream?vhost=xxx, VIS(vhost in stream), the vhost put in query string, keyword use vhost=xxx.
url_schema_vis2 : rtmp://ip:port/app/stream?domain=xxx, keyword use domain=xxx.
在进行握手前,要对 rtmp url 进行解析,解析出 host, vhost,app, stream,param 等字段.
我们之前深入理解rtmp(三)之手把手实现握手协议介绍到,握手是发送一定"格式"的固定字节,我们接下来的 connect,createStream 以及音视频数据的内容也都是要有"格式"的,这个"格式"我们在 RTMP 里面是以"消息"出现的.
2.RTMP 消息
我们先看一下 rtmp 消息格式:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Message Type | Payload length |
| (1 byte) | (3 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Timestamp |
| (4 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Stream ID |
| (3 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Message Header
复制代码
其中:
1 字节消息类型
3 字节负载消息长度
4 字节时间戳
3 字节 Stream ID,区分消息流
Message Header 第一个字节 Message Type(消息类型)很重要,它代表了这个消息是什么类型,当写程序的时候需要根据不同的消息,做不同的处理.我们接下来先了解下 RTMP 有哪些消息类型.
2.1 RTMP 消息分类
消息类型 Message Type 下面都简写为 MT
2.1.1 协议控制消息
协议控制消息是用来与对端协调控制的
MT 的范围 1~7:
Message Stream ID=0 和 Chunk Stream ID=2
MT=1, Set Chunk Size 设置块的大小,通知对端用使用新的块大小,共 4 bytes。默认大小是 128 字节
MT=2, Abort Message 取消消息,用于通知正在等待接收块以完成消息的对等端,丢弃一个块流中已经接收的部分并且取消对该消息的处理,共 4 bytes。
MT=3, Acknowledgement 确认消息,客户端或服务端在接收到数量与窗口大小相等的字节后发送确认消息到对方。窗口大小是在没有接收到接收者发送的确认消息之前发送的字节数的最大值。服务端在建立连接之后发送窗口大小。本消息指定序列号。序列号,是到当前时间为止已经接收到的字节数。共 4 bytes。
MT=4, User Control Message 用户控制消息,客户端或服务端发送本消息通知对方用户的控制事件。本消息承载事件类型和事件数据。消息数据的头两个字节用于标识事件类型。事件类型之后是事件数据。事件数据字段是可变长的。
MT=5, Window Acknowledgement Size 确认窗口大小,客户端或服务端发送本消息来通知对方发送确认消息的窗口大小,共 4 bytes.
MT=6, Set Peer Bandwidth 设置对等端带宽,客户端或服务端发送本消息更新对等端的输出带宽。发送者可以在限制类型字段(1 bytes)把消息标记为硬(0),软(1),或者动态(2)。如果是硬限制对等端必须按提供的带宽发送数据。如果是软限制,对等端可以灵活决定带宽,发送端可以限制带宽?。如果是动态限制,带宽既可以是硬限制也可以是软限制。
2.1.2 音频数据消息
2.1.3 视频数据消息
2.1.4 元数据消息
MT=15 或 18, Data message, 客户端或服务端通过本消息向对方发送元数据和用户数据。元数据包括数据的创建时间、时长、主题等细节。消息类型为 18 的用 AMF0 编码,消息类型为 15 的用 AMF3 编码。
2.1.5 共享对象消息
2.1.6 命令消息
命令消息的类型有:
2.1.6.1 NetConnection Commands(连接层的命令)
代表服务端和客户端之间连接的更高层的对象。包含 4 个命令类型。
connect:该命令是 client 先发送给 server,意思是我要连接,能建立连接吗?server 返回含“_result”或者“_error”命令名, 返回“_result”,表示 server 能提供服务,client 可以进行下一步。“_error”,很明显 server 端不能提供服务。
call:NetConnection 对象的调用方法在接收端运行远程过程调用。远程方法的名作为调用命令的参数。
close:不知道为何协议里没写这个命令的内容,我猜应该是 close connect。
createStream:客户端发送本命令到服务端创建一个消息通讯的逻辑通道。音频,视频和元数据的发布是由创建流命令建立的流通道承载的。
RTMP 握手之后先发送一个 connect 命令消息,命令里面包含什么东西,协议中没有具体规定,实际通信中要指定一些编解码的信息,并以 AMF 格式发送, 下面是用 wireshake 抓取 connect 命令需要包含的参数信息:
这些信息协议中并没有特别详细说明, 在 librtmp,srs-librtmp 这些源码中,以及用 wireshark 抓包的时候可以看到。
服务器返回的是一个_result 命令类型消息,这个消息的 payload length 一般不会大于 128 字节,但是在最新的 nginx-rtmp 中返回的消息长度会大于 128 字节。
消息的 transactionID 是用来标识 command 类型的消息的,服务器返回的_result 消息可以通过 transactionID 来区分是对哪个命令的回应,connect 命令发完之后还要发送其他命令消息,要保证他们的 transactionID 不相同。
发送完 connect 命令之后一般会发一个 set chunk size 消息来设置 chunk size 的大小,也可以不发。
Window Acknowledgement Size 是设置接收端消息窗口大小,一般是 2500000 字节,即告诉对端在收到设置的窗口大小长度的数据之后要返回一个 ACK 消息。在实际做推流的时候推流端要接收很少的服务器数据,远远到达不了窗口大小,所以这个消息可以不发。而对于服务器返回的 ACK 消息一般也不做处理,默认服务器都已经收到了所有消息了。
之后要等待服务器对于 connect 消息的回应的,一般是把服务器返回的 chunk 都读完,组包成完整的 RTMP 消息,没有错误就可以进行下一步了。
connect 消息结构如下:
call 消息结构如下:
call 消息中的 TransactionID 不为 0 的话,对端需要对该命令做出响应,响应的消息结构如下:
创建完 RTMP 连接之后就可以创建 RTMP 流,客户端要想服务器发送一个 releaseStream 命令消息,之后是 FCPublish 命令消息,在之后是 createStream 命令消息。当发送完 createStream 消息之后,解析服务器返回的消息会得到一个 stream ID。这个 ID 也就是以后和服务器通信的 message stream ID, 一般返回的是 1,不固定。
createStream 消息结构如下:
NetConnection 本身是默认的流通道,具有流 ID 0。协议和一少部分命令消息,包括创建流,就使用默认的通讯通道。
2.1.6.2 NetStream Commands(流连接上的命令)
Netstream 建立在 NetConnection 之上,通过 NetConnection 的 createStream 命令创建,用于传输具体的音频、视频等信息。在传输层协议之上只能连接一个 NetConnection,但一个 NetConnection 可以建立多个 NetStream 来建立不同的流通道传输数据。以下会列出一些常用的 NetStream Commands,服务端收到命令后会通过 onStatus 的命令来响应客户端,表示当前 NetStream 的状态。onStatus 命令的消息结构如下:
NetStream Commands1:play(播放)
+-------------+ +----------+
| Play Client | | | Server |
+-------------+ | +----------+
| |Handshaking and Application| |
| | connect done | |
| | |
---+---- |---------Command Message(createStream) --------->|
Create | |
Stream | |
---+---- |<-------------- Command Message -----------------|
| (_result- createStream response) |
| |
---+---- |------------ Command Message (play) ------------>|
play | |
| |<---------------- SetChunkSize ------------------|
| | |
| |<----- User Control (StreamIsRecorded) ----------|
| | |
| |<-------- UserControl (StreamBegin) -------------|
| | |
| |<---- Command Message(onStatus-play reset) ------|
| | |
| |<---- Command Message(onStatus-play start) ------|
| | |
| |------------------ Audio Message---------------->|
| | |
| |------------------ Video Message---------------->|
| | |
|
|
Keep receiving audio and video stream till finishes
复制代码
客户端从服务端接收到流创建成功消息,发送播放命令到服务端。
接收到播放命令后,服务端发送协议消息设置块大小。
服务端发送另一个协议消息(用户控制消息),并且在消息中指定事件” streamisrecorded” 和流 ID 。消息承载的头 2 个字,为事件类型,后 4 个字节为流 ID 。
服务端发送事件” streambegin” 的协议消息(用户控制),告知客户端流 ID 。
服务端发送响应状态命令消息NetStream.Play.Start
&NetStream.Play.reset
, 如果客户端发送的播放命令成功的话。只有当客户端发送的播放命令设置了 reset
命令的条件下,服务端才发送NetStream.Play.reset
消息。如果要发送的流 没有找的话,服务端发送NetStream.Play.StreamNotFound
消息。在此之后服务端发送客户端要播放的音频和视频数据。
play 命令的结构如下:
NetStream Commands2:play2(播放)
和播放命令不同,play2 命令可以切换到不同的码率,而不用改变已经播放的内容的时间线。服务端对播放 2 命令可以请求的多个码率维护多个文件。
NetStream Commands3:deleteStream(删除流)
当 NetStream 对象销毁的时候发送删除流命令。
NetStream Commands4:closeStream
NetStream Commands5:receiveAudio(接收音频)
NetStream 对象发送接收音频消息通知服务端发送还是不发送音频到客户端。
NetStream Commands6:receiveVideo(接收视频)
NetStream 对象发送 receiveVideo 消息通知服务端是否发送视频到客户端。
NetStream Commands7:publish(推送数据)
推流准备工作的最后一步是 Publish Stream,即向服务器发一个 publish 命令消息,这个命令的 message stream ID 就是上面 create stream 之后服务器返回的 stream ID,发完这个命令一般不用等待服务器返回的回应,直接发送音视频类型的 RTMP 数据包即可。有些 rtmp 库还会发 setMetaData 消息,这个消息可以发也可以不发,里面包含了一些音视频 meta data 的信息,如视频的分辨率等等。
+-------------+ +----------+
| Client | | | Server |
+-------------+ | +----------+
| | Handshaking Done | |
| | |
| | |
---+---- |--------- Command Message(connect) --------->|
| | |
Connect |<---------- Window Acknowledge Size -------------|
| | |
| |<------------- Set Peer BandWidth ---------------|
| | |
| |----------- Window Acknowledge Size ------------>|
| | |
| |<--------- User Control(StreamBegin) ------------|
| | |
---+---- |--------------- Command Message ---------------->|
| (_result- connect response) |
| |
---+---- |---------Command Message(createStream) --------->|
Create | |
Stream | |
---+---- |<-------------- Command Message -----------------|
| (_result- createStream response) |
| |
---+---- |--------- Command Message (publish) ------------>|
| | |
publish |<-------- UserControl (StreamBegin) -------------|
| | |
| |---------- Data Message (Metadata) ------------->|
| | |
| |------------------ Audio Message---------------->|
| | |
| |----------------- SetChunkSize ----------------->|
| | |
| |<--------------- Command Message ----------------|
| | (_result- publish result) |
| |------------------ Video Message---------------->|
|
|
Until the stream is complete
复制代码
publish 命令结构如下:
NetStream Commands8:seek(定位流的位置)
定位到视频或音频的某个位置,以毫秒为单位。客户端发送搜寻命令在一个媒体文件中或播放列表中搜寻偏移。seek 命令的结构如下:
NetStream Commands9:pause(暂停)
客户端告知服务端停止或恢复播放, 客户端发送暂停命令告诉服务端暂停或开始一个命令。pause 命令的结构如下:
2.1.7 聚合消息
+---------+-------------------------+
| Header | Aggregate Message body |
+---------+-------------------------+
聚合消息的格式
+--------+--------------+--------------+--------+-------------+---------------+ - - - -
|Header 0|Message Data 0|Back Pointer 0|Header 1|Message Data 1|Back Pointer 1|
+--------+--------------+--------------+--------+--------------+--------------+ - - - -
聚合消息的body
复制代码
Back Pointer 包含了前面消息的大小(包括 Header 的大小)。这个设置匹配了 flv 文件格式,可用于后向搜索。
2.2 接收命令消息反馈结果 ResponseCommand
通过块消息携带的数据,拼接成消息内容,通过 AMF 解读消息内容
#define RTMPConnectSuccess @"NetConnection.Connect.Success"
#define RTMPPublishStart @"NetStream.Publish.Start"
#define RTMPPublishBadName @"NetStream.Publish.BadName"
#define RTMPPlayStart @"NetStream.Play.Start"
#define RTMPPlayReset @"NetStream.Play.Reset"
#define RTMPPlayStreamNotFound @"NetStream.Play.StreamNotFound"
typedef enum : char {
RTMPResponseCommand_Result = 0x1, //_Result命令
RTMPResponseCommandOnBWDone = 0x2, //OnBWDone命令
RTMPResponseCommandOnFCPublish = 0x3, //OnFCPublish命令
RTMPResponseCommandOnStatus = 0x4, //OnStatus命令
RTMPResponseCommandOnFCUnpublish = 0x5, //OnFCUnpublish命令
RTMPResponseCommandOnMetaData = 0x6, //OnMetaData命令
RTMPResponseCommandUnkonwn = 0x7f,//未知类型
} RTMPResponseCommandType;
复制代码
上面分析完了 rtmp 消息类型,但实际 RTMP 通信中并未按照上述格式去发送 RTMP 消息,而是将 RTMP 消息分块发送,而且必须在一个 Chunk 发送完成之后才能开始发送下一个 Chunk。每个 Chunk 中带有 MessageID 代表属于哪个 Message,接收端也会按照这个 id 来将 chunk 组装成 Message。为什么 RTMP 要将 Message 拆分成不同的 Chunk 呢?通过拆分,数据量较大的 Message 可以被拆分成较小的“Message”,这样就可以避免优先级低的消息持续发送阻塞优先级高的数据,比如在视频的传输过程中,会包括视频帧,音频帧和 RTMP 控制信息,如果持续发送音频数据或者控制数据的话可能就会造成视频帧的阻塞,然后就会造成看视频时最烦人的卡顿现象。同时对于数据量较小的 Message,可以通过对 Chunk Header 的字段来压缩信息,从而减少信息的传输量。
下面了解下 RTMP 消息分块
2.3 RTMP 消息分块
每个 Chunk 都由 <font color=red>Chunk Header + Chunk Data </font>组成:
+-------+ +--------------+----------------+
| Chunk | = | Chunk Header | Chunk Data |
+-------+ +--------------+----------------+
复制代码
2.3.1 Chunk Header
Chunk Header 由 Basic Header + Message Header + ExtendedTimestamp(不一定存在)组成:
+--------------+ +-------------+----------------+-------------------+
| Chunk Header | = | Basic header| Message Header |Extended Timestamp |
+--------------+ +-------------+----------------+-------------------+
复制代码
2.3.1.1 Basic Header(基本的头信息) (1~3 byte)
+-+-+-+-+-+-+-+-+
|fmt| cs id |
+-+-+-+-+-+-+-+-+
复制代码
<font color=red>chuck stream = cs</font>
0 1
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|fmt|0 0 0 0 0 0|the second byte|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
复制代码
②csid 在 64~65599 的范围内时(①与②存在交集,原则上交集部分选择①),csidTS=0x3f,bit 位全为 1 时,csid=(第二个字节的值×256)+(第三个字节的值)+64
0 1 2
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|fmt|1 1 1 1 1 1|the second byte| the third byte|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
复制代码
③csid 在 3~63 的范围内时,csidTS=1~0x3e,即 6 位 bit 非全 0 也非全 1 时,csid=csidTS
0
0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
|fmt|0<csid<0x3f|
+-+-+-+-+-+-+-+-+
复制代码
注意:这里第一个字节的 2~7bit 的值暂时称之为 csidTS 由此可见:Basic Header 的长度范围为 1~3 byte,具体多少 byte 是 csidTS 决定的,csid 的值范围 3~65599,0~2 作为保留。
2.3.1.2 Chuck Message Header(块消息的消息头信息)(0、3、7、11 byte)
包含了要发送的实际信息(可能是完整的,也可能是一部分)的描述信息。Message Header 的格式和长度取决于 Basic Header 的 chunk type,共有 4 种不同的格式,由上面所提到的 Basic Header 中的 fmt 字段控制。其中第一种格式可以表示其他三种表示的所有数据,但由于其他三种格式是基于对之前 chunk 的差量化的表示,因此可以更简洁地表示相同的数据,实际使用的时候还是应该采用尽量少的字节表示相同意义的数据。以下按照字节数从多到少的顺序分别介绍这 4 种格式的 Chuck Message Header
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp | message length:
+-------------------------------+---------------+---------------+
: |message type id| :
+-------------------------------+---------------+---------------+
: message stream id |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
复制代码
timestamp(时间戳): 占 3 byte 最大表示 16777215=0xFFFFFF=2^24-1,超出这个值,这 3 个字节置为 1,将实际数据转存到 Extended Timestamp 字段中。
message length(长度): 占 3 byte 表示实际发送的消息的数据如音频帧、视频帧等数据的长度,单位是字节。注意这里是 Message 的长度,也就是 chunk 属于的 Message 的总数据长度,而不是 chunk 本身 Data 的数据的长度。
message type id(消息的类型 id): 占 1 byte 表示实际发送的数据的类型,如 8 代表音频数据、9 代表视频数据。
message stream id(消息的流 id): 占 4byte 表示该 chunk 所在的流的 ID,和 Basic Header 的 CSID 一样,它采用小端存储的方式。
fmt=0:长度 11 byte,其他三种能表示的数据它都能表示 , 在一个块流的开始和时间戳返回的时候必须有这种块: Chuck Message Header = timestamp + message length + message type id + message stream id
fmt=1:长度为 7 byte,与 fmt=0 时,相比,该类型少了 Message Stream Id,具有可变大小消息的流,在第一个消息之后的每个消息的第一个块应该使用这个格式:Chuck Message Header = timestamp + message length + message type id
fmt=2:长度 3 byte,不包含 Message Stream Id 和 Message Length 、Message type Id,具有固定大小消息的流,在第一个消息之后的每个消息的第一个块应该使用这个格式:Chuck Message Header = timestamp
fmt=3:长度为 0 byte,当一个消息被分成多个块,除了第一块以外,所有的块都应使用这种类型 : Chuck Message Header = 0 byte
注意:message type id 发送音视频数据的时候
如果包头 MessageTypeID 为 0x8 或 0x9,数据(chunk data)是 flv 的 tag data(没有 tag header),flv 格式封装请见FLV格式解析
也可以用新类型 MessageTypeID 为 0x16,数据(chunk data)是一个完整 flv 的 tag(tag header + tag data)
message stream id 采用<font color=red>小端</font>存储
RTMP 都是大端模式,所以发送数据,包头,交互消息都要填写大端模式的,但是只有 streamID 是小端模式
2.3.1.3 ExtendedTimestamp(扩展时间) (0、4 byte)
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| timestamp |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
复制代码
只有当块消息头中的普通时间戳设置为 0xffffff 时,本字段才被传送。
如果普通时间戳的值小于 0x00ffffff ,那么本字段一定不能出现。
如果块消息头中时间戳字段不出现本字段也一定不能出现。
类型 3 的块一定不能含有本字段。
本字段在块消息头之后,块数据之前。
2.3.2 Chunk Data
+-----------+
|Chunk Data |
+-----------+
复制代码
用户层面上真正想要发送的与协议无关的数据,长度在(0,chunkSize]之间, chunk size 默认为 128 字节。
2.3.3 RTMP 消息分块注意事项
Chunk Size: RTMP 是按照 chunk size 进行分块,chunk size 指的是 chunk 的 payload 部分的大小,不包括 chunk basic header 和 chunk message header 长度。客户端和服务器端各自维护了两个 chunk size, 分别是自身分块的 chunk size 和 对端 的 chunk size, 默认的这两个 chunk size 都是 128 字节。通过向对端发送 set chunk size 消息可以告知对方更改了 chunk size 的大小。
Chunk Type: RTMP 消息分成的 Chunk 有 4 种类型,可以通过 chunk basic header 的高两位(fmt)指定,一般在拆包的时候会把一个 RTMP 消息拆成以格式 0 开始的 chunk,之后的包拆成格式 3 类型的 chunk,我查看了有不少代码也是这样实现的,这样也是最简单的实现。
如果第二个 message 和第一个 message 的 message stream ID 相同,并且第二个 message 的长度也大于了 chunk size,那么该如何拆包?当时查了很多资料,都没有介绍。后来看了一些源码,如 SRS,FFMPEG 中的实现,发现第二个 message 可以拆成Type_1
类型一个 chunk, message 剩余的部分拆成 Type_3 类型的 chunk。FFMPEG 中就是这么做的。
srs-librtmp 分块结构封装:
class SrsChunkStream
{
public:
// Represents the basic header fmt,
// which used to identify the variant message header type.
char fmt;
// Represents the basic header cid,
// which is the chunk stream id.
int cid;
// Cached message header
SrsMessageHeader header;
// Whether the chunk message header has extended timestamp.
bool extended_timestamp;
// The partially read message.
SrsCommonMessage* msg;
// Decoded msg count, to identify whether the chunk stream is fresh.
int64_t msg_count;
public:
SrsChunkStream(int _cid);
virtual ~SrsChunkStream();
};
复制代码
3.代码分析
我们先看几个 srs-librtmp 封装的结构:runtime context, 我们初始化配置及传入的 url 以及 url 解析结果等:
struct Context
{
// The original RTMP url.
std::string url;
// Parse from url.
std::string tcUrl;
std::string host;
std::string vhost;
std::string app;
std::string stream;
std::string param;
// Parse ip:port from host.
std::string ip;
int port;
// The URL schema, about vhost/app/stream?param
srs_url_schema schema;
// The server information, response by connect app.
SrsServerInfo si;
// The extra request object for connect to server, NULL to ignore.
SrsRequest* req;
// the message received cache,
// for example, when got aggregate message,
// the context will parse to videos/audios,
// and return one by one.
std::vector<SrsCommonMessage*> msgs;
SrsRtmpClient* rtmp;
SimpleSocketStream* skt;
int stream_id;
// the remux raw codec.
SrsRawH264Stream avc_raw;
SrsRawAacStream aac_raw;
// about SPS, @see: 7.3.2.1.1, ISO_IEC_14496-10-AVC-2012.pdf, page 62
std::string h264_sps;
std::string h264_pps;
// whether the sps and pps sent,
// @see https://github.com/ossrs/srs/issues/203
bool h264_sps_pps_sent;
// only send the ssp and pps when both changed.
// @see https://github.com/ossrs/srs/issues/204
bool h264_sps_changed;
bool h264_pps_changed;
// the aac sequence header.
std::string aac_specific_config;
// user set timeout, in ms.
int64_t stimeout;
int64_t rtimeout;
// The RTMP handler level buffer, can used to format packet.
char buffer[1024];
Context() : port(0) {
rtmp = NULL;
skt = NULL;
req = NULL;
stream_id = 0;
h264_sps_pps_sent = false;
h264_sps_changed = false;
h264_pps_changed = false;
rtimeout = stimeout = SRS_UTIME_NO_TIMEOUT;
schema = srs_url_schema_normal;
}
virtual ~Context() {
srs_freep(req);
srs_freep(rtmp);
srs_freep(skt);
std::vector<SrsCommonMessage*>::iterator it;
for (it = msgs.begin(); it != msgs.end(); ++it) {
SrsCommonMessage* msg = *it;
srs_freep(msg);
}
msgs.clear();
}
};
复制代码
里面又涉及到了四个重要结构:
SrsServerInfo:The server information, response by connect app.
SrsRequest:The extra request object for connect to server, NULL to ignore.
SrsRtmpClient:rtmp 客户端相关信息
SimpleSocketStream: 真正的 socket 封装,跟我们之前文章深入理解rtmp(二)之脚手架搭建中的 SimpleSocketStream 一样
SrsServerInfo 结构,里面信息对于我们客户端用处不是特别大:
struct SrsServerInfo
{
std::string ip;
std::string sig;
int pid;
int cid;
int major;
int minor;
int revision;
int build;
SrsServerInfo();
};
复制代码
SrsRequest 结构:
class SrsRequest
{
public:
// The client ip.
std::string ip;
public:
// The tcUrl: rtmp://request_vhost:port/app/stream
// support pass vhost in query string, such as:
// rtmp://ip:port/app?vhost=request_vhost/stream
// rtmp://ip:port/app...vhost...request_vhost/stream
std::string tcUrl;
std::string pageUrl;
std::string swfUrl;
double objectEncoding;
// The data discovery from request.
public:
// Discovery from tcUrl and play/publish.
std::string schema;
// The vhost in tcUrl.
std::string vhost;
// The host in tcUrl.
std::string host;
// The port in tcUrl.
int port;
// The app in tcUrl, without param.
std::string app;
// The param in tcUrl(app).
std::string param;
// The stream in play/publish
std::string stream;
// For play live stream,
// used to specified the stop when exceed the duration.
// @see https://github.com/ossrs/srs/issues/45
// in srs_utime_t.
srs_utime_t duration;
// The token in the connect request,
// used for edge traverse to origin authentication,
// @see https://github.com/ossrs/srs/issues/104
SrsAmf0Object* args;
public:
SrsRequest();
virtual ~SrsRequest();
public:
// Deep copy the request, for source to use it to support reload,
// For when initialize the source, the request is valid,
// When reload it, the request maybe invalid, so need to copy it.
virtual SrsRequest* copy();
// update the auth info of request,
// To keep the current request ptr is ok,
// For many components use the ptr of request.
virtual void update_auth(SrsRequest* req);
// Get the stream identify, vhost/app/stream.
virtual std::string get_stream_url();
// To strip url, user must strip when update the url.
virtual void strip();
public:
// Transform it as HTTP request.
virtual SrsRequest* as_http();
};
复制代码
SrsRtmpClient:
// implements the client role protocol.
class SrsRtmpClient
{
private:
SrsHandshakeBytes* hs_bytes;//跟我们之前封装的握手接口类似
protected:
SrsProtocol* protocol;//rtmp协议封装,就是封装了我们的connect, createStream,publish等接口
ISrsProtocolReadWriter* io;//就是SimpleSocketStream
public:
SrsRtmpClient(ISrsProtocolReadWriter* skt);
virtual ~SrsRtmpClient();
// Protocol methods proxy
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual void set_send_timeout(srs_utime_t tm);
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual srs_error_t recv_message(SrsCommonMessage** pmsg);
virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual srs_error_t send_and_free_message(SrsSharedPtrMessage* msg, int stream_id);
virtual srs_error_t send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id);
virtual srs_error_t send_and_free_packet(SrsPacket* packet, int stream_id);
public:
// handshake with server, try complex, then simple handshake.
virtual srs_error_t handshake();
// only use simple handshake
virtual srs_error_t simple_handshake();
// only use complex handshake
virtual srs_error_t complex_handshake();
// Connect to RTMP tcUrl and app, get the server info.
//
// @param app, The app to connect at, for example, live.
// @param tcUrl, The tcUrl to connect at, for example, rtmp://ossrs.net/live.
// @param req, the optional req object, use the swfUrl/pageUrl if specified. NULL to ignore.
// @param dsu, Whether debug SRS upnode. For edge, set to true to send its info to upnode.
// @param si, The server information, retrieve from response of connect app request. NULL to ignore.
virtual srs_error_t connect_app(std::string app, std::string tcUrl, SrsRequest* r, bool dsu, SrsServerInfo* si);
// Create a stream, then play/publish data over this stream.
virtual srs_error_t create_stream(int& stream_id);
// start play stream.
virtual srs_error_t play(std::string stream, int stream_id, int chunk_size);
// start publish stream. use flash publish workflow:
// connect-app => create-stream => flash-publish
virtual srs_error_t publish(std::string stream, int stream_id, int chunk_size);
// start publish stream. use FMLE publish workflow:
// connect-app => FMLE publish
virtual srs_error_t fmle_publish(std::string stream, int& stream_id);
public:
template<class T>
srs_error_t expect_message(SrsCommonMessage** pmsg, T** ppacket)
{
return protocol->expect_message<T>(pmsg, ppacket);
}
};
复制代码
SimpleSocketStream 和我们之前的类似,只是 srs 作者做了一层接口抽象:
/**
* The system io reader/writer architecture:
* +---------------+ +---------------+
* | IStreamWriter | | IVectorWriter |
* +---------------+ +---------------+
* | + write() | | + writev() |
* +-------------+-+ ++--------------+
* +----------+ +--------------------+ /\ /\
* | IReader | | IStatistic | \ /
* +----------+ +--------------------+ V
* | + read() | | + get_recv_bytes() | +------+----+
* +------+---+ | + get_send_bytes() | | IWriter |
* / \ +---+--------------+-+ +-------+---+
* | / \ / \ / \
* | | | |
* +------+-------------+------+ ++---------------------+--+
* | IProtocolReader | | IProtocolWriter |
* +---------------------------+ +-------------------------+
* | + readfully() | | + set_send_timeout() |
* | + set_recv_timeout() | +-------+-----------------+
* +------------+--------------+ / \
* / \ |
* | |
* +--+-----------------------------+-+
* | IProtocolReadWriter |
* +----------------------------------+
*/
复制代码
我们再看一下 SrsProtocol:
// The protocol provides the rtmp-message-protocol services,
// To recv RTMP message from RTMP chunk stream,
// and to send out RTMP message over RTMP chunk stream.
class SrsProtocol
{
private:
class AckWindowSize
{
public:
uint32_t window;
// number of received bytes.
int64_t nb_recv_bytes;
// previous responsed sequence number.
uint32_t sequence_number;
AckWindowSize();
};
// For peer in/out
private:
// The underlayer socket object, send/recv bytes.
ISrsProtocolReadWriter* skt;
// The requests sent out, used to build the response.
// key: transactionId
// value: the request command name
std::map<double, std::string> requests;
// For peer in
private:
// The chunk stream to decode RTMP messages.
std::map<int, SrsChunkStream*> chunk_streams;
// Cache some frequently used chunk header.
// cs_cache, the chunk stream cache.
SrsChunkStream** cs_cache;
// The bytes buffer cache, recv from skt, provide services for stream.
SrsFastStream* in_buffer;
// The input chunk size, default to 128, set by peer packet.
int32_t in_chunk_size;
// The input ack window, to response acknowledge to peer,
// For example, to respose the encoder, for server got lots of packets.
AckWindowSize in_ack_size;
// The output ack window, to require peer to response the ack.
AckWindowSize out_ack_size;
// The buffer length set by peer.
int32_t in_buffer_length;
// Whether print the protocol level debug info.
// Generally we print the debug info when got or send first A/V packet.
bool show_debug_info;
// Whether auto response when recv messages.
bool auto_response_when_recv;
// When not auto response message, manual flush the messages in queue.
std::vector<SrsPacket*> manual_response_queue;
// For peer out
private:
iovec* out_iovs;
int nb_out_iovs;
char* out_c0c3_caches;
// Whether warned user to increase the c0c3 header cache.
bool warned_c0c3_cache_dry;
// The output chunk size, default to 128, set by config.
int32_t out_chunk_size;
public:
SrsProtocol(ISrsProtocolReadWriter* io);
virtual ~SrsProtocol();
public:
// Set the auto response message when recv for protocol stack.
virtual void set_auto_response(bool v);
virtual srs_error_t manual_response_flush();
public:
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
// Get recv/send bytes.
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
public:
virtual srs_error_t set_in_window_ack_size(int ack_size);
public:
virtual srs_error_t recv_message(SrsCommonMessage** pmsg);
virtual srs_error_t decode_message(SrsCommonMessage* msg, SrsPacket** ppacket);
virtual srs_error_t send_and_free_message(SrsSharedPtrMessage* msg, int stream_id);
virtual srs_error_t send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id);
virtual srs_error_t send_and_free_packet(SrsPacket* packet, int stream_id);
private:
virtual srs_error_t do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs);
virtual srs_error_t do_iovs_send(iovec* iovs, int size);
virtual srs_error_t do_send_and_free_packet(SrsPacket* packet, int stream_id);
virtual srs_error_t do_decode_message(SrsMessageHeader& header, SrsBuffer* stream, SrsPacket** ppacket);
virtual srs_error_t recv_interlaced_message(SrsCommonMessage** pmsg);
virtual srs_error_t read_basic_header(char& fmt, int& cid);
virtual srs_error_t read_message_header(SrsChunkStream* chunk, char fmt);
virtual srs_error_t read_message_payload(SrsChunkStream* chunk, SrsCommonMessage** pmsg);
virtual srs_error_t on_recv_message(SrsCommonMessage* msg);
virtual srs_error_t on_send_packet(SrsMessageHeader* mh, SrsPacket* packet);
private:
virtual srs_error_t response_acknowledgement_message();
virtual srs_error_t response_ping_message(int32_t timestamp);
private:
virtual void print_debug_info();
};
复制代码
了解完主要的结构,继续看 connect app:
connect
最终调用到了 SrsRtmpClient 里面的 connect_app:
context->rtmp->connect_app(c->app, tcUrl, c->req, true, &c->si))
复制代码
tcUrl 格式:rtmp://host或vhost:port/app
,req 可以调用 srs_rtmp_set_connect_args 创建,默认为空,&c->si
是要存储服务端返回的 SrsServerInfo
在 connect_app 中首先构造了 SrsConnectAppPacket:
SrsConnectAppPacket* pkt = new SrsConnectAppPacket();
pkt->command_object->set("app", SrsAmf0Any::str(app.c_str()));
pkt->command_object->set("flashVer", SrsAmf0Any::str("WIN 15,0,0,239"));
if (r) {
pkt->command_object->set("swfUrl", SrsAmf0Any::str(r->swfUrl.c_str()));
} else {
pkt->command_object->set("swfUrl", SrsAmf0Any::str());
}
if (r && r->tcUrl != "") {
pkt->command_object->set("tcUrl", SrsAmf0Any::str(r->tcUrl.c_str()));
} else {
pkt->command_object->set("tcUrl", SrsAmf0Any::str(tcUrl.c_str()));
}
pkt->command_object->set("fpad", SrsAmf0Any::boolean(false));
pkt->command_object->set("capabilities", SrsAmf0Any::number(239));
pkt->command_object->set("audioCodecs", SrsAmf0Any::number(3575));
pkt->command_object->set("videoCodecs", SrsAmf0Any::number(252));
pkt->command_object->set("videoFunction", SrsAmf0Any::number(1));
if (r) {
pkt->command_object->set("pageUrl", SrsAmf0Any::str(r->pageUrl.c_str()));
} else {
pkt->command_object->set("pageUrl", SrsAmf0Any::str());
}
pkt->command_object->set("objectEncoding", SrsAmf0Any::number(0));
if (dsu && r && r->args) {
srs_freep(pkt->args);
pkt->args = r->args->copy()->to_object();
}
if ((err = protocol->send_and_free_packet(pkt, 0)) != srs_success) {//发送packet
return srs_error_wrap(err, "send packet");
}
复制代码
SrsConnectAppPacket 结构:
class SrsConnectAppPacket : public SrsPacket
{
public:
// Name of the command. Set to "connect".
std::string command_name;
// Always set to 1.
double transaction_id;
SrsAmf0Object* command_object;
SrsAmf0Object* args;
public:
SrsConnectAppPacket();
virtual ~SrsConnectAppPacket();
// Decode functions for concrete packet to override.
public:
virtual srs_error_t decode(SrsBuffer* stream);
// Encode functions for concrete packet to override.
public:
virtual int get_prefer_cid();
virtual int get_message_type();
protected:
virtual int get_size();
virtual srs_error_t encode_packet(SrsBuffer* stream);
};
复制代码
对应我们前面 connect 消息结构封装,command_name 固定为"connect",transaction_id 固定为 1 发送 packet 调用 SrsProtocol 的 send_and_free_packet,最终调用到:
srs_error_t SrsProtocol::do_send_and_free_packet(SrsPacket* packet, int stream_id)
{
srs_error_t err = srs_success;
srs_assert(packet);
SrsAutoFree(SrsPacket, packet);
SrsCommonMessage* msg = new SrsCommonMessage();
SrsAutoFree(SrsCommonMessage, msg);
if ((err = packet->to_msg(msg, stream_id)) != srs_success) {
return srs_error_wrap(err, "to message");
}
SrsSharedPtrMessage* shared_msg = new SrsSharedPtrMessage();
if ((err = shared_msg->create(msg)) != srs_success) {
srs_freep(shared_msg);
return srs_error_wrap(err, "create message");
}
if ((err = send_and_free_message(shared_msg, stream_id)) != srs_success) {
return srs_error_wrap(err, "send packet");
}
if ((err = on_send_packet(&msg->header, packet)) != srs_success) {
return srs_error_wrap(err, "on send packet");
}
return err;
}
复制代码
先将 packet 转换为 message,主要是构造 MessageHeader,最终在do_send_messages
将消息拆分成块,再通过 socket 将块发送出去
srs_error_t SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
{
srs_error_t err = srs_success;
#ifdef SRS_PERF_COMPLEX_SEND
int iov_index = 0;
iovec* iovs = out_iovs + iov_index;
int c0c3_cache_index = 0;
char* c0c3_cache = out_c0c3_caches + c0c3_cache_index;
// try to send use the c0c3 header cache,
// if cache is consumed, try another loop.
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
if (!msg) {
continue;
}
// ignore empty message.
if (!msg->payload || msg->size <= 0) {
continue;
}
// p set to current write position,
char* p = msg->payload;
char* pend = msg->payload + msg->size;
// always write the header event payload is empty.
while (p < pend) {
// always has header
int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;
int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload);
srs_assert(nbh > 0);
// header iov
iovs[0].iov_base = c0c3_cache;
iovs[0].iov_len = nbh;
// payload iov
int payload_size = srs_min(out_chunk_size, (int)(pend - p));
iovs[1].iov_base = p;
iovs[1].iov_len = payload_size;
// consume sendout bytes.
p += payload_size;
if (iov_index >= nb_out_iovs - 2) {
int ov = nb_out_iovs;
nb_out_iovs = 2 * nb_out_iovs;
int realloc_size = sizeof(iovec) * nb_out_iovs;
out_iovs = (iovec*)realloc(out_iovs, realloc_size);
srs_warn("resize iovs %d => %d, max_msgs=%d", ov, nb_out_iovs, SRS_PERF_MW_MSGS);
}
// to next pair of iovs
iov_index += 2;
iovs = out_iovs + iov_index;
// to next c0c3 header cache
c0c3_cache_index += nbh;
c0c3_cache = out_c0c3_caches + c0c3_cache_index;
// the cache header should never be realloc again,
// for the ptr is set to iovs, so we just warn user to set larger
// and use another loop to send again.
int c0c3_left = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;
if (c0c3_left < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) {
// only warn once for a connection.
if (!warned_c0c3_cache_dry) {
srs_warn("c0c3 cache header too small, recoment to %d", SRS_CONSTS_C0C3_HEADERS_MAX + SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE);
warned_c0c3_cache_dry = true;
}
// when c0c3 cache dry,
// sendout all messages and reset the cache, then send again.
if ((err = do_iovs_send(out_iovs, iov_index)) != srs_success) {
return srs_error_wrap(err, "send iovs");
}
// reset caches, while these cache ensure
// atleast we can sendout a chunk.
iov_index = 0;
iovs = out_iovs + iov_index;
c0c3_cache_index = 0;
c0c3_cache = out_c0c3_caches + c0c3_cache_index;
}
}
}
// maybe the iovs already sendout when c0c3 cache dry,
// so just ignore when no iovs to send.
if (iov_index <= 0) {
return err;
}
return do_iovs_send(out_iovs, iov_index);
#else
// try to send use the c0c3 header cache,
// if cache is consumed, try another loop.
for (int i = 0; i < nb_msgs; i++) {
SrsSharedPtrMessage* msg = msgs[i];
if (!msg) {
continue;
}
// ignore empty message.
if (!msg->payload || msg->size <= 0) {
continue;
}
// p set to current write position,
char* p = msg->payload;
char* pend = msg->payload + msg->size;
// always write the header event payload is empty.
while (p < pend) {
// for simple send, send each chunk one by one
iovec* iovs = out_iovs;
char* c0c3_cache = out_c0c3_caches;
int nb_cache = SRS_CONSTS_C0C3_HEADERS_MAX;
// always has header
int nbh = msg->chunk_header(c0c3_cache, nb_cache, p == msg->payload);
srs_assert(nbh > 0);
// header iov
iovs[0].iov_base = c0c3_cache;
iovs[0].iov_len = nbh;
// payload iov
int payload_size = srs_min(out_chunk_size, pend - p);
iovs[1].iov_base = p;
iovs[1].iov_len = payload_size;
// consume sendout bytes.
p += payload_size;
if ((er = skt->writev(iovs, 2, NULL)) != srs_success) {
return srs_error_wrap(err, "writev");
}
}
}
return err;
#endif
}
复制代码
connect 后设置 SrsSetWindowAckSizePacket,然后从服务端读取 SrsConnectAppResPacket 并解析.
贴的代码有点多,篇幅太长了,这篇先分析到这里面,后面再写续一篇代码分析,分析一下音视频收发以及 srs-librtmp 的一些优化.
评论