SRS 流媒体服务器源码分析 --RTMP 消息 play
推荐视频
SRS 源码 Play 流程图:
进入 play 流程
由于之前的博客中已经梳理了系统启动监听以及推流客户端连接流程、如何创建协程、如何进入 stream_service_cycle。所以本章内容直接从 SrsRtmpConn::stream_service_cycle()方法中进入开始梳理。
// Excerpt from SrsRtmpConn::stream_service_cycle(): dispatch on the identified client type.
switch (type) {
// The client asked to play (pull) a stream.
case SrsRtmpConnPlay: {
srs_verbose("start to play stream %s.", req->stream.c_str());
// Respond to the client that play is starting on this stream id; abort on failure.
if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) {
srs_error("start to play stream failed. ret=%d", ret);
return ret;
}
// Invoke the on_play HTTP hook to notify the vhost that play is starting.
// A hook failure aborts the play request before any media is delivered.
if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) {
srs_error("http hook on_play failed. ret=%d", ret);
return ret;
}
srs_info("start to play stream %s success", req->stream.c_str());
// Run the play session; this call returns only when the session ends.
ret = playing(source);
// Invoke the on_stop HTTP hook to notify the vhost that play has stopped.
// It is called unconditionally, whatever playing() returned, and its own result is ignored.
http_hooks_on_stop();
// Propagate the result of the play session.
return ret;
}
}
在接收(推流)流程当中,客户端类型是 SrsRtmpConnFMLEPublish(即 "fmle publish");在转发(播放)流程当中,客户端类型是 SrsRtmpConnPlay。
在 http_hooks_on_play()方法中回调 on_play()方法通知 vhost,xxx 用户已经开始 play。
在 http_hooks_on_stop()方法中回调 on_stop()方法通知 vhost,xxx 用户已经停止 play。
最重要的是:
ret = playing(source);
进入该函数。
/**
 * Serve one play session for this connection.
 *
 * Creates a consumer on the source, starts an isolated receive thread for
 * messages coming from the client, runs the delivery loop (do_playing) and
 * finally stops the receive thread.
 *
 * @param source the stream source this client wants to play.
 * @return ERROR_SUCCESS, or the first error code reported by consumer
 *         creation, receive-thread start or the delivery loop.
 */
int SrsRtmpConn::playing(SrsSource* source)
{
    // Create the consumer of the source for this connection.
    SrsConsumer* consumer = NULL;
    int ret = source->create_consumer(this, consumer);
    if (ret != ERROR_SUCCESS) {
        srs_error("create consumer failed. ret=%d", ret);
        return ret;
    }
    // NOTE(review): SrsAutoFree presumably releases the consumer when this
    // scope exits - confirm against the macro definition.
    SrsAutoFree(SrsConsumer, consumer);
    srs_verbose("consumer created success.");

    // Receive client messages in an isolated thread,
    // @see ossrs/srs issue #217 (refine the recv message for performance).
    SrsQueueRecvThread recv_thread(consumer, rtmp, SRS_PERF_MW_SLEEP);
    ret = recv_thread.start();
    if (ret != ERROR_SUCCESS) {
        srs_error("start isolate recv thread failed. ret=%d", ret);
        return ret;
    }

    // Deliver messages to the playing client. The consumer is published as
    // `wakable` only for the duration of the delivery loop.
    wakable = consumer;
    ret = do_playing(source, consumer, &recv_thread);
    wakable = NULL;

    // The session is over: stop the isolated receive thread.
    recv_thread.stop();

    // Warn about client messages that were received but never processed.
    const bool has_unprocessed = !recv_thread.empty();
    if (has_unprocessed) {
        srs_warn("drop the received %d messages", recv_thread.size());
    }

    return ret;
}
在函数中
1.1、根据客户端,创建消费者对象
create_consumer(this, consumer)
1.2、为该消费者开启一个独立协程
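trd.start() // 该独立协程(SrsQueueRecvThread)专门负责接收客户端在 play 期间发来的消息(如开始播放、暂停播放等播放控制消息),这样 do_playing() 的发送循环就不必阻塞在接收上;源码注释称这样可以提升约 33% 的性能(参见 ossrs/srs Issue #196、#217)。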
1.3、进入 play 主流程
do_playing(source, consumer, &trd);
进入主 play 循环
进入该函数,do_playing()函数内容非常多,也是非常重要,所以将该函数内容全部列出。
/**
 * Main delivery loop of a play session.
 *
 * Each iteration:
 *   1. processes the play control messages queued by the isolated receive thread,
 *   2. fetches pending media messages from the consumer,
 *   3. sends them to the client through the RTMP protocol stack.
 * The loop ends when the connection is disposed or expired, when the receive
 * thread reports an error, when sending fails, or when the duration requested
 * by the client has been exceeded.
 *
 * @param source   the stream source being played (not referenced by this body).
 * @param consumer the consumer created for this connection; must not be NULL.
 * @param trd      the isolated receive thread queueing messages from the client.
 * @return ERROR_SUCCESS when the loop exits because the connection was disposed,
 *         otherwise the error code that terminated the session.
 */
int SrsRtmpConn::do_playing(SrsSource* source, SrsConsumer* consumer, SrsQueueRecvThread* trd)
{
    int ret = ERROR_SUCCESS;

    srs_assert(consumer != NULL);

    // Reject the request when the page URL fails the play referer check of the vhost.
    if ((ret = refer->check(req->pageUrl, _srs_config->get_refer_play(req->vhost))) != ERROR_SUCCESS) {
        srs_error("check play_refer failed. ret=%d", ret);
        return ret;
    }
    srs_verbose("check play_refer success.");

    // initialize other components
    SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_play();
    SrsAutoFree(SrsPithyPrint, pprint);

    SrsMessageArray msgs(SRS_PERF_MW_MSGS);
    bool user_specified_duration_to_stop = (req->duration > 0);
    int64_t starttime = -1;

    // setup the realtime.
    realtime = _srs_config->get_realtime_enabled(req->vhost);
    // setup the mw config.
    // when mw_sleep changed, resize the socket send buffer.
    mw_enabled = true;
    change_mw_sleep(_srs_config->get_mw_sleep_ms(req->vhost));
    // initialize the send_min_interval
    send_min_interval = _srs_config->get_send_min_interval(req->vhost);

    // set the sock options.
    set_sock_options();

    srs_trace("start play smi=%.2f, mw_sleep=%d, mw_enabled=%d, realtime=%d, tcp_nodelay=%d",
        send_min_interval, mw_sleep, mw_enabled, realtime, tcp_nodelay);

    // Keep delivering until the connection is disposed.
    while (!disposed) {
        // collect elapse for pithy print.
        pprint->elapse();

        // when source is set to expired, disconnect it.
        if (expired) {
            ret = ERROR_USER_DISCONNECT;
            srs_error("connection expired. ret=%d", ret);
            return ret;
        }

        // to use isolate thread to recv, can improve about 33% performance.
        // @see ossrs/srs issue #196 (RTMP protocol stack, recv never send, send never recv)
        // @see ossrs/srs issue #217 (refine the recv message for performance)
        while (!trd->empty()) {
            // Take the next message queued by the receive thread.
            SrsCommonMessage* msg = trd->pump();
            srs_verbose("pump client message to process.");

            // Execute the play control message, e.g. start playing or pause playing.
            if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
                if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
                    srs_error("process play control message failed. ret=%d", ret);
                }
                return ret;
            }
        }

        // quit when recv thread error.
        if ((ret = trd->error_code()) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret) && !srs_is_system_control_error(ret)) {
                srs_error("recv thread failed. ret=%d", ret);
            }
            return ret;
        }

#ifdef SRS_PERF_QUEUE_COND_WAIT
        // for send wait time debug
        // (a space separates each literal from the PRId64 macro, as C++11 and later require)
        srs_verbose("send thread now=%" PRId64 "us, wait %dms", srs_update_system_time_ms(), mw_sleep);

        // wait for message to incoming.
        // @see ossrs/srs issue #251 (refine the send msg performance)
        // @see ossrs/srs issue #257 (lower the minimal RTMP latency of the server)
        if (realtime) {
            // for realtime, min required msgs is 0, send when got one+ msgs.
            consumer->wait(0, mw_sleep);
        } else {
            // for no-realtime, got some msgs then send.
            consumer->wait(SRS_PERF_MW_MIN_MSGS, mw_sleep);
        }

        // for send wait time debug
        srs_verbose("send thread now=%" PRId64 "us wakeup", srs_update_system_time_ms());
#endif

        // get messages from consumer.
        // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.
        // @remark when enable send_min_interval, only fetch one message a time.
        int count = (send_min_interval > 0)? 1 : 0;
        if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
            srs_error("get messages from consumer failed. ret=%d", ret);
            return ret;
        }

        // reportable
        if (pprint->can_print()) {
            kbps->sample();
            srs_trace("caojj_player-> " SRS_CONSTS_LOG_PLAY
                " time=%" PRId64 ", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mw=%d",
                pprint->age(), count,
                kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
                kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(),
                mw_sleep
            );
        }

        // we use wait timeout to get messages,
        // for min latency event no message incoming,
        // so the count maybe zero.
        if (count > 0) {
            // count > 0 is guaranteed here, so the first and last messages can be read directly.
            // The time span is narrowed to int explicitly because it is printed with %d.
            srs_verbose("mw wait %dms and got %d msgs %d(%" PRId64 "-%" PRId64 ")ms",
                mw_sleep, count,
                (int)(msgs.msgs[count - 1]->timestamp - msgs.msgs[0]->timestamp),
                msgs.msgs[0]->timestamp,
                msgs.msgs[count - 1]->timestamp);
        }

        if (count <= 0) {
#ifndef SRS_PERF_QUEUE_COND_WAIT
            srs_info("mw sleep %dms for no msg", mw_sleep);
            st_usleep(mw_sleep * 1000);
#else
            srs_verbose("mw wait %dms and got nothing.", mw_sleep);
#endif
            // ignore when nothing got.
            continue;
        }
        srs_info("got %d msgs, min=%d, mw=%d", count, SRS_PERF_MW_MIN_MSGS, mw_sleep);

        // only when user specifies the duration,
        // we start to collect the durations for each message.
        if (user_specified_duration_to_stop) {
            for (int i = 0; i < count; i++) {
                SrsSharedPtrMessage* msg = msgs.msgs[i];

                // foreach msg, collect the duration.
                // @remark: never use msg when sent it, for the protocol sdk will free it.
                if (starttime < 0 || starttime > msg->timestamp) {
                    starttime = msg->timestamp;
                }
                duration += msg->timestamp - starttime;
                starttime = msg->timestamp;
            }
        }

        // Send the messages: this is the single exit point of the play path.
        // sendout messages, all messages are freed by send_and_free_messages().
        // no need to assert msg, for the rtmp will assert it.
        // (No debug printf here: this runs for every batch, and logging goes through srs_* macros.)
        if (count > 0 && (ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id)) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("send messages to client failed. ret=%d", ret);
            }
            return ret;
        }

        // if duration specified, and exceed it, stop play live.
        // @see ossrs/srs issue #45 (SRS did not disconnect after rtmpdump specified a duration)
        if (user_specified_duration_to_stop) {
            if (duration >= (int64_t)req->duration) {
                ret = ERROR_RTMP_DURATION_EXCEED;
                srs_trace("stop live for duration exceed. ret=%d", ret);
                return ret;
            }
        }

        // apply the minimal interval for delivery stream in ms.
        if (send_min_interval > 0) {
            st_usleep((int64_t)(send_min_interval * 1000));
        }
    }

    return ret;
}
该函数有非常重要的 3 点:
处理客户端发来的播放控制消息(例如通知消费者开始、暂停 play)
// Excerpt from do_playing(): execute a play control message pumped from the client,
// e.g. start playing or pause playing.
// Any failure ends the play loop by returning ret; it is logged as an error only when it
// is neither a system control error nor a graceful close by the client.
if ((ret = process_play_control_msg(consumer, msg)) != ERROR_SUCCESS) {
if (!srs_is_system_control_error(ret) && !srs_is_client_gracefully_close(ret)) {
srs_error("process play control message failed. ret=%d", ret);
}
return ret;
}
从消费者列表中取出 Rtmp 信息(SrsMessageQueue)
// Excerpt from do_playing(): fetch the pending messages from the consumer into msgs.
// NOTE(review): count is presumably updated to the number of messages fetched, since
// do_playing() later uses it as the batch size - confirm against SrsConsumer::dump_packets.
// A failure is logged and ends the play loop.
if ((ret = consumer->dump_packets(&msgs, count)) != ERROR_SUCCESS) {
srs_error("get messages from consumer failed. ret=%d", ret);
return ret;
}
进入 play 出口(发送消息)
/发送message,这里play总出口。
// sendout messages, all messages are freed by send_and_free_messages().
// no need to assert msg, for the rtmp will assert it.
//printf("send message output gate\n");
if (count > 0 && (ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id)) != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("send messages to client failed. ret=%d", ret);
}
return ret;
}
进入 SRS 发送接口(play)
在 int SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)函数中,
进入 int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs),该函数有一个 #ifdef SRS_PERF_COMPLEX_SEND 宏定义,一般 rtmp 协议都是要先混合音视频数据,再做转发。再往后面看:
// Excerpt from SrsProtocol::do_send_messages():
// when the c0c3 cache runs dry, send out all messages gathered so far and reset the
// cache, then continue sending.
//printf("do_send_messages do_iovs_send: iov_index = %d", iov_index);
// Write the first iov_index entries of out_iovs; a failure is returned to the caller as is.
if ((ret = do_iovs_send(out_iovs, iov_index)) != ERROR_SUCCESS) {
return ret;
}
最后进入
/**
 * Write an iovec array of any length to the socket.
 *
 * A single writev call accepts only a limited number of iovecs, so when the
 * array is not below that limit it is written in several batches.
 *
 * @param skt     the reader/writer to send through.
 * @param iovs    the iovec array to send.
 * @param size    the number of entries in iovs.
 * @param pnwrite optional (may be NULL); on success receives the number of
 *                bytes written, summed over all batches.
 * @return ERROR_SUCCESS, or the error code of the first writev that failed.
 */
int srs_write_large_iovs(ISrsProtocolReaderWriter* skt, iovec* iovs, int size, ssize_t* pnwrite)
{
    int ret = ERROR_SUCCESS;

    // the limits of writev iovs.
    // for srs-librtmp, @see ossrs/srs issue #213 (srs-librtmp supports windows)
#ifndef _WIN32
    // for linux, generally it's 1024.
    static int limits = (int)sysconf(_SC_IOV_MAX);
    // sysconf returns -1 when the limit is indeterminate or the query fails;
    // a non-positive batch size would keep the loop below from ever advancing.
    if (limits <= 0) {
        limits = 1024;
    }
#else
    static int limits = 1024;
#endif

    // send in a time.
    if (size < limits) {
        if ((ret = skt->writev(iovs, size, pnwrite)) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("send with writev failed. ret=%d", ret);
            }
            return ret;
        }
        return ret;
    }

    // send in multiple times.
    // Each batch reports its own byte count, so sum them; handing pnwrite to
    // every call would leave only the last batch's count for the caller.
    ssize_t total_written = 0;
    int cur_iov = 0;
    while (cur_iov < size) {
        int cur_count = srs_min(limits, size - cur_iov);
        ssize_t batch_written = 0;
        if ((ret = skt->writev(iovs + cur_iov, cur_count, &batch_written)) != ERROR_SUCCESS) {
            if (!srs_is_client_gracefully_close(ret)) {
                srs_error("send with writev failed. ret=%d", ret);
            }
            return ret;
        }
        total_written += batch_written;
        cur_iov += cur_count;
    }

    if (pnwrite) {
        *pnwrite = total_written;
    }

    return ret;
}
在该函数中,最重要的一点是 send message 的总出口 writev() 函数。它负责把要转发给直播用户的流数据真正发送出去。
最后:play 总结
(1)通知 client 开始 play
(2)从消费者列表中取出 Rtmp 数据
(3)从总出口 writev()函数中转发出去
Linux、C/C++技术交流群:【960994558】整理了一些个人觉得比较好的学习书籍、大厂面试题、和热门技术教学视频资料共享在里面(包括 C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK 等等.),有需要的可以自行添加哦!~
以上有不足的地方欢迎指出讨论,同时可以持续关注我,每天分享干货内容!
赖猫
还未添加个人签名 2020.11.28 加入
纸上得来终觉浅,绝知此事要躬行
评论