前面深入理解 rtmp(1)之开发环境搭建中我们已经搭建好服务器,并且利用一些现成的工具可以推送直播流,播放直播流了.这篇文章我们开始搭建从零开发一套 rtmp 推流拉流 sdk,对着协议实现,达到真正的"深入理解".
作为一个码农,搬砖搬到一定高度就需要"脚手架"来支撑我们"够得住".为了方面我们把 rtmp 推拉流 sdk 实现为一个 PC 上的命令行程序,当开发调试稳定后,我们可以快速的通过交叉编译工具编译到 Android/iOS 等移动端设备.
1.创建工程
我们使用 cmake 作为安装编译工具,需要安装 cmake,mac 下执行 brew install cmake.在我们的 rtmpsdk 路径下创建 CMakeLists.txt:
//指定cmake最低版本
cmake_minimum_required (VERSION 3.6)
set(CMAKE_INSTALL_PREFIX "${CMAKE_BINARY_DIR}" CACHE PATH "Installation directory" FORCE)
message(STATUS "CMAKE_INSTALL_PREFIX=${CMAKE_INSTALL_PREFIX}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -fPIC -ffunction-sections -fdata-sections -Os")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fPIC -ffunction-sections -fdata-sections -Os")
project (rtmpsdk)
set(SRC_PREFIX "src")
set(SELF_LIBS_OUT ${CMAKE_SYSTEM_NAME}.out)
file(GLOB SELF_SRC_FILES
${SRC_PREFIX}/main.cpp
)
add_executable(${PROJECT_NAME} ${SELF_SRC_FILES})
复制代码
创建 src 目录,创建 main.cpp 文件:
#include <iostream>
int main(int argc,char* argv[])
{
//标准输出到控制台
std::cout << "Hello rtmp server!" << std::endl;
return 0;
}
复制代码
在 rtmpsdk 下创建 cmake_build 文件夹作为我们的输出路径在控制台,我们进入我们的工程路径后执行:
然后执行:
在 camke 下面生成了编译中间文件和最终的 rtmpsdk 文件:
现在执行一下./rtmpsdk:
$ ./rtmpsdk
Hello rtmp server!
复制代码
可以看到我们打印的"Hello rtmp server!",编译环境已经搭建好了,可以继续往下实现我们的功能了.
注:我的开发环境是 mac,windows 环境后面我提供一个 docker 的 centos 镜像作为我们工程的编译环境.
2.封装接口
我们想象一下,我们的 rtmp 应该对外提供什么接口?封装什么数据结构?
我们要连接我们的服务器,rtmp 是基于 tcp,那么我们要创建一个 socket 网络套接字,那么我们需要一个根据 url 创建对象的接口 rtmp_t rtmp_create(const char* url)
创建 socket 后我们还需要做一些配置,最基本的我们要配置读写超时时间,如果我们的 socket 没有超时,我们的读写函数一直没有返回,会导致无法退出的问题,所以我们需要提供一个设置读写超时的接口:int rtmp_set_timeout(rtmp_t rtmp, int recv_timeout_ms, int send_timeout_ms)
rtmp 有握手过程,接下来需要一个握手接口:int rtmp_handshake(rtmp_t rtmp)
握手成功后开始连接服务器,提供连接接口:int rtmp_connect_app(rtmp_t rtmp)
连接成功后通知服务器是拉流还是推流,提供两个函数:int rtmp_play_stream(rtmp_t rtmp),int rtmp_publish_stream(rtmp_t rtmp)
可以开始拉流或推流了:int rtmp_read_packet(rtmp_t rtmp, char* type, uint32_t* timestamp, char** data, int* size),int rtmp_write_packet(rtmp_t rtmp, char type, uint32_t timestamp, char* data, int size)
拉推流结束后,销毁对象释放资源:void rtmp_destroy(rtmp_t rtmp)
以播放为例用一个图表示:
接口定义好了,我们在 src 下新建 libs 目录,创建我们对外暴露的 rtmpsdk.hpp 文件:
#ifndef LIB_RTMP_HPP
#define LIB_RTMP_HPP
/**
* rtmpsdk is a librtmp like library,
* used to play/publish rtmp stream from/to rtmp server.
* socket: use sync and block socket to connect/recv/send data with server.
* depends: no need other libraries; depends on ssl if use complex_handshake.
* thread-safe: no
*/
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <stdint.h>
#include <sys/types.h>
#ifdef __cplusplus
extern "C"{
#endif
/*************************************************************
**************************************************************
* RTMP protocol context
**************************************************************
*************************************************************/
// the RTMP handler.
typedef void* rtmp_t;
/**
* Create a RTMP handler.
* @param url The RTMP url, for example, rtmp://localhost/live/livestream
* @remark default timeout to 30s if not set by rtmp_set_timeout.
* @remark default schema to url_schema_normal, use rtmp_set_schema to change it.
*
* @return a rtmp handler, or NULL if error occured.
*/
extern rtmp_t rtmp_create(const char* url);
/**
* set socket timeout
* @param recv_timeout_ms the timeout for receiving messages in ms.
* @param send_timeout_ms the timeout for sending message in ms.
* @remark user can set timeout once rtmp_create,
* or before rtmp_handshake or rtmp_dns_resolve to connect to server.
* @remark default timeout to 30s if not set by rtmp_set_timeout.
*
* @return 0, success; otherswise, failed.
*/
extern int rtmp_set_timeout(rtmp_t rtmp, int recv_timeout_ms, int send_timeout_ms);
/**
* close and destroy the rtmp stack.
* @remark, user should never use the rtmp again.
*/
extern void rtmp_destroy(rtmp_t rtmp);
/*************************************************************
**************************************************************
* RTMP protocol stack
**************************************************************
*************************************************************/
/**
* connect and handshake with server
* category: publish/play
* previous: rtmp-create
* next: connect-app
*
* @return 0, success; otherswise, failed.
*/
/**
* simple handshake specifies in rtmp 1.0,
* not depends on ssl.
*/
/**
* rtmp_handshake equals to invoke:
* rtmp_dns_resolve()
* rtmp_connect_server()
* rtmp_do_simple_handshake()
* user can use these functions if needed.
*/
extern int rtmp_handshake(rtmp_t rtmp);
/**
* Connect to RTMP tcUrl(Vhost/App), similar to flash AS3 NetConnection.connect(tcUrl).
* @remark When connected to server, user can retrieve informations from RTMP handler,
* for example, use rtmp_get_server_id to get server ip/pid/cid.
* @return 0, success; otherswise, failed.
*/
extern int rtmp_connect_app(rtmp_t rtmp);
/**
* play a live/vod stream.
* category: play
* previous: connect-app
* next: destroy
* @return 0, success; otherwise, failed.
*/
extern int rtmp_play_stream(rtmp_t rtmp);
/**
* publish a live stream.
* category: publish
* previous: connect-app
* next: destroy
* @return 0, success; otherwise, failed.
*/
extern int rtmp_publish_stream(rtmp_t rtmp);
/**
* E.4.1 FLV Tag, page 75
*/
// 8 = audio
#define RTMP_TYPE_AUDIO 8
// 9 = video
#define RTMP_TYPE_VIDEO 9
// 18 = script data
#define RTMP_TYPE_SCRIPT 18
/**
* read a audio/video/script-data packet from rtmp stream.
* @param type, output the packet type, macros:
* RTMP_TYPE_AUDIO, FlvTagAudio
* RTMP_TYPE_VIDEO, FlvTagVideo
* RTMP_TYPE_SCRIPT, FlvTagScript
* otherswise, invalid type.
* @param timestamp, in ms, overflow in 50days
* @param data, the packet data, according to type:
* FlvTagAudio, @see "E.4.2.1 AUDIODATA"
* FlvTagVideo, @see "E.4.3.1 VIDEODATA"
* FlvTagScript, @see "E.4.4.1 SCRIPTDATA"
* User can free the packet by rtmp_free_packet.
* @param size, size of packet.
* @return the error code. 0 for success; otherwise, error.
*
* @remark: for read, user must free the data.
* @remark: for write, user should never free the data, even if error.
*
* @return 0, success; otherswise, failed.
*/
extern int rtmp_read_packet(rtmp_t rtmp, char* type, uint32_t* timestamp, char** data, int* size);
// @param data User should never free it anymore.
extern int rtmp_write_packet(rtmp_t rtmp, char type, uint32_t timestamp, char* data, int size);
#ifdef __cplusplus
}
#endif
#endif
复制代码
接口定义好后,我们开始按步骤实现接口,下面我们开始实现第一步 rtmp_create,通过 url 创建 socket.
3.封装网络接口
封装网络接口前,我们先对 linux c 网络编程做一个回顾
3.1linux c socket 编程基本流程
我们先来一张图:
我们的 rtmpsdk 作为 tcp 客户端,我们再一起了解一下 linux c 关于 socket 的 api
3.1.1 socket()
函数原型
int socket(int domain, int type, int protocol);
复制代码
参数说明
domain: 协议域,又称协议族(family)。常用的协议族有 AF_INET 、 AF_INET6 、 AF_LOCAL(或称 AF_UNIX,Unix 域 Socket)、AF_ROUTE 等。协议族决定了 socket 的地址类型,在通信中必须采用对应的地址,如 AF_INET 决定了要用 ipv4 地址(32 位的)与端口号(16 位的)的组合、AF_UNIX 决定了要用一个绝对路径名作为地址。
type: 指定 Socket 类型。常用的 socket 类型有 SOCK_STREAM 、 SOCK_DGRAM 、 SOCK_RAW 、 SOCK_PACKET 、 SOCK_SEQPACKET 等。流式 Socket(SOCK_STREAM)是一种面向连接的 Socket,针对于面向连接的 TCP 服务应用。数据报式 Socket(SOCK_DGRAM)是一种无连接的 Socket,对应于无连接的 UDP 服务应用。
protocol: 指定协议。常用协议有 IPPROTO_TCP 、 IPPROTO_UDP 、 IPPROTO_STCP 、 IPPROTO_TIPC 等,分别对应 TCP 传输协议、UDP 传输协议、STCP 传输协议、TIPC 传输协议。
注意:1.type 和 protocol 不可以随意组合,如 SOCK_STREAM 不可以跟 IPPROTO_UDP 组合。当第三个参数为 0 时,会自动选择第二个参数类型对应的默认协议。
返回值
如果调用成功就返回新创建的套接字的描述符,如果失败就返回 INVALID_SOCKET(Linux 下失败返回-1)。
套接字描述符是一个整数类型的值。每个进程的进程空间里都有一个套接字描述符表,该表中存放着套接字描述符和套接字数据结构的对应关系。该表中有一个字段存放新创建的套接字的描述符,另一个字段存放套接字数据结构的地址,因此根据套接字描述符就可以找到其对应的套接字数据结构。每个进程在自己的进程空间里都有一个套接字描述符表但是套接字数据结构都是在操作系统的内核缓冲里。
3.1.2 bind()
bind()函数把一个地址族中的特定地址赋给 socket。例如对应 AF_INET、AF_INET6 就是把一个 ipv4 或 ipv6 地址和端口号组合赋给 socket。
函数原型
int bind(int socketfd, const struct sockaddr *addr, socklen_t addrlen);
复制代码
参数说明
socketfd: 一个标识已连接套接口的描述字。
address: 是一个 sockaddr 结构指针,该结构中包含了要结合的地址和端口号。
address_len: 确定 address 缓冲区的长度。
其中,sockaddr 这个地址结构根据地址创建 socket 时的地址协议族的不同而不同。
如 ipv4 对应的是:
struct sockaddr_in {
sa_family_t sin_family; /* address family: AF_INET */
in_port_t sin_port; /* port in network byte order */
struct in_addr sin_addr; /* internet address */
};
/* Internet address. */
struct in_addr {
uint32_t s_addr; /* address in network byte order */
};
复制代码
ipv6 对应的是:
struct sockaddr_in6 {
sa_family_t sin6_family; /* AF_INET6 */
in_port_t sin6_port; /* port number */
uint32_t sin6_flowinfo; /* IPv6 flow information */
struct in6_addr sin6_addr; /* IPv6 address */
uint32_t sin6_scope_id; /* Scope ID (new in 2.4) */
};
struct in6_addr {
unsigned char s6_addr[16]; /* IPv6 address */
};
复制代码
Unix 域对应的是:
#define UNIX_PATH_MAX 108
struct sockaddr_un {
sa_family_t sun_family; /* AF_UNIX */
char sun_path[UNIX_PATH_MAX]; /* pathname */
};
复制代码
返回值
如果函数执行成功,返回值为 0,否则为 SOCKET_ERROR。
3.1.3listen()
如果作为一个服务器,在调用 socket()、bind()之后就会调用 listen()来监听这个 socket,如果客户端这时调用 connect()发出连接请求,服务器端就会接收到这个请求。
函数原型
int listen(int socketfd, int backlog);
复制代码
参数说明
socket()函数创建的 socket 默认是一个主动类型的,listen 函数将 socket 变为被动类型的,等待客户的连接请求。
3.1.4connect()
函数原型
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
复制代码
参数说明
3.1.5. accept()
TCP 服务器端依次调用 socket()、bind()、listen() 之后,就会监听指定的 socket 地址了。TCP 客户端依次调用 socket()、connect() 之后就向 TCP 服务器发送了一个连接请求。TCP 服务器监听到这个请求之后,就会调用 accept() 函数取接收请求,这样连接就建立好了。之后就可以开始网络 I/O 操作了,即类同于普通文件的读写 I/O 操作。
函数原型
int accept(int socketfd, struct sockaddr *addr, socklen_t *addrlen); //返回连接connect_fd
复制代码
参数说明
socketfd: 就是上面解释中的监听套接字,这个套接字用来监听一个端口,当有一个客户与服务器连接时,它使用这个一个端口号,而此时这个端口号正与这个套接字关联。当然客户不知道套接字这些细节,它只知道一个地址和一个端口号。
sockaddr: 结果参数,它用来接受一个返回值,这返回值指定客户端的地址,当然这个地址是通过某个地址结构来描述的,用户应该知道这一个什么样的地址结构。如果对客户的地址不感兴趣,那么可以把这个值设置为 NULL。
len: 它也是结果的参数,用来接受上述 addr 的结构的大小的,它指明 addr 结构所占有的字节个数。同样的,它也可以被设置为 NULL。
如果 accept 成功返回,则服务器与客户已经正确建立连接了,此时服务器通过 accept 返回的套接字来完成与客户的通信。
accept 默认会阻塞进程,直到有一个客户连接建立后返回,它返回的是一个新可用的套接字,这个套接字是连接套接字。
一个服务器通常通常仅仅只创建一个监听 socket 描述字,它在该服务器的生命周期内一直存在。内核为每个由服务器进程接受的客户连接创建了一个已连接 socket 描述字,当服务器完成了对某个客户的服务,相应的已连接 socket 描述字就被关闭。
连接套接字 socketfd_new 并没有占用新的端口与客户端通信,依然使用的是与监听套接字 socketfd 一样的端口号
3.1.6. read()、write()等
当服务器与客户端已经建立好连接,可以调用网络 I/O 进行读写操作了,即实现了网咯中不同进程之间的通信!网络 I/O 操作有下面几组:
read()/write()
recv()/send()
readv()/writev()
recvmsg()/sendmsg()
recvfrom()/sendto()
复制代码
函数原型 1
int recv(SOCKET socket, char FAR* buf, int len, int flags);
复制代码
参数说明 1
函数原型 2
ssize_t recvfrom(int sockfd, void buf, int len, unsigned int flags, struct socketaddr* from, socket_t* fromlen);
复制代码
参数说明 2
sockfd: 标识一个已连接套接口的描述字。
buf: 接收数据缓冲区。
len: 缓冲区长度。
flags: 调用操作方式。是以下一个或者多个标志的组合体,可通过 or 操作连在一起:
MSG_DONTWAIT:操作不会被阻塞;
MSG_ERRQUEUE: 指示应该从套接字的错误队列上接收错误值,依据不同的协议,错误值以某种辅佐性消息的方式传递进来,使用者应该提供足够大的缓冲区。导致错误的原封包通过 msg_iovec 作为一般的数据来传递。导致错误的数据报原目标地址作为 msg_name 被提供。错误以 sock_extended_err 结构形态被使用。
MSG_PEEK:指示数据接收后,在接收队列中保留原数据,不将其删除,随后的读操作还可以接收相同的数据。
MSG_TRUNC:返回封包的实际长度,即使它比所提供的缓冲区更长, 只对 packet 套接字有效。
MSG_WAITALL:要求阻塞操作,直到请求得到完整的满足。然而,如果捕捉到信号,错误或者连接断开发生,或者下次被接收的数据类型不同,仍会返回少于请求量的数据。
MSG_EOR:指示记录的结束,返回的数据完成一个记录。
MSG_TRUNC:指明数据报尾部数据已被丢弃,因为它比所提供的缓冲区需要更多的空间。
MSG_CTRUNC:指明由于缓冲区空间不足,一些控制数据已被丢弃。(MSG_TRUNC 使用错误,4 才是 MSG_TRUNC 的正确解释)
MSG_OOB:指示接收到 out-of-band 数据(即需要优先处理的数据)。
MSG_ERRQUEUE:指示除了来自套接字错误队列的错误外,没有接收到其它数据。
from:(可选)指针,指向装有源地址的缓冲区。
fromlen:(可选)指针,指向 from 缓冲区长度值。
函数原型 3
int sendto( SOCKET s, const char FAR* buf, int size, int flags, const struct sockaddr FAR* to, int tolen);
复制代码
参数说明 3
s: 套接字
buf: 待发送数据的缓冲区
size: 缓冲区长度
flags: 调用方式标志位, 一般为 0, 改变 Flags,将会改变 Sendto 发送的形式
addr: (可选)指针,指向目的套接字的地址
tolen: addr 所指地址的长度如果成功,则返回发送的字节数,失败则返回 SOCKET_ERROR。
函数原型 4
int accept( int fd, struct socketaddr* addr, socklen_t* len);
复制代码
参数说明 4
3.1.7. close()
在服务器与客户端建立连接之后,会进行一些读写操作,完成了读写操作就要关闭相应的 socket 描述字。
函数原型
close 一个 TCP socket 的缺省行为时把该 socket 标记为以关闭,然后立即返回到调用进程。该描述字不能再由调用进程使用,也就是说不能再作为 read 或 write 的第一个参数。
注意:close 操作只是使相应 socket 描述字的引用计数-1,只有当引用计数为 0 的时候,才会触发 TCP 客户端向服务器发送终止连接请求。
3.2 封装 socket
我们把 socket 和超时配置等封装到一个结构体:
struct BlockSyncSocket
{
SOCKET fd;
int family;
int64_t rbytes;
int64_t sbytes;
// The send/recv timeout in ms.
int64_t rtm;
int64_t stm;
BlockSyncSocket() {
stm = rtm = UTIME_NO_TIMEOUT;
rbytes = sbytes = 0;
SOCKET_RESET(fd);
SOCKET_SETUP();
}
virtual ~BlockSyncSocket() {
SOCKET_CLOSE(fd);
SOCKET_CLEANUP();
}
};
复制代码
通过上面分析知,我们需要设计 socket 创建,连接,读写,设置超时等:
/**
* simple socket stream,
* use tcp socket, sync block mode
*/
class SimpleSocketStream
{
private:
BlockSyncSocket* io;
public:
SimpleSocketStream();
virtual ~SimpleSocketStream();
public:
virtual BlockSyncSocket* hijack_io();
virtual int create_socket(std::string url);
virtual int connect(const char* server, int port);
public:
virtual error_t read(void* buf, size_t size, ssize_t* nread);
public:
virtual void set_recv_timeout(utime_t tm);
virtual utime_t get_recv_timeout();
virtual int64_t get_recv_bytes();
public:
virtual void set_send_timeout(utime_t tm);
virtual utime_t get_send_timeout();
virtual int64_t get_send_bytes();
virtual error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
public:
virtual error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual error_t write(void* buf, size_t size, ssize_t* nwrite);
};
复制代码
接下来我们实现网络封装接口:
#include <netinet/tcp.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/uio.h>
#include <sys/types.h>
#include <errno.h>
#include <stdio.h>
#include <netdb.h>
#include <bs_socket.hpp>
BlockSyncSocket* hijack_io_create()
{
BlockSyncSocket* skt = new BlockSyncSocket();
return skt;
}
void hijack_io_destroy(BlockSyncSocket* ctx)
{
freep(ctx);
}
int hijack_io_create_socket(BlockSyncSocket* skt,std::string url)
{
skt->family = AF_INET6;
skt->fd = ::socket(skt->family, SOCK_STREAM, 0); // Try IPv6 first.
if (!SOCKET_VALID(skt->fd)) {
skt->family = AF_INET;
skt->fd = ::socket(skt->family, SOCK_STREAM, 0); // Try IPv4 instead, if IPv6 fails.
}
if (!SOCKET_VALID(skt->fd)) {
return ERROR_SOCKET_CREATE;
}
// No TCP cache.
int v = 1;
setsockopt(skt->fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
return ERROR_SUCCESS;
}
int hijack_io_connect(BlockSyncSocket* skt, const char* server_ip, int port)
{
char sport[8];
snprintf(sport, sizeof(sport), "%d", port);
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = skt->family;
hints.ai_socktype = SOCK_STREAM;
addrinfo* r = NULL;
AutoFree(addrinfo, r);
if(getaddrinfo(server_ip, sport, (const addrinfo*)&hints, &r)) {
return ERROR_SOCKET_CONNECT;
}
if(::connect(skt->fd, r->ai_addr, r->ai_addrlen) < 0){
return ERROR_SOCKET_CONNECT;
}
return ERROR_SUCCESS;
}
int hijack_io_read(BlockSyncSocket* skt, void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
ssize_t nb_read = ::recv(skt->fd, (char*)buf, size, 0);
if (nread) {
*nread = nb_read;
}
// On success a non-negative integer indicating the number of bytes actually read is returned
// (a value of 0 means the network connection is closed or end of file is reached).
if (nb_read <= 0) {
if (nb_read < 0 && SOCKET_ERRNO() == SOCKET_ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
if (nb_read == 0) {
errno = SOCKET_ECONNRESET;
}
return ERROR_SOCKET_READ;
}
skt->rbytes += nb_read;
return ret;
}
int hijack_io_set_recv_timeout(BlockSyncSocket* skt, int64_t tm)
{
// The default for this option is zero,
// which indicates that a receive operation shall not time out.
int32_t sec = 0;
int32_t usec = 0;
if (tm != UTIME_NO_TIMEOUT) {
sec = (int32_t)(tm / 1000);
usec = (int32_t)((tm % 1000)*1000);
}
struct timeval tv = { sec , usec };
if (setsockopt(skt->fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) {
return SOCKET_ERRNO();
}
skt->rtm = tm;
return ERROR_SUCCESS;
}
int hijack_io_set_send_timeout(BlockSyncSocket* skt, int64_t tm)
{
// The default for this option is zero,
// which indicates that a receive operation shall not time out.
int32_t sec = 0;
int32_t usec = 0;
if (tm != UTIME_NO_TIMEOUT) {
sec = (int32_t)(tm / 1000);
usec = (int32_t)((tm % 1000)*1000);
}
struct timeval tv = { sec , usec };
if (setsockopt(skt->fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) {
return SOCKET_ERRNO();
}
skt->stm = tm;
return ERROR_SUCCESS;
}
int hijack_io_writev(BlockSyncSocket* skt, const iovec *iov, int iov_size, ssize_t* nwrite)
{
int ret = ERROR_SUCCESS;
ssize_t nb_write = ::writev(skt->fd, iov, iov_size);
if (nwrite) {
*nwrite = nb_write;
}
// On success, the readv() function returns the number of bytes read;
// the writev() function returns the number of bytes written. On error, -1 is
// returned, and errno is set appropriately.
if (nb_write <= 0) {
if (nb_write < 0 && SOCKET_ERRNO() == SOCKET_ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
return ERROR_SOCKET_WRITE;
}
skt->sbytes += nb_write;
return ret;
}
int hijack_io_read_fully(BlockSyncSocket* skt, void* buf, size_t size, ssize_t* nread)
{
int ret = ERROR_SUCCESS;
size_t left = size;
ssize_t nb_read = 0;
while (left > 0) {
char* this_buf = (char*)buf + nb_read;
ssize_t this_nread;
if ((ret = hijack_io_read(skt, this_buf, left, &this_nread)) != ERROR_SUCCESS) {
return ret;
}
nb_read += this_nread;
left -= (size_t)this_nread;
}
if (nread) {
*nread = nb_read;
}
skt->rbytes += nb_read;
return ret;
}
int hijack_io_write(BlockSyncSocket* skt, void* buf, size_t size, ssize_t* nwrite)
{
int ret = ERROR_SUCCESS;
ssize_t nb_write = ::send(skt->fd, (char*)buf, size, 0);
if (nwrite) {
*nwrite = nb_write;
}
if (nb_write <= 0) {
if (nb_write < 0 && SOCKET_ERRNO() == SOCKET_ETIME) {
return ERROR_SOCKET_TIMEOUT;
}
return ERROR_SOCKET_WRITE;
}
skt->sbytes += nb_write;
return ret;
}
error_t SimpleSocketStream::read(void* buf, size_t size, ssize_t* nread)
{
assert(io);
int ret = hijack_io_read(io, buf, size, nread);
if (ret != ERROR_SUCCESS) {
return error_new(ret, "read");
}
return success;
}
复制代码
接下来我们就可以在我们的 main 函数里面创建 SimpleSocketStream,然后创建 socket 了.下一篇我们开始通过创建的 socket 进行 rtmp 握手.
3.3 测试
在我们的 main.cpp 中:
#include <iostream>
#include <bs_socket.hpp>
int main(int argc,char* argv[])
{
std::cout << "Hello rtmp server!" << std::endl;
SimpleSocketStream *sss = new SimpleSocketStream();
if(sss->create_socket("rtmp://127.0.0.1:1935/live/livestream") != 0){
printf("create socket error!");
return -1;
}
std::cout<< "create fd = " << sss->hijack_io()->fd << std::endl;
free(sss);
return 0;
}
复制代码
输出结果:
$ ./rtmpsdk Hello rtmp server!create fd = 3
复制代码
我们成功创建了句柄为 3 的 socket.
题外话
linux 网络编程中有同步/异步,阻塞/非阻塞,由于我们现在 sdk 是客户端,没有并发连接的问题,所以我们的实现使用阻塞同步 socket.我们在创建 socket 时兼容了 ipv6,先尝试 ipv6,如果失败了再尝试 ipv4:
int hijack_io_create_socket(BlockSyncSocket* skt,std::string url)
{
skt->family = AF_INET6;
skt->fd = ::socket(skt->family, SOCK_STREAM, 0); // Try IPv6 first.
if (!SOCKET_VALID(skt->fd)) {
skt->family = AF_INET;
skt->fd = ::socket(skt->family, SOCK_STREAM, 0); // Try IPv4 instead, if IPv6 fails.
}
if (!SOCKET_VALID(skt->fd)) {
return ERROR_SOCKET_CREATE;
}
// No TCP cache.
int v = 1;
setsockopt(skt->fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
return ERROR_SUCCESS;
}
复制代码
setsockopt 可以对 socket 进行设置,这里:IPPROTO_TCP 和 IPPROTO_IP 代表两种不同的协议,分别代表 IP 协议族里面的 TCP 协议和 IP 协议 TCP_NODELAY 是什么呢?TCP/IP 协议中针对 TCP 默认开启了 Nagle 算法。Nagle 算法通过减少需要传输的数据包,来优化网络。在内核实现中,数据包的发送和接受会先做缓存,分别对应于写缓存和读缓存。
启动 TCP_NODELAY,就意味着禁用了 Nagle 算法,允许小包的发送。对于延时敏感型,同时数据传输量比较小的应用,开启 TCP_NODELAY 选项无疑是一个正确的选择。rtmp 是直播流式传输,对延时比较敏感,所以我们关闭了 NODELAY.同时比如,对于 SSH 会话,用户在远程敲击键盘发出指令的速度相对于网络带宽能力来说,绝对不是在一个量级上的,所以数据传输非常少;而又要求用户的输入能够及时获得返回,有较低的延时。如果开启了 Nagle 算法,就很可能出现频繁的延时,导致用户体验极差。当然,你也可以选择在应用层进行 buffer,比如使用 java 中的 buffered stream,尽可能地将大包写入到内核的写缓存进行发送;vectored I/O(writev 接口)也是个不错的选择。
对于关闭 TCP_NODELAY,则是应用了 Nagle 算法。数据只有在写缓存中累积到一定量之后,才会被发送出去,这样明显提高了网络利用率(实际传输数据 payload 与协议头的比例大大提高)。但是这又不可避免地增加了延时;与 TCP delayed ack 这个特性结合,这个问题会更加显著,延时基本在 40ms 左右。当然这个问题只有在连续进行两次写操作的时候,才会暴露出来。
连续进行多次对小数据包的写操作,然后进行读操作,本身就不是一个好的网络编程模式;在应用层就应该进行优化。对于既要求低延时,又有大量小数据传输,还同时想提高网络利用率的应用,大概只能用 UDP 自己在应用层来实现可靠性保证了。
评论