写点什么

SRS 流媒体服务器源码分析:Rtmp publish 流程

用户头像
赖猫
关注
发布于: 2021 年 03 月 03 日

推荐视频:


码云最有价值项目-SRS流媒体服务器源码剖析

线程模型

srs 使用了 state-threads 协程库,是单线程多协程模型。

这个协程的概念类似于 lua 的协程,都是单线程中可以创建多个协程。而 golang 中的 goroutine 协程是多线程并发的,goroutine 有可能运行在同一个线程也可能在不同线程,这样就有了线程安全问题,所以需要 chan 通信或者 mutex 加锁共享资源。

而 srs 因为是单线程多协程所以不用考虑线程安全,数据不用加锁。


主流程分析

撇掉程序启动的一些初始化和设置,直接进入:

int SrsServer::listen(){    int ret = ERROR_SUCCESS;        if ((ret = listen_rtmp()) != ERROR_SUCCESS) {        return ret;    }        if ((ret = listen_http_api()) != ERROR_SUCCESS) {        return ret;    }        if ((ret = listen_http_stream()) != ERROR_SUCCESS) {        return ret;    }        if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {        return ret;    }        return ret;}
复制代码

先看看listen_rtmp():

int SrsServer::listen_rtmp(){    int ret = ERROR_SUCCESS;        // stream service port.    std::vector<std::string> ip_ports = _srs_config->get_listens();    srs_assert((int)ip_ports.size() > 0);        close_listeners(SrsListenerRtmpStream);        for (int i = 0; i < (int)ip_ports.size(); i++) {        SrsListener* listener = new SrsStreamListener(this, SrsListenerRtmpStream);        listeners.push_back(listener);                std::string ip;        int port;        srs_parse_endpoint(ip_ports[i], ip, port);                if ((ret = listener->listen(ip, port)) != ERROR_SUCCESS) {            srs_error("RTMP stream listen at %s:%d failed. ret=%d", ip.c_str(), port, ret);            return ret;        }    }        return ret;}
复制代码

创建了SrsStreamListener,在SrsStreamListener::listen中又创建了SrsTcpListener进行listen

SrsTcpListener::SrsTcpListener(ISrsTcpHandler* h, string i, int p){    handler = h;    ip = i;    port = p;
_fd = -1; _stfd = NULL;
pthread = new SrsReusableThread("tcp", this);}
复制代码

SrsTcpListener中创建了pthread: SrsReusableThread

int SrsTcpListener::listen()中调用了pthread->start(),协程会回调到int SrsTcpListener::cycle()

int SrsTcpListener::cycle(){    int ret = ERROR_SUCCESS;        st_netfd_t client_stfd = st_accept(_stfd, NULL, NULL, ST_UTIME_NO_TIMEOUT);        if(client_stfd == NULL){        // ignore error.        if (errno != EINTR) {            srs_error("ignore accept thread stoppped for accept client error");        }        return ret;    }    srs_verbose("get a client. fd=%d", st_netfd_fileno(client_stfd));        if ((ret = handler->on_tcp_client(client_stfd)) != ERROR_SUCCESS) {        srs_warn("accept client error. ret=%d", ret);        return ret;    }        return ret;}
复制代码

accept连接后,回调到on_tcp_client

也就是SrsStreamListener::on_tcp_client

int SrsStreamListener::on_tcp_client(st_netfd_t stfd){    int ret = ERROR_SUCCESS;        if ((ret = server->accept_client(type, stfd)) != ERROR_SUCCESS) {        srs_warn("accept client error. ret=%d", ret);        return ret;    }
return ret;}
复制代码


int SrsServer::accept_client(SrsListenerType type, st_netfd_t client_stfd){...    SrsConnection* conn = NULL;    if (type == SrsListenerRtmpStream) {        conn = new SrsRtmpConn(this, client_stfd);    } else if (type == SrsListenerHttpApi) {#ifdef SRS_AUTO_HTTP_API        conn = new SrsHttpApi(this, client_stfd, http_api_mux);#else        srs_warn("close http client for server not support http-api");        srs_close_stfd(client_stfd);        return ret;#endif    } else if (type == SrsListenerHttpStream) {#ifdef SRS_AUTO_HTTP_SERVER        conn = new SrsResponseOnlyHttpConn(this, client_stfd, http_server);#else        srs_warn("close http client for server not support http-server");        srs_close_stfd(client_stfd);        return ret;#endif    } else {        // TODO: FIXME: handler others    }    srs_assert(conn);        // directly enqueue, the cycle thread will remove the client.    conns.push_back(conn);    srs_verbose("add conn to vector.");        // cycle will start process thread and when finished remove the client.    // @remark never use the conn, for it maybe destroyed.    if ((ret = conn->start()) != ERROR_SUCCESS) {        return ret;    }    srs_verbose("conn started success.");
srs_verbose("accept client finished. conns=%d, ret=%d", (int)conns.size(), ret); return ret;}
复制代码

在上面根据type创建不同的SrsConnectionRtmp创建了SrsRtmpConn,并且加入到std::vector<SrsConnection*> conns;中,然后执行conn->start()

SrsConnection基类创建了一个协程pthread: SrsOneCycleThread,上面的conn->start(),实际上是pthread->start():

SrsConnection::SrsConnection(IConnectionManager* cm, st_netfd_t c){    id = 0;    manager = cm;    stfd = c;    disposed = false;    expired = false;        // the client thread should reap itself,     // so we never use joinable.    // TODO: FIXME: maybe other thread need to stop it.    // @see: https://github.com/ossrs/srs/issues/78    pthread = new SrsOneCycleThread("conn", this);}
int SrsConnection::start(){ return pthread->start();}
复制代码

int SrsConnection::cycle()调用了do_cycle(),派生类实现了这个方法。

int SrsRtmpConn::do_cycle(){    int ret = ERROR_SUCCESS;        srs_trace("RTMP client ip=%s", ip.c_str());
rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); //正式进入rtmp握手。 if ((ret = rtmp->handshake()) != ERROR_SUCCESS) { srs_error("rtmp handshake failed. ret=%d", ret); return ret; } srs_verbose("rtmp handshake success"); if ((ret = rtmp->connect_app(req)) != ERROR_SUCCESS) { srs_error("rtmp connect vhost/app failed. ret=%d", ret); return ret; } srs_verbose("rtmp connect app success"); // set client ip to request. req->ip = ip; srs_trace("connect app, " "tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, args=%s", req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), (req->args? "(obj)":"null")); // show client identity if(req->args) { std::string srs_version; std::string srs_server_ip; int srs_pid = 0; int srs_id = 0; SrsAmf0Any* prop = NULL; if ((prop = req->args->ensure_property_string("srs_version")) != NULL) { srs_version = prop->to_str(); } if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) { srs_server_ip = prop->to_str(); } if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) { srs_pid = (int)prop->to_number(); } if ((prop = req->args->ensure_property_number("srs_id")) != NULL) { srs_id = (int)prop->to_number(); } srs_info("edge-srs ip=%s, version=%s, pid=%d, id=%d", srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id); if (srs_pid > 0) { srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d", srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id); } } ret = service_cycle(); http_hooks_on_close();
return ret;}
复制代码

Linux、C/C++技术交流群:【960994558】整理了一些个人觉得比较好的学习书籍、大厂面试题、和热门技术教学视频资料共享在里面(包括 C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK 等等.),有需要的可以自行添加哦!~



在这儿正式进入rtmp协议处理阶段。先进行握手:rtmp->handshake()等操作,然后进入service_cycle();

int SrsRtmpConn::service_cycle(){      ...    while (!disposed) {        ret = stream_service_cycle();                // stream service must terminated with error, never success.        // when terminated with success, it's user required to stop.        if (ret == ERROR_SUCCESS) {            continue;        }                // when not system control error, fatal error, return.        if (!srs_is_system_control_error(ret)) {            if (ret != ERROR_SOCKET_TIMEOUT && !srs_is_client_gracefully_close(ret)) {                srs_error("stream service cycle failed. ret=%d", ret);            }            return ret;        }                // for republish, continue service        if (ret == ERROR_CONTROL_REPUBLISH) {            // set timeout to a larger value, wait for encoder to republish.            rtmp->set_send_timeout(SRS_REPUBLISH_RECV_TIMEOUT_US);            rtmp->set_recv_timeout(SRS_REPUBLISH_SEND_TIMEOUT_US);                        srs_trace("control message(unpublish) accept, retry stream service.");            continue;        }                // for "some" system control error,         // logical accept and retry stream service.        if (ret == ERROR_CONTROL_RTMP_CLOSE) {            // TODO: FIXME: use ping message to anti-death of socket.            // @see: https://github.com/ossrs/srs/issues/39            // set timeout to a larger value, for user paused.            rtmp->set_recv_timeout(SRS_PAUSED_RECV_TIMEOUT_US);            rtmp->set_send_timeout(SRS_PAUSED_SEND_TIMEOUT_US);                        srs_trace("control message(close) accept, retry stream service.");            continue;        }                // for other system control message, fatal error.        srs_error("control message(%d) reject as error. ret=%d", ret, ret);        return ret;    }        return ret;}
复制代码

stream_service_cycle:

int SrsRtmpConn::stream_service_cycle(){    int ret = ERROR_SUCCESS;            SrsRtmpConnType type;    if ((ret = rtmp->identify_client(res->stream_id, type, req->stream, req->duration)) != ERROR_SUCCESS) {        if (!srs_is_client_gracefully_close(ret)) {            srs_error("identify client failed. ret=%d", ret);        }        return ret;    }        srs_discovery_tc_url(req->tcUrl, req->schema, req->host, req->vhost, req->app, req->stream, req->port, req->param);    req->strip();    srs_trace("client identified, type=%s, stream_name=%s, duration=%.2f, param=%s",        srs_client_type_string(type).c_str(), req->stream.c_str(), req->duration, req->param.c_str());        // discovery vhost, resolve the vhost from config    SrsConfDirective* parsed_vhost = _srs_config->get_vhost(req->vhost);    if (parsed_vhost) {        req->vhost = parsed_vhost->arg0();    }        if (req->schema.empty() || req->vhost.empty() || req->port.empty() || req->app.empty()) {        ret = ERROR_RTMP_REQ_TCURL;        srs_error("discovery tcUrl failed. "                  "tcUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, ret=%d",                  req->tcUrl.c_str(), req->schema.c_str(), req->vhost.c_str(), req->port.c_str(), req->app.c_str(), ret);        return ret;    }        if ((ret = check_vhost()) != ERROR_SUCCESS) {        srs_error("check vhost failed. ret=%d", ret);        return ret;    }        srs_trace("connected stream, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%s, app=%s, stream=%s, param=%s, args=%s",        req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),        req->schema.c_str(), req->vhost.c_str(), req->port.c_str(),        req->app.c_str(), req->stream.c_str(), req->param.c_str(), (req->args? "(obj)":"null"));        // do token traverse before serve it.    // @see https://github.com/ossrs/srs/pull/239    if (true) {        bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost);        bool edge_traverse = _srs_config->get_vhost_edge_token_traverse(req->vhost);        if (vhost_is_edge && edge_traverse) {            if ((ret = check_edge_token_traverse_auth()) != ERROR_SUCCESS) {                srs_warn("token auth failed, ret=%d", ret);                return ret;            }        }    }        // security check    if ((ret = security->check(type, ip, req)) != ERROR_SUCCESS) {        srs_error("security check failed. ret=%d", ret);        return ret;    }    srs_info("security check ok");        // Never allow the empty stream name, for HLS may write to a file with empty name.    // @see https://github.com/ossrs/srs/issues/834    if (req->stream.empty()) {        ret = ERROR_RTMP_STREAM_NAME_EMPTY;        srs_error("RTMP: Empty stream name not allowed, ret=%d", ret);        return ret;    }
// client is identified, set the timeout to service timeout. rtmp->set_recv_timeout(SRS_CONSTS_RTMP_RECV_TIMEOUT_US); rtmp->set_send_timeout(SRS_CONSTS_RTMP_SEND_TIMEOUT_US); // find a source to serve. SrsSource* source = NULL; if ((ret = SrsSource::fetch_or_create(req, server, &source)) != ERROR_SUCCESS) { return ret; } srs_assert(source != NULL); // update the statistic when source disconveried. SrsStatistic* stat = SrsStatistic::instance(); if ((ret = stat->on_client(_srs_context->get_id(), req, this, type)) != ERROR_SUCCESS) { srs_error("stat client failed. ret=%d", ret); return ret; }
bool vhost_is_edge = _srs_config->get_vhost_is_edge(req->vhost); bool enabled_cache = _srs_config->get_gop_cache(req->vhost); srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%d[%d]", req->get_stream_url().c_str(), ip.c_str(), enabled_cache, vhost_is_edge, source->source_id(), source->source_id()); source->set_cache(enabled_cache); client_type = type; //根据客户端类型进入不同分支 switch (type) { case SrsRtmpConnPlay: { srs_verbose("start to play stream %s.", req->stream.c_str()); // response connection start play if ((ret = rtmp->start_play(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to play stream failed. ret=%d", ret); return ret; } if ((ret = http_hooks_on_play()) != ERROR_SUCCESS) { srs_error("http hook on_play failed. ret=%d", ret); return ret; } srs_info("start to play stream %s success", req->stream.c_str()); ret = playing(source); http_hooks_on_stop(); return ret; } case SrsRtmpConnFMLEPublish: { srs_verbose("FMLE start to publish stream %s.", req->stream.c_str()); if ((ret = rtmp->start_fmle_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to publish stream failed. ret=%d", ret); return ret; } return publishing(source); } case SrsRtmpConnHaivisionPublish: { srs_verbose("Haivision start to publish stream %s.", req->stream.c_str()); if ((ret = rtmp->start_haivision_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("start to publish stream failed. ret=%d", ret); return ret; } return publishing(source); } case SrsRtmpConnFlashPublish: { srs_verbose("flash start to publish stream %s.", req->stream.c_str()); if ((ret = rtmp->start_flash_publish(res->stream_id)) != ERROR_SUCCESS) { srs_error("flash start to publish stream failed. ret=%d", ret); return ret; } return publishing(source); } default: { ret = ERROR_SYSTEM_CLIENT_INVALID; srs_info("invalid client type=%d. ret=%d", type, ret); return ret; } }
return ret;}
复制代码

先进行 tmp->identify_client 客户端身份识别。

然后根据根据客户端类型(type)进入不同分支。

SrsRtmpConnPlay 客户端播流。

SrsRtmpConnFMLEPublish Rtmp 推流到服务器。

SrsRtmpConnHaivisionPublish 应该是海康威视推流到服务器?

SrsRtmpConnFlashPublish Flash 推流到服务器。

这儿只看 SrsRtmpConnFMLEPublish:

进入 int SrsRtmpConn::publishing(SrsSource* source),然后 int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd),

int SrsRtmpConn::do_publishing(SrsSource* source, SrsPublishRecvThread* trd){...    // start isolate recv thread.    if ((ret = trd->start()) != ERROR_SUCCESS) {        srs_error("start isolate recv thread failed. ret=%d", ret);        return ret;    }    ...}
复制代码

trd 协程运行,协程循环:执行 rtmp->recv_message(&msg)后调用 int SrsPublishRecvThread::handle(SrsCommonMessage* msg)。

再回调到 int SrsRtmpConn::handle_publish_message(SrsSource* source, SrsCommonMessage* msg, bool is_fmle, bool vhost_is_edge)。

之后处理收到的数据:

int SrsRtmpConn::process_publish_message(SrsSource* source, SrsCommonMessage* msg, bool vhost_is_edge){    int ret = ERROR_SUCCESS;        // for edge, directly proxy message to origin.    if (vhost_is_edge) {        if ((ret = source->on_edge_proxy_publish(msg)) != ERROR_SUCCESS) {            srs_error("edge publish proxy msg failed. ret=%d", ret);            return ret;        }        return ret;    }        // process audio packet    if (msg->header.is_audio()) {        if ((ret = source->on_audio(msg)) != ERROR_SUCCESS) {            srs_error("source process audio message failed. ret=%d", ret);            return ret;        }        return ret;    }    // process video packet    if (msg->header.is_video()) {        if ((ret = source->on_video(msg)) != ERROR_SUCCESS) {            srs_error("source process video message failed. ret=%d", ret);            return ret;        }        return ret;    }        // process aggregate packet    if (msg->header.is_aggregate()) {        if ((ret = source->on_aggregate(msg)) != ERROR_SUCCESS) {            srs_error("source process aggregate message failed. ret=%d", ret);            return ret;        }        return ret;    }        // process onMetaData    if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {        SrsPacket* pkt = NULL;        if ((ret = rtmp->decode_message(msg, &pkt)) != ERROR_SUCCESS) {            srs_error("decode onMetaData message failed. ret=%d", ret);            return ret;        }        SrsAutoFree(SrsPacket, pkt);            if (dynamic_cast<SrsOnMetaDataPacket*>(pkt)) {            SrsOnMetaDataPacket* metadata = dynamic_cast<SrsOnMetaDataPacket*>(pkt);            if ((ret = source->on_meta_data(msg, metadata)) != ERROR_SUCCESS) {                srs_error("source process onMetaData message failed. ret=%d", ret);                return ret;            }            srs_info("process onMetaData message success.");            return ret;        }                srs_info("ignore AMF0/AMF3 data message.");        return ret;    }        return ret;}
复制代码

如果本服务器是 edge 边缘服务器(vhost_is_edge)直接推流回源到源服务器。

audio 和 video 分开处理。

这儿只看一下 video 的处理:

int SrsSource::on_video(SrsCommonMessage* shared_video){    int ret = ERROR_SUCCESS;        // monotically increase detect.    if (!mix_correct && is_monotonically_increase) {        if (last_packet_time > 0 && shared_video->header.timestamp < last_packet_time) {            is_monotonically_increase = false;            srs_warn("VIDEO: stream not monotonically increase, please open mix_correct.");        }    }    last_packet_time = shared_video->header.timestamp;        // drop any unknown header video.    // @see https://github.com/ossrs/srs/issues/421    if (!SrsFlvCodec::video_is_acceptable(shared_video->payload, shared_video->size)) {        char b0 = 0x00;        if (shared_video->size > 0) {            b0 = shared_video->payload[0];        }                srs_warn("drop unknown header video, size=%d, bytes[0]=%#x", shared_video->size, b0);        return ret;    }        // convert shared_video to msg, user should not use shared_video again.    // the payload is transfer to msg, and set to NULL in shared_video.    SrsSharedPtrMessage msg;    if ((ret = msg.create(shared_video)) != ERROR_SUCCESS) {        srs_error("initialize the video failed. ret=%d", ret);        return ret;    }    srs_info("Video dts=%"PRId64", size=%d", msg.timestamp, msg.size);        // directly process the audio message.    if (!mix_correct) {        return on_video_imp(&msg);    }        // insert msg to the queue.    mix_queue->push(msg.copy());        // fetch someone from mix queue.    SrsSharedPtrMessage* m = mix_queue->pop();    if (!m) {        return ret;    }        // consume the monotonically increase message.    if (m->is_audio()) {        ret = on_audio_imp(m);    } else {        ret = on_video_imp(m);    }    srs_freep(m);        return ret;}
复制代码

把 shared_video 转换为 SrsSharedPtrMessage。

调用 on_video_imp。

int SrsSource::on_video_imp(SrsSharedPtrMessage* msg){    int ret = ERROR_SUCCESS;        srs_info("Video dts=%"PRId64", size=%d", msg->timestamp, msg->size);        bool is_sequence_header = SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size);        // whether consumer should drop for the duplicated sequence header.    bool drop_for_reduce = false;    if (is_sequence_header && cache_sh_video && _srs_config->get_reduce_sequence_header(_req->vhost)) {        if (cache_sh_video->size == msg->size) {            drop_for_reduce = srs_bytes_equals(cache_sh_video->payload, msg->payload, msg->size);            srs_warn("drop for reduce sh video, size=%d", msg->size);        }    }        // cache the sequence header if h264    // donot cache the sequence header to gop_cache, return here.    if (is_sequence_header) {        srs_freep(cache_sh_video);        cache_sh_video = msg->copy();                // parse detail audio codec        SrsAvcAacCodec codec;                // user can disable the sps parse to workaround when parse sps failed.        // @see https://github.com/ossrs/srs/issues/474        codec.avc_parse_sps = _srs_config->get_parse_sps(_req->vhost);                SrsCodecSample sample;        if ((ret = codec.video_avc_demux(msg->payload, msg->size, &sample)) != ERROR_SUCCESS) {            srs_error("source codec demux video failed. ret=%d", ret);            return ret;        }                // when got video stream info.        SrsStatistic* stat = SrsStatistic::instance();        if ((ret = stat->on_video_info(_req, SrsCodecVideoAVC, codec.avc_profile, codec.avc_level)) != ERROR_SUCCESS) {            return ret;        }                srs_trace("%dB video sh,  codec(%d, profile=%s, level=%s, %dx%d, %dkbps, %dfps, %ds)",            msg->size, codec.video_codec_id,            srs_codec_avc_profile2str(codec.avc_profile).c_str(),            srs_codec_avc_level2str(codec.avc_level).c_str(), codec.width, codec.height,            codec.video_data_rate / 1000, codec.frame_rate, codec.duration);    }    #ifdef SRS_AUTO_HLS    if ((ret = hls->on_video(msg, is_sequence_header)) != ERROR_SUCCESS) {        // apply the error strategy for hls.        // @see https://github.com/ossrs/srs/issues/264        std::string hls_error_strategy = _srs_config->get_hls_on_error(_req->vhost);        if (srs_config_hls_is_on_error_ignore(hls_error_strategy)) {            srs_warn("hls process video message failed, ignore and disable hls. ret=%d", ret);                        // unpublish, ignore ret.            hls->on_unpublish();                        // ignore.            ret = ERROR_SUCCESS;        } else if (srs_config_hls_is_on_error_continue(hls_error_strategy)) {            if (srs_hls_can_continue(ret, cache_sh_video, msg)) {                ret = ERROR_SUCCESS;            } else {                srs_warn("hls continue video failed. ret=%d", ret);                return ret;            }        } else {            srs_warn("hls disconnect publisher for video error. ret=%d", ret);            return ret;        }    }#endif    #ifdef SRS_AUTO_DVR    if ((ret = dvr->on_video(msg)) != ERROR_SUCCESS) {        srs_warn("dvr process video message failed, ignore and disable dvr. ret=%d", ret);                // unpublish, ignore ret.        dvr->on_unpublish();                // ignore.        ret = ERROR_SUCCESS;    }#endif
#ifdef SRS_AUTO_HDS if ((ret = hds->on_video(msg)) != ERROR_SUCCESS) { srs_warn("hds process video message failed, ignore and disable dvr. ret=%d", ret); // unpublish, ignore ret. hds->on_unpublish(); // ignore. ret = ERROR_SUCCESS; }#endif // copy to all consumer if (!drop_for_reduce) { for (int i = 0; i < (int)consumers.size(); i++) { SrsConsumer* consumer = consumers.at(i); if ((ret = consumer->enqueue(msg, atc, jitter_algorithm)) != ERROR_SUCCESS) { srs_error("dispatch the video failed. ret=%d", ret); return ret; } } srs_info("dispatch video success."); }
// copy to all forwarders. if (!forwarders.empty()) { std::vector<SrsForwarder*>::iterator it; for (it = forwarders.begin(); it != forwarders.end(); ++it) { SrsForwarder* forwarder = *it; if ((ret = forwarder->on_video(msg)) != ERROR_SUCCESS) { srs_error("forwarder process video message failed. ret=%d", ret); return ret; } } } // when sequence header, donot push to gop cache and adjust the timestamp. if (is_sequence_header) { return ret; }
// cache the last gop packets if ((ret = gop_cache->cache(msg)) != ERROR_SUCCESS) { srs_error("gop cache msg failed. ret=%d", ret); return ret; } srs_verbose("cache gop success."); // if atc, update the sequence header to abs time. if (atc) { if (cache_sh_video) { cache_sh_video->timestamp = msg->timestamp; } if (cache_metadata) { cache_metadata->timestamp = msg->timestamp; } } return ret;}
复制代码

以上进行了缓存 h264 sequence header,hls 分发,客户端消费者分发,forwarders 推流等等。

这里主要看一下消费者分发:

// copy to all consumer    if (!drop_for_reduce) {        for (int i = 0; i < (int)consumers.size(); i++) {            SrsConsumer* consumer = consumers.at(i);            if ((ret = consumer->enqueue(msg, atc, jitter_algorithm)) != ERROR_SUCCESS) {                srs_error("dispatch the video failed. ret=%d", ret);                return ret;            }        }        srs_info("dispatch video success.");    }
复制代码


int SrsConsumer::enqueue(SrsSharedPtrMessage* shared_msg, bool atc, SrsRtmpJitterAlgorithm ag){    int ret = ERROR_SUCCESS;      //这儿的copy操作只是增加引用计数,没有实际的内存拷贝。    SrsSharedPtrMessage* msg = shared_msg->copy();
if (!atc) { if ((ret = jitter->correct(msg, ag)) != ERROR_SUCCESS) { srs_freep(msg); return ret; } } if ((ret = queue->enqueue(msg, NULL)) != ERROR_SUCCESS) { return ret; } #ifdef SRS_PERF_QUEUE_COND_WAIT srs_verbose("enqueue msg, time=%"PRId64", size=%d, duration=%d, waiting=%d, min_msg=%d", msg->timestamp, msg->size, queue->duration(), mw_waiting, mw_min_msgs); // fire the mw when msgs is enough. if (mw_waiting) { int duration_ms = queue->duration(); bool match_min_msgs = queue->size() > mw_min_msgs; // For ATC, maybe the SH timestamp bigger than A/V packet, // when encoder republish or overflow. // @see https://github.com/ossrs/srs/pull/749 if (atc && duration_ms < 0) { st_cond_signal(mw_wait); mw_waiting = false; return ret; } // when duration ok, signal to flush. if (match_min_msgs && duration_ms > mw_duration) { st_cond_signal(mw_wait); mw_waiting = false; return ret; } }#endif return ret;}
复制代码

每个 SrsConsumer 消费者拥有独立的 SrsMessageQueue* queue 队列。内部队列实现实际上是 std::multimap<int64_t, SrsSharedPtrMessage*> msgs。

SrsMessageQueue 有数量大小限制,当队列满的时候删除丢弃旧的 messages:

队列大小限制 queue_size 设置为配置文件中的"queue_length"。如果没设置则默认 #define SRS_PERF_PLAY_QUEUE 30。

queue_size_ms = (int)(queue_size * 1000);

int SrsMessageQueue::enqueue(SrsSharedPtrMessage* msg, bool* is_overflow){    int ret = ERROR_SUCCESS;        if (msg->is_av()) {        if (av_start_time == -1) {            av_start_time = msg->timestamp;        }                av_end_time = msg->timestamp;    }        msgs.push_back(msg);
while (av_end_time - av_start_time > queue_size_ms) { // notice the caller queue already overflow and shrinked. if (is_overflow) { *is_overflow = true; } shrink(); } return ret;}
复制代码


void SrsMessageQueue::shrink(){    SrsSharedPtrMessage* video_sh = NULL;    SrsSharedPtrMessage* audio_sh = NULL;    int msgs_size = (int)msgs.size();        // remove all msg    // igone the sequence header    for (int i = 0; i < (int)msgs.size(); i++) {        SrsSharedPtrMessage* msg = msgs.at(i);
if (msg->is_video() && SrsFlvCodec::video_is_sequence_header(msg->payload, msg->size)) { srs_freep(video_sh); video_sh = msg; continue; } else if (msg->is_audio() && SrsFlvCodec::audio_is_sequence_header(msg->payload, msg->size)) { srs_freep(audio_sh); audio_sh = msg; continue; }
srs_freep(msg); } msgs.clear();
// update av_start_time av_start_time = av_end_time; //push_back secquence header and update timestamp if (video_sh) { video_sh->timestamp = av_end_time; msgs.push_back(video_sh); } if (audio_sh) { audio_sh->timestamp = av_end_time; msgs.push_back(audio_sh); } if (_ignore_shrink) { srs_info("shrink the cache queue, size=%d, removed=%d, max=%.2f", (int)msgs.size(), msgs_size - (int)msgs.size(), queue_size_ms / 1000.0); } else { srs_trace("shrink the cache queue, size=%d, removed=%d, max=%.2f", (int)msgs.size(), msgs_size - (int)msgs.size(), queue_size_ms / 1000.0); }}
复制代码

保存最近的 sequence_header,然后清除其他 messages。

AVC sequence header

AAC sequence header

这两个 header 非常重要,是客户端解码的必需部分,所以不能删除。

这个丢包策略没有根据整个 GOP 进行丢包,而是直接丢掉除 sequence_header 的包,有可能会造成客户端花屏。

总结

客户端 Rtmp 推流到服务器,服务器将消息缓存到各个客户端消费者自己的队列中,数据使用引用计数没有内存拷贝操作。过期数据将被清除。


用户头像

赖猫

关注

还未添加个人签名 2020.11.28 加入

纸上得来终觉浅,绝知此事要躬行

评论

发布
暂无评论
SRS流媒体服务器源码分析:Rtmp publish流程