
How DSS Implements RTSP Protocol Support and Request Dispatching

Changing Lin
Published on: June 15, 2021

1. Overview

This article analyzes how the DSS streaming server implements RTSP protocol support and request dispatching. The main flow is: start listening on the RTSP ports -> wait for client requests and assemble the request data -> dispatch to the matching Module for processing. We walk through the RTSP connection setup flow at the code level to give developers a general picture of how it works.

2. Background and Requirements

IP cameras (IPC) are video capture devices used across virtually every industry; in an era of ubiquitous technology and Internet access they are everywhere, with Hikvision and Dahua probably holding the largest market shares. Our company is in the communications industry and has a great many IPC deployment scenarios, which places high demands on our system software: achieving many concurrent streams with low latency and reliable transport is a topic we study continuously. Most traditional IPCs speak RTSP, and DSS, an excellent streaming media platform, has already been used in several of our projects. Analyzing how DSS implements RTSP support and request dispatching is therefore worthwhile: it deepens our understanding of how DSS works.

3. Implementation

3.1 Real Time Streaming Protocol (RTSP)

  • A common application-layer network protocol, widely used to establish and control media sessions between endpoints; it provides real-time control over the state of media streams flowing from a streaming server to a client (video on demand) or from a client to a streaming server (voice recording);

  • The basic RTSP requests include: OPTIONS, DESCRIBE, SETUP, PLAY, PAUSE, ANNOUNCE (publish a stream), TEARDOWN (stop the stream), and so on;

  • A simple RTSP flow is illustrated below
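A typical abbreviated exchange looks like this on the wire (per RTSP/1.0, RFC 2326; the address, port, and stream name are illustrative):

OPTIONS rtsp://192.168.1.100:10554/live.sdp RTSP/1.0
CSeq: 1

RTSP/1.0 200 OK
CSeq: 1
Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE

DESCRIBE rtsp://192.168.1.100:10554/live.sdp RTSP/1.0
CSeq: 2
Accept: application/sdp

RTSP/1.0 200 OK
CSeq: 2
Content-Type: application/sdp
Content-Length: ...
(SDP body describing the media follows)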


3.2 The RTSP port fields in the configuration file streamingserver.xml:

<CONFIGURATION>
    <SERVER>
        <LIST-PREF NAME="rtsp_port" TYPE="UInt16" >
            <VALUE>7070</VALUE>
            <VALUE>10554</VALUE>
            <VALUE>8000</VALUE>
            <VALUE>8001</VALUE>
        </LIST-PREF>
        ...
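With the server running, you can verify that these become real listening sockets using a standard tool such as netstat -ltn or ss -ltn on Linux, looking for 7070, 10554, 8000, and 8001 in LISTEN state.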

3.3 Search the project sources for code referencing the field name rtsp_port:


3.4 Locate the code corresponding to the rtsp_port field

In the QTSS.h header we find the associated attribute qtssPrefsRTSPPorts. Tracing that attribute leads to the QTSServer::GetRTSPPorts method, in which DSS reads the list of RTSP listening ports from its preferences cache. Because DSS parses the configuration file at startup, this indirectly uses the values specified in streamingserver.xml, as shown below:


// QTSS.h
qtssPrefsRTSPPorts = 32,   //"rtsp_ports"      // UInt16

// QTSServer.cpp
UInt16 *QTSServer::GetRTSPPorts(QTSServerPrefs *inPrefs, UInt32 *outNumPortsPtr)
{
    *outNumPortsPtr = inPrefs->GetNumValues(qtssPrefsRTSPPorts);

    if (*outNumPortsPtr == 0)
        return NULL;

    UInt16 *thePortArray = NEW UInt16[*outNumPortsPtr];

    for (UInt32 theIndex = 0; theIndex < *outNumPortsPtr; theIndex++)
    {
        // Get the port out of the prefs dictionary
        UInt32 theLen = sizeof(UInt16);
        QTSS_Error theErr = QTSS_NoErr;
        theErr = inPrefs->GetValue(qtssPrefsRTSPPorts, theIndex, &thePortArray[theIndex], &theLen);
        Assert(theErr == QTSS_NoErr);
    }

    return thePortArray;
}
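Note the ownership convention: GetRTSPPorts returns a heap array that the caller must release with delete[]. A minimal caller-side sketch, mirroring how CreateListeners (section 3.5) consumes it:

UInt32 theNumPorts = 0;
UInt16 *thePorts = GetRTSPPorts(inPrefs, &theNumPorts);   // NULL when no ports are configured
for (UInt32 i = 0; i < theNumPorts; i++)
    qtss_printf("will listen for RTSP on port %hu\n", thePorts[i]);
delete[] thePorts;                                        // caller owns the array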

3.5 The DSS server main thread

Searching for callers of QTSServer::GetRTSPPorts leads to QTSServer::CreateListeners. QTSServer embodies the DSS server's main thread, responsible for startup, shutdown, and error handling for the whole application, so this method should be executed during startup initialization to begin listening on the ports. As the name says, it creates listeners — and in Linux network programming, what listens on a port is a TCP socket. Could it be that RTSP runs on top of TCP underneath?


Bool16 QTSServer::CreateListeners(Bool16 startListeningNow, QTSServerPrefs *inPrefs, UInt16 inPortOverride)
{
    struct PortTracking
    {
        PortTracking() : fPort(0), fIPAddr(0), fNeedsCreating(true) {}

        UInt16 fPort;
        UInt32 fIPAddr;
        Bool16 fNeedsCreating;
    };

    PortTracking *thePortTrackers = NULL;
    UInt32 theTotalPortTrackers = 0;

    PortTracking *theHTTPPortTrackers = nullptr;
    UInt32 theTotalHTTPPortTrackers = 0;

    // Get the IP addresses from the pref
    UInt32 theNumAddrs = 0;
    UInt32 *theIPAddrs = this->GetRTSPIPAddrs(inPrefs, &theNumAddrs);
    UInt32 index = 0;

    if (inPortOverride != 0)
    {
        theTotalPortTrackers = theNumAddrs; // one port tracking struct for each IP addr
        thePortTrackers = NEW PortTracking[theTotalPortTrackers];
        for (index = 0; index < theNumAddrs; index++)
        {
            thePortTrackers[index].fPort = inPortOverride;
            thePortTrackers[index].fIPAddr = theIPAddrs[index];
        }
    }
    else
    {
        UInt32 theNumPorts = 0;
        UInt16 *thePorts = GetRTSPPorts(inPrefs, &theNumPorts);
        theTotalPortTrackers = theNumAddrs * theNumPorts;
        thePortTrackers = NEW PortTracking[theTotalPortTrackers];

        UInt32 currentIndex = 0;

        for (index = 0; index < theNumAddrs; index++)
        {
            for (UInt32 portIndex = 0; portIndex < theNumPorts; portIndex++)
            {
                currentIndex = (theNumPorts * index) + portIndex;

                thePortTrackers[currentIndex].fPort = thePorts[portIndex];
                thePortTrackers[currentIndex].fIPAddr = theIPAddrs[index];
            }
        }

        delete[] thePorts;
    }

    // Stat Total Num of HTTP Port
    {
        theTotalHTTPPortTrackers = theNumAddrs;
        theHTTPPortTrackers = new PortTracking[theTotalHTTPPortTrackers];

        UInt16 theHTTPPort = inPrefs->GetServiceLanPort();
        UInt32 currentIndex = 0;

        for (index = 0; index < theNumAddrs; index++)
        {
            theHTTPPortTrackers[index].fPort = theHTTPPort;
            theHTTPPortTrackers[index].fIPAddr = theIPAddrs[index];
        }
    }

    delete[] theIPAddrs;

    //
    // Now figure out which of these ports we are *already* listening on.
    // If we already are listening on that port, just move the pointer to the
    // listener over to the new array
    TCPListenerSocket **newListenerArray = NEW TCPListenerSocket *[theTotalPortTrackers];
    UInt32 curPortIndex = 0;

    for (UInt32 count = 0; count < theTotalPortTrackers; count++)
    {
        for (UInt32 count2 = 0; count2 < fNumListeners; count2++)
        {
            if ((fListeners[count2]->GetLocalPort() == thePortTrackers[count].fPort) &&
                (fListeners[count2]->GetLocalAddr() == thePortTrackers[count].fIPAddr))
            {
                thePortTrackers[count].fNeedsCreating = false;
                newListenerArray[curPortIndex++] = fListeners[count2];
                Assert(curPortIndex <= theTotalPortTrackers);
                break;
            }
        }
    }

    // HTTPPortTrackers check
    for (UInt32 count = 0; count < theTotalHTTPPortTrackers; count++)
    {
        for (UInt32 count2 = 0; count2 < fNumListeners; count2++)
        {
            if ((fListeners[count2]->GetLocalPort() == theHTTPPortTrackers[count].fPort) &&
                (fListeners[count2]->GetLocalAddr() == theHTTPPortTrackers[count].fIPAddr))
            {
                theHTTPPortTrackers[count].fNeedsCreating = false;
                newListenerArray[curPortIndex++] = fListeners[count2];
                Assert(curPortIndex <= theTotalPortTrackers + theTotalHTTPPortTrackers);
                break;
            }
        }
    }

    //
    // Create any new listeners we need (the RTSP port listeners)
    for (UInt32 count3 = 0; count3 < theTotalPortTrackers; count3++)
    {
        if (thePortTrackers[count3].fNeedsCreating)
        {
            // Create a new object to accept and handle RTSP requests
            newListenerArray[curPortIndex] = NEW RTSPListenerSocket();
            // Initialize the RTSPListenerSocket: bind the local IP address and port
            QTSS_Error err = newListenerArray[curPortIndex]->Initialize(thePortTrackers[count3].fIPAddr, thePortTrackers[count3].fPort);

            char thePortStr[20];
            qtss_sprintf(thePortStr, "%hu", thePortTrackers[count3].fPort);

            //
            // If there was an error creating this listener, destroy it and log an error
            if ((startListeningNow) && (err != QTSS_NoErr))
                delete newListenerArray[curPortIndex];

            if (err == EADDRINUSE)
                QTSSModuleUtils::LogError(qtssWarningVerbosity, qtssListenPortInUse, 0, thePortStr);
            else if (err == EACCES)
                QTSSModuleUtils::LogError(qtssWarningVerbosity, qtssListenPortAccessDenied, 0, thePortStr);
            else if (err != QTSS_NoErr)
                QTSSModuleUtils::LogError(qtssWarningVerbosity, qtssListenPortError, 0, thePortStr);
            else
            {
                //
                // This listener was successfully created.
                if (startListeningNow)
                    newListenerArray[curPortIndex]->RequestEvent(EV_RE);
                curPortIndex++;
            }
        }
    }

    // Create any new <HTTP> listeners we need
    for (UInt32 count3 = 0; count3 < theTotalHTTPPortTrackers; count3++)
    {
        if (theHTTPPortTrackers[count3].fNeedsCreating)
        {
            newListenerArray[curPortIndex] = new HTTPListenerSocket();
            QTSS_Error err = newListenerArray[curPortIndex]->Initialize(theHTTPPortTrackers[count3].fIPAddr, theHTTPPortTrackers[count3].fPort);

            char thePortStr[20];
            qtss_sprintf(thePortStr, "%hu", theHTTPPortTrackers[count3].fPort);

            //
            // If there was an error creating this listener, destroy it and log an error
            if ((startListeningNow) && (err != QTSS_NoErr))
                delete newListenerArray[curPortIndex];

            if (err == EADDRINUSE)
                QTSSModuleUtils::LogError(qtssWarningVerbosity, qtssListenPortInUse, 0, thePortStr);
            else if (err == EACCES)
                QTSSModuleUtils::LogError(qtssWarningVerbosity, qtssListenPortAccessDenied, 0, thePortStr);
            else if (err != QTSS_NoErr)
                QTSSModuleUtils::LogError(qtssWarningVerbosity, qtssListenPortError, 0, thePortStr);
            else
            {
                //
                // This listener was successfully created.
                if (startListeningNow)
                    newListenerArray[curPortIndex]->RequestEvent(EV_RE);
                curPortIndex++;
            }
        }
    }

    //
    // Kill any listeners that we no longer need
    for (UInt32 count4 = 0; count4 < fNumListeners; count4++)
    {
        Bool16 deleteThisOne = true;

        for (UInt32 count5 = 0; count5 < curPortIndex; count5++)
        {
            if (newListenerArray[count5] == fListeners[count4])
                deleteThisOne = false;
        }

        if (deleteThisOne)
            fListeners[count4]->Signal(Task::kKillEvent);
    }

    //
    // Finally, make our server attributes and fListener privy to the new...
    fListeners = newListenerArray;
    fNumListeners = curPortIndex;
    UInt32 portIndex = 0;

    for (UInt32 count6 = 0; count6 < fNumListeners; count6++)
    {
        if (fListeners[count6]->GetLocalAddr() != INADDR_LOOPBACK)
        {
            UInt16 thePort = fListeners[count6]->GetLocalPort();
            (void)this->SetValue(qtssSvrRTSPPorts, portIndex, &thePort, sizeof(thePort), QTSSDictionary::kDontObeyReadOnly);
            portIndex++;
        }
    }
    this->SetNumValues(qtssSvrRTSPPorts, portIndex);

    delete[] thePortTrackers;
    delete[] theHTTPPortTrackers;
    return (fNumListeners > 0);
}
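As for the question raised above — yes, the RTSP control channel runs over TCP. Stripped of the DSS abstractions, each listener that Initialize sets up amounts to the standard POSIX sequence below (a minimal sketch with a hypothetical helper name, not DSS code):

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstdint>

// Create a non-blocking TCP socket bound to addr:port, ready to accept.
int make_rtsp_listener(uint32_t addr, uint16_t port)    // e.g. INADDR_ANY, 10554
{
    int fd = ::socket(AF_INET, SOCK_STREAM, 0);         // TCP socket
    if (fd < 0)
        return -1;

    int one = 1;
    ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));

    struct sockaddr_in sin = {};
    sin.sin_family = AF_INET;
    sin.sin_addr.s_addr = htonl(addr);
    sin.sin_port = htons(port);

    if (::bind(fd, (struct sockaddr *)&sin, sizeof(sin)) < 0 ||
        ::listen(fd, 128) < 0)                          // cf. kListenQueueLength = 128 below
    {
        ::close(fd);
        return -1;
    }

    ::fcntl(fd, F_SETFL, O_NONBLOCK);                   // DSS listeners are non-blocking
    return fd;
}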

3.6 The core object: RTSPListenerSocket

A little digging shows that RTSPListenerSocket is a subclass of TCPListenerSocket, which in turn inherits from both TCPSocket and IdleTask; it therefore combines the traits of a TCP socket and an idle task.


class RTSPListenerSocket : public TCPListenerSocket
{
    public:

        RTSPListenerSocket() {}
        virtual ~RTSPListenerSocket() {}

        //sole job of this object is to implement this function
        virtual Task *GetSessionTask(TCPSocket **outSocket);

        //check whether the Listener should be idling
        Bool16 OverMaxConnections(UInt32 buffer);
};

class TCPListenerSocket : public TCPSocket, public IdleTask
{
    public:

        TCPListenerSocket() : TCPSocket(NULL, Socket::kNonBlockingSocketType), IdleTask(),
                              fAddr(0), fPort(0), fOutOfDescriptors(false), fSleepBetweenAccepts(false)
            { this->SetTaskName("TCPListenerSocket"); }
        virtual ~TCPListenerSocket() {}

        //
        // Send a TCPListenerObject a Kill event to delete it.

        //addr = listening address. port = listening port. Automatically
        //starts listening
        OS_Error Initialize(UInt32 addr, UInt16 port);

        //You can query the listener to see if it is failing to accept
        //connections because the OS is out of descriptors.
        Bool16 IsOutOfDescriptors() { return fOutOfDescriptors; }

        void SlowDown() { fSleepBetweenAccepts = true; }
        void RunNormal() { fSleepBetweenAccepts = false; }

        //derived object must implement a way of getting tasks & sockets to this object
        virtual Task *GetSessionTask(TCPSocket **outSocket) = 0;

        virtual SInt64 Run();

    private:

        enum
        {
            kTimeBetweenAcceptsInMsec = 1000,   //UInt32
            kListenQueueLength = 128            //UInt32
        };

        virtual void ProcessEvent(int eventBits);
        OS_Error Listen(UInt32 queueLength);

        UInt32 fAddr;
        UInt16 fPort;
        Bool16 fOutOfDescriptors;
        Bool16 fSleepBetweenAccepts;
};
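Putting the declarations above together with the next section, the inheritance chain looks like this (IdleTask itself derives from Task):

EventContext                      (receives callbacks from the EventThread)
  └── Socket                      (wraps a BSD socket descriptor)
        └── TCPSocket
              └── TCPListenerSocket ── also inherits IdleTask ── Task
                    └── RTSPListenerSocket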

3.7 TCPSocket and Socket in DSS

TCPSocket is a subclass of Socket, and Socket in turn is a subclass of EventContext — at this point we are already inside DSS's asynchronous model. EventContext merely wraps a few important abstract methods whose names can be taken at face value, so there is no need to dig further there. The Socket class, however, has two important static methods, Socket::Initialize and Socket::StartThread, and here we meet the EventThread class. It is a subclass of OSThread, which tells us that EventThread is an independent thread dedicated to handling events.


class Socket : public EventContext
{
    public:

        enum
        {
            // Pass this in on socket constructors to specify whether the
            // socket should be non-blocking or blocking
            kNonBlockingSocketType = 1
        };

        // This class provides a global event thread.
        static void Initialize() { sEventThread = new EventThread(); }  // static: create the event thread
        static void StartThread() { sEventThread->Start(); }            // static: start the event thread
        static EventThread* GetEventThread() { return sEventThread; }   // static: return the event thread object

        //Binds the socket to the following address.
        //Returns: QTSS_FileNotOpen, QTSS_NoErr, or POSIX errorcode.
        OS_Error Bind(UInt32 addr, UInt16 port, Bool16 test = false);
        //The same. but in reverse
        void Unbind();

        void ReuseAddr();
        void NoDelay();
        void KeepAlive();
        void SetSocketBufSize(UInt32 inNewSize);

        //
        // Returns an error if the socket buffer size is too big
        OS_Error SetSocketRcvBufSize(UInt32 inNewSize);

        //Send
        //Returns: QTSS_FileNotOpen, QTSS_NoErr, or POSIX errorcode.
        OS_Error Send(const char* inData, const UInt32 inLength, UInt32* outLengthSent);

        //Read
        //Reads some data.
        //Returns: QTSS_FileNotOpen, QTSS_NoErr, or POSIX errorcode.
        OS_Error Read(void *buffer, const UInt32 length, UInt32 *rcvLen);

        //WriteV: same as send, but takes an iovec
        //Returns: QTSS_FileNotOpen, QTSS_NoErr, or POSIX errorcode.
        OS_Error WriteV(const struct iovec* iov, const UInt32 numIOvecs, UInt32* outLengthSent);

        //You can query for the socket's state
        Bool16 IsConnected() { return (Bool16) (fState & kConnected); }
        Bool16 IsBound() { return (Bool16) (fState & kBound); }

        //If the socket is bound, you may find out to which addr it is bound
        UInt32 GetLocalAddr() { return ntohl(fLocalAddr.sin_addr.s_addr); }
        UInt16 GetLocalPort() { return ntohs(fLocalAddr.sin_port); }

        StrPtrLen* GetLocalAddrStr();
        StrPtrLen* GetLocalPortStr();
        StrPtrLen* GetLocalDNSStr();

        enum
        {
            kMaxNumSockets = 4096   //UInt32
        };

    protected:

        //TCPSocket takes an optional task object which will get notified when
        //certain events happen on this socket. Those events are:
        //
        //S_DATA:               Data is currently available on the socket.
        //S_CONNECTIONCLOSING:  Client is closing the connection. No longer necessary
        //                      to call Close or Disconnect, Snd & Rcv will fail.
        Socket(Task *notifytask, UInt32 inSocketType);
        virtual ~Socket() {}

        //returns QTSS_NoErr, or appropriate posix error
        OS_Error Open(int theType);

        UInt32 fState;

        enum
        {
            kPortBufSizeInBytes = 8,        //UInt32
            kMaxIPAddrSizeInBytes = 20      //UInt32
        };

#if SOCKET_DEBUG
        StrPtrLen fLocalAddrStr;
        char fLocalAddrBuffer[kMaxIPAddrSizeInBytes];
#endif

        //address information (available if bound)
        //these are always stored in network order. Convert
        struct sockaddr_in fLocalAddr;
        struct sockaddr_in fDestAddr;

        StrPtrLen* fLocalAddrStrPtr;
        StrPtrLen* fLocalDNSStrPtr;
        char fPortBuffer[kPortBufSizeInBytes];
        StrPtrLen fPortStr;

        //State flags. Be careful when changing these values, as subclasses add their own
        enum
        {
            kBound      = 0x0004,
            kConnected  = 0x0008
        };

        static EventThread* sEventThread;
};
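For context, the server wires these statics together during startup roughly as follows — a simplified sketch of the call order, not the literal DSS startup code:

static void StartNetworking()
{
    Socket::Initialize();    // create the global EventThread object
    // ... construct QTSServer, parse streamingserver.xml, CreateListeners() ...
    Socket::StartThread();   // start the event thread; from here on, bound
                             // listeners receive ProcessEvent callbacks
}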

3.8 The DSS core thread: EventThread

EventThread::Entry is the thread's entry point. Using the epoll or select model, it waits for a bounded time for readable, writable, and exception events on the file descriptors of interest. When an event fires, it resolves the event's cookie through the reference table (fRefTable) to the corresponding EventContext and calls back that object's EventContext::ProcessEvent method.


void EventThread::Entry()
{
    struct eventreq theCurrentEvent;
    ::memset(&theCurrentEvent, '\0', sizeof(theCurrentEvent));

    while (true)
    {
        int theErrno = EINTR;
        while (theErrno == EINTR)
        {
#if MACOSXEVENTQUEUE
            int theReturnValue = waitevent(&theCurrentEvent, NULL);
#else
    #if defined(_ENABLE_EPOLL_EVENT_)
            int theReturnValue = epoll_waitevent(&theCurrentEvent, NULL);
    #else
            int theReturnValue = select_waitevent(&theCurrentEvent, NULL);
    #endif // _ENABLE_EPOLL_EVENT_
#endif
            //Sort of a hack. In the POSIX version of the server, waitevent can return
            //an actual POSIX errorcode.
            if (theReturnValue >= 0)
                theErrno = theReturnValue;
            else
                theErrno = OSThread::GetErrno();
        }

        AssertV(theErrno == 0, theErrno);

        //ok, there's data waiting on this socket. Send a wakeup.
        if (theCurrentEvent.er_data != NULL)
        {
            //The cookie in this event is an ObjectID. Resolve that objectID into
            //a pointer.
            StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
            OSRef* ref = fRefTable.Resolve(&idStr);
            if (ref != NULL)
            {
                EventContext* theContext = (EventContext*)ref->GetObject();
#if DEBUG
                theContext->fModwatched = false;
#endif
                theContext->ProcessEvent(theCurrentEvent.er_eventbits);
                fRefTable.Release(ref);
            }
        }

#if EVENT_CONTEXT_DEBUG
        SInt64 yieldStart = OS::Milliseconds();
#endif

        this->ThreadYield();

#if EVENT_CONTEXT_DEBUG
        SInt64 yieldDur = OS::Milliseconds() - yieldStart;
        static SInt64 numZeroYields;

        if (yieldDur > 1)
        {
            qtss_printf("EventThread time in OSThread::Yield %i, numZeroYields %i\n", (SInt32)yieldDur, (SInt32)numZeroYields);
            numZeroYields = 0;
        }
        else
            numZeroYields++;
#endif
    }
}
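For readers unfamiliar with the pattern, this is the essence of what epoll_waitevent abstracts (a minimal sketch using plain Linux epoll, not the DSS wrapper):

#include <sys/epoll.h>
#include <cstdint>

// Block until one watched descriptor is ready, then recover its owner object.
void wait_once(int epfd)
{
    struct epoll_event ev;
    int n = ::epoll_wait(epfd, &ev, 1, 15000 /* ms, illustrative timeout */);
    if (n == 1)
    {
        // ev.data plays the role of theCurrentEvent.er_data above: a cookie
        // registered along with the fd, which DSS resolves via fRefTable into
        // an EventContext whose ProcessEvent() is then invoked.
        uint64_t cookie = ev.data.u64;
        (void)cookie;
    }
}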

3.9 How TCPListenerSocket works

EventContext::ProcessEvent simply signals Task::kReadEvent to the notified Task, which was handed in from outside when the Socket was initialized. We can now connect the dots: TCPListenerSocket is both a TCPSocket and an IdleTask, so whenever a readable or writable event occurs on a listener, the EventThread calls back the inherited EventContext::ProcessEvent, which in turn uses Task::kReadEvent to trigger TCPListenerSocket::Run().


virtual void EventContext::ProcessEvent(int /*eventBits*/)
{
    if (EVENTCONTEXT_DEBUG)
    {
        if (fTask == NULL)
            qtss_printf("EventContext::ProcessEvent context=%p task=NULL\n", (void *)this);
        else
            qtss_printf("EventContext::ProcessEvent context=%p task=%p TaskName=%s\n", (void *)this, (void *)fTask, fTask->fTaskName);
    }

    if (fTask != NULL)
        fTask->Signal(Task::kReadEvent);
}

// The Socket constructor
Socket::Socket(Task *notifytask, UInt32 inSocketType)
:   EventContext(EventContext::kInvalidFileDesc, sEventThread),
    fState(inSocketType),
    fLocalAddrStrPtr(NULL),
    fLocalDNSStrPtr(NULL),
    fPortStr(fPortBuffer, kPortBufSizeInBytes)
{
    fLocalAddr.sin_addr.s_addr = 0;
    fLocalAddr.sin_port = 0;
    fDestAddr.sin_addr.s_addr = 0;
    fDestAddr.sin_port = 0;

    this->SetTask(notifytask);

#if SOCKET_DEBUG
    fLocalAddrStr.Set(fLocalAddrBuffer, sizeof(fLocalAddrBuffer));
#endif
}

3.10 How TCPListenerSocket handles the TCP connection behind a client request

TCPListenerSocket::Run executes TCPListenerSocket::ProcessEvent, which calls accept to take a new socket connection off the listen queue — standard Linux network programming.


void TCPListenerSocket::ProcessEvent(int /*eventBits*/)
{
    //we are executing on the same thread as every other
    //socket, so whatever you do here has to be fast.
    struct sockaddr_in addr;
#if __Win32__ || __osf__ || __sgi__ || __hpux__
    int size = sizeof(addr);
#else
    socklen_t size = sizeof(addr);
#endif
    Task* theTask = NULL;
    TCPSocket* theSocket = NULL;

    //fSocket data member of TCPSocket.
    int osSocket = accept(fFileDesc, (struct sockaddr*)&addr, &size);

//test osSocket = -1;
    if (osSocket == -1)
    {
        //take a look at what this error is.
        int acceptError = OSThread::GetErrno();
        if (acceptError == EAGAIN)
        {
            //If it's EAGAIN, there's nothing on the listen queue right now,
            //so modwatch and return
            this->RequestEvent(EV_RE);
            return;
        }

//test acceptError = ENFILE;
//test acceptError = EINTR;
//test acceptError = ENOENT;

        //if these errors get returned, we're out of file descriptors,
        //the server is going to be failing on sockets, logs, qtgroups and qtuser
        //auth file accesses and movie files. The server is not functional.
        if (acceptError == EMFILE || acceptError == ENFILE)
        {
#ifndef __Win32__
            QTSSModuleUtils::LogErrorStr(qtssFatalVerbosity, "Out of File Descriptors. Set max connections lower and check for competing usage from other processes. Exiting.");
#endif
            exit(EXIT_FAILURE);
        }
        else
        {
            char errStr[256];
            errStr[sizeof(errStr) - 1] = 0;
            qtss_snprintf(errStr, sizeof(errStr) - 1, "accept error = %d '%s' on socket. Clean up and continue.", acceptError, strerror(acceptError));
            WarnV((acceptError == 0), errStr);

            theTask = this->GetSessionTask(&theSocket);
            if (theTask == NULL)
            {
                close(osSocket);
            }
            else
            {
                theTask->Signal(Task::kKillEvent); // just clean up the task
            }

            if (theSocket)
                theSocket->fState &= ~kConnected; // turn off connected state

            return;
        }
    }

    theTask = this->GetSessionTask(&theSocket);
    if (theTask == NULL)
    {
        //this should be a disconnect. do an ioctl call?
        close(osSocket);
        if (theSocket)
            theSocket->fState &= ~kConnected; // turn off connected state
    }
    else
    {
        Assert(osSocket != EventContext::kInvalidFileDesc);

        //set options on the socket
        //we are a server, always disable nagle algorithm
        int one = 1;
        int err = ::setsockopt(osSocket, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(int));
        AssertV(err == 0, OSThread::GetErrno());

        err = ::setsockopt(osSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&one, sizeof(int));
        AssertV(err == 0, OSThread::GetErrno());

        int sndBufSize = 96L * 1024L;
        err = ::setsockopt(osSocket, SOL_SOCKET, SO_SNDBUF, (char*)&sndBufSize, sizeof(int));
        AssertV(err == 0, OSThread::GetErrno());

        //setup the socket. When there is data on the socket,
        //theTask will get an kReadEvent event
        theSocket->Set(osSocket, &addr);
        theSocket->InitNonBlocking(osSocket);
        theSocket->SetTask(theTask);
        theSocket->RequestEvent(EV_RE);

        theTask->SetThreadPicker(Task::GetBlockingTaskThreadPicker()); //The RTSP Task processing threads
    }

    if (fSleepBetweenAccepts)
    {
        // We are at our maximum supported sockets
        // slow down so we have time to process the active ones (we will respond with errors or service).
        // wake up and execute again after sleeping. The timer must be reset each time through
        //qtss_printf("TCPListenerSocket slowing down\n");
        this->SetIdleTimer(kTimeBetweenAcceptsInMsec); //sleep 1 second
    }
    else
    {
        // sleep until there is a read event outstanding (another client wants to connect)
        //qtss_printf("TCPListenerSocket normal speed\n");
        this->RequestEvent(EV_RE);
    }

    fOutOfDescriptors = false; // always false for now we don't properly handle this elsewhere in the code
}

SInt64 TCPListenerSocket::Run()
{
    EventFlags events = this->GetEvents();

    //
    // ProcessEvent cannot be going on when this object gets deleted, because
    // the resolve / release mechanism of EventContext will ensure this thread
    // will block before destructing stuff.
    if (events & Task::kKillEvent)
        return -1;

    //This function will get called when we have run out of file descriptors.
    //All we need to do is check the listen queue to see if the situation has
    //cleared up.
    (void)this->GetEvents();
    this->ProcessEvent(Task::kReadEvent);
    return 0;
}

3.11 How RTSPListenerSocket works

Back to our subject. RTSPListenerSocket is a TCPListenerSocket that listens on the RTSP ports; when a client connects, the system detects the connection and notifies the application, and a new TCPSocket is created to represent it, through which all subsequent reads and writes flow. But what does RTSPListenerSocket itself contribute? Back in TCPListenerSocket::ProcessEvent there is a call to GetSessionTask, which leads indirectly to RTSPListenerSocket::GetSessionTask: TCPListenerSocket declares GetSessionTask as a pure virtual function, and the subclass supplies the concrete implementation.


// Declared in TCPListenerSocket (see 3.6):
// the derived object must implement a way of getting tasks & sockets to this object
virtual Task *GetSessionTask(TCPSocket **outSocket) = 0;

Task *RTSPListenerSocket::GetSessionTask(TCPSocket **outSocket)
{
    Assert(outSocket != NULL);

    // when the server is behind a round robin DNS, the client needs to know the IP address of the server
    // so that it can direct the "POST" half of the connection to the same machine when tunnelling RTSP thru HTTP
    Bool16 doReportHTTPConnectionAddress = QTSServerInterface::GetServer()->GetPrefs()->GetDoReportHTTPConnectionAddress();

    RTSPSession *theTask = NEW RTSPSession(doReportHTTPConnectionAddress);
    *outSocket = theTask->GetSocket(); // out socket is not attached to a unix socket yet.

    if (this->OverMaxConnections(0))
        this->SlowDown();
    else
        this->RunNormal();

    return theTask;
}

3.12 The role of RTSPSession

RTSPSession represents one RTSP session and spans a complete TCP connection lifecycle, from connect until a FIN or RESET terminates it. It is the active element that gets scheduled and does the work: it assembles requests and processes them as data arrives, and it decides on its own when to close the connection. RTSPSession derives from RTSPSessionInterface, which in turn derives from QTSSDictionary and Task. Among its members is a TCPSocket, meaning each RTSPSession corresponds to one client TCP connection and communicates with the remote client through it, and as a Task it can be executed on an idle or timer thread. We can therefore go straight to RTSPSession::Run() to study its workflow. Internally it is an RTSP session state machine whose main job is to read a complete client request off the TCP connection and dispatch it to the matching Module. For example, when the state machine is in kPreprocessingRequest, it dispatches to QTSSModule::kRTSPPreProcessorRole for processing; the sketch after this paragraph lists the main states for orientation.
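Before reading the excerpt, it helps to see the shape of that state machine. A hedged, abridged sketch of the main states (RTSPSession.h holds the authoritative enum; kPreprocessingRequest, kProcessingRequest, and kPostProcessingRequest are visible in the excerpt below, the rest are approximate):

enum
{
    kReadingRequest,        // pull bytes off the TCPSocket until a full request is buffered
    kFilteringRequest,      // kRTSPFilterRole modules may rewrite the request
    kRoutingRequest,        // kRTSPRouteRole modules decide where it goes
    kPreprocessingRequest,  // kRTSPPreProcessorRole modules get first crack at handling it
    kProcessingRequest,     // kRTSPRequestRole: the fallback handler
    kPostProcessingRequest, // kRTSPPostProcessorRole modules observe the response
    kSendingResponse,       // flush the response back to the client
    kCleaningUp             // reset per-request state, return to reading
};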


SInt64 RTSPSession::Run()
{
...
        case kPreprocessingRequest:
        {
            // Invoke preprocessor modules
            numModules = QTSServerInterface::GetNumModulesInRole(QTSSModule::kRTSPPreProcessorRole);
            {
                // Manipulation of the RTPSession from the point of view of
                // a module is guarenteed to be atomic by the API.
                Assert(fRTPSession != NULL);
                OSMutexLocker locker(fRTPSession->GetSessionMutex());

                for (; (fCurrentModule < numModules) && ((!fRequest->HasResponseBeenSent()) || fModuleState.eventRequested); fCurrentModule++)
                {
                    fModuleState.eventRequested = false;
                    fModuleState.idleTime = 0;
                    if (fModuleState.globalLockRequested)
                    {
                        fModuleState.globalLockRequested = false;
                        fModuleState.isGlobalLocked = true;
                    }

                    theModule = QTSServerInterface::GetModule(QTSSModule::kRTSPPreProcessorRole, fCurrentModule);
                    (void)theModule->CallDispatch(QTSS_RTSPPreProcessor_Role, &fRoleParams);
                    fModuleState.isGlobalLocked = false;

                    // The way the API is set up currently, the first module that adds a stream
                    // to the session is responsible for sending RTP packets for the session.
                    if (fRTPSession->HasAnRTPStream() && (fRTPSession->GetPacketSendingModule() == NULL))
                        fRTPSession->SetPacketSendingModule(theModule);

                    if (fModuleState.globalLockRequested) // call this request back locked
                        return this->CallLocked();

                    // If this module has requested an event, return and wait for the event to transpire
                    if (fModuleState.eventRequested)
                    {
                        this->ForceSameThread();    // We are holding mutexes, so we need to force
                                                    // the same thread to be used for next Run()
                        return fModuleState.idleTime; // If the module has requested idle time...
                    }
                }
            }

            fCurrentModule = 0;
            if (fRequest->HasResponseBeenSent())
            {
                fState = kPostProcessingRequest;
                break;
            }

            fState = kProcessingRequest;
        }
...
}

3.13 Finding the Module that matches QTSSModule::kRTSPPreProcessorRole
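GetModule returns the modules that registered interest in this role when the server loaded them. As a hedged sketch of that contract — the module and handler names below are hypothetical, while QTSS_AddRole, QTSS_Register_Role, and QTSS_RTSPPreProcessor_Role are part of the real QTSS module API — a module opts in like this:

// Hypothetical module (not in the DSS tree) showing how a Module comes to
// "match" QTSSModule::kRTSPPreProcessorRole: it opts in at register time, and
// GetModule(QTSSModule::kRTSPPreProcessorRole, idx) then returns it during
// the kPreprocessingRequest state.
static QTSS_Error HandleRTSPPreProcess(QTSS_StandardRTSP_Params *inParams); // hypothetical handler

QTSS_Error MyModuleDispatch(QTSS_Role inRole, QTSS_RoleParamPtr inParams)
{
    switch (inRole)
    {
        case QTSS_Register_Role:
            // Declare interest; this puts the module on the
            // kRTSPPreProcessorRole list that RTSPSession::Run iterates.
            return QTSS_AddRole(QTSS_RTSPPreProcessor_Role);

        case QTSS_RTSPPreProcessor_Role:
            // Reached via theModule->CallDispatch(QTSS_RTSPPreProcessor_Role, &fRoleParams)
            return HandleRTSPPreProcess(&inParams->rtspRequestParams);
    }
    return QTSS_NoErr;
}

Stock modules such as QTSSReflectorModule take this path to claim RTSP requests before the default request-processing role runs.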


3.14 The files visited in this analysis, in order:

streamingserver.xml -> QTSS.h -> QTSServer.cpp -> RTSPListenerSocket.h -> TCPListenerSocket.h -> Socket.h -> EventThread.cpp -> TCPListenerSocket.cpp -> RTSPSession.cpp

4. Summary

This article used the DSS startup sequence as its entry point: starting from a field in the configuration file, we located the corresponding code, worked backwards from method names to the initialization entry point to understand the startup flow, and then examined the key classes and DSS's asynchronous event model one by one. To recap: DSS listens on its ports by creating TCP listener sockets; when a client connects, the server creates an RTSPSession object to maintain that connection and to communicate with the client from then on. The RTSPSession reads the request method the client sent, uses kRTSPPreProcessorRole to find the matching Module, and hands the request down for that Module to process internally.

