live555已经发展了十几年了,不得不钦佩作者坚持不懈的奉献和国外的开源生态环境,live555可以说是大部分的安防从业者的入门之选,尤其是在嵌入式或者Linux系统上,其应用还是蛮广泛的,主要是其兼容性和稳定性;

但是随着live555十几年的不断迭代,很多开发者反复向作者Ross提到的多线程和IPv6的功能,作者也一直都没有去尝试,可能是这样会对live555的架构产生比较大的改动和影响,作者为了稳妥,选择了小改动、稳定、逐步迭代的方式, 虽然是性能稳定,但支持的路数有限,不能多线程工作始终是个坎; 网上找到几篇live555多线程的博客, 基本上大同小异,就是创建独立的UsageEnvironment和TaskSchedule, 由独立的线程分工协作; 本人也是这个思路,创建多个工作线程,每个工作线程内创建UsageEnvironment和TaskSchedule,然后各自开启EventLoop;

今天我们抛砖引玉,先大概聊一下主体思路,在后续的博客中将尽力完整地汇总这些思路和开发的过程:

目标

将live555修改为多线程, 每个通道对应一个工作线程,由工作线程对该通道进行独立处理;

大体修改点

修改支持多线程, 主要涉及到以下类的修改

  • GenericMediaServer
  • RTSPServer

GenericMediaServer.cpp
在GenericMediaServer的构造函数中, 创建工作线程个数,即最大支持的通道数;

GenericMediaServer
::GenericMediaServer(UsageEnvironment& env, int ourSocketV4, int ourSocketV6, Port ourPort,
unsigned reclamationSeconds)
: Medium(env),
fServerSocket4(ourSocketV4), fServerSocket6(ourSocketV6),
fServerPort(ourPort), fReclamationSeconds(reclamationSeconds),
fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)),
fClientConnections(HashTable::create(ONE_WORD_HASH_KEYS)),
fClientSessions(HashTable::create(STRING_HASH_KEYS)) {
ignoreSigPipeOnSocket(fServerSocket4); // so that clients on the same host that are killed don't also kill us
ignoreSigPipeOnSocket(fServerSocket6); // so that clients on the same host that are killed don't also kill us #ifdef LIVE_MULTI_THREAD_ENABLE InitMutex(&mutexClientConnection); memset(&multiThreadCore, 0x00, sizeof(MultiThread_CORE_T));
multiThreadCore.threadNum = MAX_DEFAULT_MULTI_THREAD_NUM;
multiThreadCore.threadTask = new LIVE_THREAD_TASK_T[multiThreadCore.threadNum];
memset(&multiThreadCore.threadTask[0], 0x00, sizeof(LIVE_THREAD_TASK_T) * multiThreadCore.threadNum);
for (int i=0; i<multiThreadCore.threadNum; i++)
{
char szName[36] = {0};
sprintf(szName, "worker thread %d", i);
multiThreadCore.threadTask[i].id = i;
multiThreadCore.threadTask[i].extPtr = this;
multiThreadCore.threadTask[i].pSubScheduler = BasicTaskScheduler::createNew();
multiThreadCore.threadTask[i].pSubEnv = BasicUsageEnvironment::createNew(*multiThreadCore.threadTask[i].pSubScheduler, i+1, szName); CreateOSThread( &multiThreadCore.threadTask[i].osThread, __OSThread_Proc, (void *)&multiThreadCore.threadTask[i] ); }
#endif // Arrange to handle connections from others:
env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket4, incomingConnectionHandler4, this);
env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket6, incomingConnectionHandler6, this);
}

接受客户端连接

按原有流程接受客户端连接;

分配客户端请求

在收到客户端发送的DESCRIBE命令后,
在通道列表中找出空闲的通道,将该客户端关联到该通道, 然后从主线程中移除该socket, 由工作线程接管该socket的操作;
后续有客户端如访问已经存在的通道,则主线程会将该请求直接分配给对应的工作线程处理;

注意: 主线程的工作到此结束,不要执行lookupServerMediaSession的操作;

在工作线程中, 接管客户端的socket后, 马上执行lookupServerMediaSession, 在该函数中,将后缀回调给上层调用程序, 由上层调用程序判断是否存在该通道,如不存在则返回失败,如存在则向前端取流,然后填充媒体信息返回成功, 库内部则创建相应的mediasession, 再回应客户端, 后续的则完成整个rtsp流程的交互;

注意: 创建MediaSession时,必须将工作线程中的UsageEnvironment传进去, 不能使用主线程中的envir();

int RTSPServer::RTSPClientConnection
::handleCmd_DESCRIBE(UsageEnvironment *pEnv, char const* urlPreSuffix, char const* urlSuffix, char const* fullRequestStr, LIVE_THREAD_TASK_T **pThreadTask)
{
int handleCmdRet = 0; ServerMediaSession* session = NULL;
char* sdpDescription = NULL;
char* rtspURL = NULL;
do {
char urlTotalSuffix[2*RTSP_PARAM_STRING_MAX];
// enough space for urlPreSuffix/urlSuffix'\0'
urlTotalSuffix[0] = '\0';
if (urlPreSuffix[0] != '\0') {
strcat(urlTotalSuffix, urlPreSuffix);
strcat(urlTotalSuffix, "/");
}
strcat(urlTotalSuffix, urlSuffix); if (!authenticationOK("DESCRIBE", urlTotalSuffix, fullRequestStr)) break; // We should really check that the request contains an "Accept:" #####
// for "application/sdp", because that's what we're sending back ##### _TRACE(TRACE_LOG_DEBUG, "handleCmd_DESCRIBE socket[%d]\n", this->fOurSocket); #ifdef LIVE_MULTI_THREAD_ENABLE //如果当前是主线程,则进入到查找通道流程
if (pEnv->GetEnvirId() == 1000)
{
fOurServer.LockClientConnection(); //Lock UsageEnvironment *pChEnv = fOurServer.GetEnvBySuffix(urlSuffix, this, pThreadTask);
if (NULL == pChEnv)
{
fOurServer.UnlockClientConnection(); //Unlock handleCmdRet = -1; this->assignSink = False;
this->pEnv = NULL;
handleCmd_notFound();
break;
}
else
{
_TRACE(TRACE_LOG_DEBUG, "将socket[%d] 关联到[%s]\n", this->fOurSocket, pChEnv->GetEnvirName()); //将socket从主线程移到工作线程中
UsageEnvironment *pMainEnv = &envir();
envir().taskScheduler().disableBackgroundHandling(fOurSocket); fOurServer.UnlockClientConnection(); //Unlock return 1000;
} break;
} #endif // Begin by looking up the "ServerMediaSession" object for the specified "urlTotalSuffix": //在工作线程中执行 lookupServerMediaSession
session = fOurServer.lookupServerMediaSession(pEnv, 1, this, urlTotalSuffix);
if (session == NULL) { //pChEnv->taskScheduler().disableBackgroundHandling(fOurSocket); _TRACE(TRACE_LOG_DEBUG, "socket[%d] 在[%s]中, 源未就绪[%s]\n", this->fOurSocket, pEnv->GetEnvirName(), urlTotalSuffix); this->assignSink = False;
this->pEnv = NULL; handleCmdRet = -1; //envir().taskScheduler().disableBackgroundHandling(fOurSocket); //fOurServer.ResetEnvBySuffix(urlSuffix, this); handleCmd_notFound(); break;
} session->incrementReferenceCount(); // Then, assemble a SDP description for this session:
sdpDescription = session->generateSDPDescription(fOurIPVer);
if (sdpDescription == NULL) {
// This usually means that a file name that was specified for a
// "ServerMediaSubsession" does not exist.
setRTSPResponse("404 File Not Found, Or In Incorrect Format"); break;
} unsigned sdpDescriptionSize = strlen(sdpDescription); // Also, generate our RTSP URL, for the "Content-Base:" header
// (which is necessary to ensure that the correct URL gets used in subsequent "SETUP" requests).
rtspURL = fOurRTSPServer.rtspURL(session, fOurIPVer, fClientInputSocket); snprintf((char*)fResponseBuffer, sizeof fResponseBuffer,
"RTSP/1.0 200 OK\r\nCSeq: %s\r\n"
"%s"
"Content-Base: %s/\r\n"
"Content-Type: application/sdp\r\n"
"Content-Length: %d\r\n\r\n"
"%s",
fCurrentCSeq,
dateHeader(),
rtspURL,
sdpDescriptionSize,
sdpDescription);
} while (0); if (session != NULL) {
// Decrement its reference count, now that we're done using it:
session->decrementReferenceCount();
if (session->referenceCount() == 0 && session->deleteWhenUnreferenced()) {
fOurServer.removeServerMediaSession(pEnv, session, True);
} session->SetStreamStatus(1); //置标志,让后续访问该通道的客户端可以得到迅速响应
} delete[] sdpDescription;
delete[] rtspURL; return handleCmdRet;
}

历经2个多月,终于将多线程问题搞定. 在此记录一下, 欢迎探讨;

live555技术交流

邮件:289042893@qq.com

live555技术交流群:475947825

最新文章

  1. VS2010/VS2013中ashx代码折叠的问题
  2. [JSP]获取时间
  3. .net 控件
  4. Velocity(2)——引用
  5. chubu
  6. STL容器迭代器失效分析
  7. Web Api 返回参数,实现统一标准化!
  8. VIM学习1
  9. magento寄存器的使用
  10. TempDB问题定位与解决
  11. 解决ssm项目表单数据提交到数据库乱码问题
  12. ViewPager实现首次进入软件时左右滑屏的软件展示效果
  13. 带问号的括号匹配问题918C 1153C
  14. 腾讯、爱奇艺、优酷等vip视频在线解析
  15. redis远程连接不上解决办法
  16. 线程安全之CAS机制详解(分析详细,通俗易懂)
  17. matplotlib-区域填充
  18. LoadRunner11录制脚本出现的问题
  19. 软件常用设置(VC, eclipse ,nodejs)---自己备用
  20. asp.net微信支付发起页面jsapi.aspx

热门文章

  1. Java程序中的Log文件配置
  2. [转载]JAVA调用Shell脚本
  3. MySQL 事件EVENT
  4. Pixhawk---fatal: Not a git repository (or any of the parent directories)
  5. 原始Ajax
  6. Dungeon Master ZOJ 1940【优先队列+广搜】
  7. UIScrollView翻书效果
  8. 安装Glass Box代理程序
  9. Android布局中 android:layout_gravity=&quot;bottom&quot;为何不起作用?
  10. vim 基本常用设置