ZeroMQ,史上最快的消息队列 —– ZMQ的学习和研究

ZeroMQ 的模式

[架构] ZeroMQ 深度探索(一)

 消息队列ZeroMQ

服务端使用流程:

  

void* m_Context;
void* m_sktMsgVideoFrame; m_sktMsgVideoFrame = zmq_socket(m_Context,ZMQ_PUB);
int ret = zmq_bind(m_sktMsgVideoFrame,bytesMsgVideoFrameAddress.data()); zmq_msg_t msg; zmq_msg_init_size(&msg,frameBuffSize+MIN_MSG_LEN);
memcpy(zmq_msg_data(&msg),MSG_VIDEO_FRAME,5); memcpy((char*)zmq_msg_data(&msg)+5, frameBuff, frameBuffSize); // int iRet = zmq_msg_send(&msg, (zmq_msg_t*)m_sktMsgVideoFrame, 0); zmq_msg_close(&msg); zmq_close(m_sktMsgVideoFrame);
zmq_ctx_shutdown(m_Context);

  

客户端流程:

QByteArray bytesMsgControlAddress = g_strMsgControlAddress.toUtf8();
QByteArray bytesMsgReqCaptureRetAddress = g_strMsgReqCaptureRetAddress.toUtf8();
QByteArray bytesMsgVideoFrameAddress = g_strMsgVideoFrameAddress.toUtf8(); void* context = zmq_ctx_new(); void* m_pSktMsgControl = zmq_socket(context, ZMQ_SUB);
int ret = zmq_connect(m_pSktMsgControl,bytesMsgControlAddress.data());
qDebug()<<"MSG_CONTROL PULL 连接地址:"<<g_strMsgControlAddress;
qDebug()<<"MSG_CONTROL PULL 连接结果:"<<ret;
ret = zmq_setsockopt(m_pSktMsgControl, ZMQ_SUBSCRIBE, "", );/// 必须添加该语句对消息滤波,否则接受不到消息 void* m_pSktMsgReqCaptureRet = zmq_socket(context, ZMQ_SUB);
ret = zmq_connect(m_pSktMsgReqCaptureRet, bytesMsgReqCaptureRetAddress.data());
qDebug()<<"MSG_REQ_CAPUTRE_RET PULL 连接地址:"<<g_strMsgReqCaptureRetAddress;
qDebug()<<"MSG_REQ_CAPUTRE_RET PULL 连接结果:"<<ret; ret = zmq_setsockopt(m_pSktMsgReqCaptureRet, ZMQ_SUBSCRIBE, "", );/// 必须添加该语句对消息滤波,否则接受不到消息 void* m_pSktVideoFrame = zmq_socket(context, ZMQ_SUB);
ret = zmq_connect(m_pSktVideoFrame, bytesMsgVideoFrameAddress.data()); qDebug()<<"MSG_VIDEO_FRAME PULL 连接地址:"<<g_strMsgVideoFrameAddress;
qDebug()<<"MSG_VIDEO_FRAME PULL 连接结果:"<<ret; ret = zmq_setsockopt(m_pSktVideoFrame, ZMQ_SUBSCRIBE, "", );/// 必须添加该语句对消息滤波,否则接受不到消息 while (true)
{
//一直监听来自其他模块的PUSH消息,采用非阻塞模式
zmq_msg_t msg;
int responseLen = zmq_msg_init(&msg);
responseLen = zmq_msg_recv(&msg,receiver, ZMQ_DONTWAIT);
if(responseLen<MIN_MSG_LEN)
{
//qDebug()<<"收到消息长度小于最小消息长度,本次消息无效.";
zmq_msg_close(&msg);
return;
} BYTE* pStr = (BYTE*)malloc(responseLen);
memcpy(pStr, zmq_msg_data(&msg), responseLen); char head[MIN_MSG_LEN+] = {};
memcpy(head, pStr, MIN_MSG_LEN); QString headStr = QString::fromUtf8(head);
if (headStr.indexOf(MSG_GPS) != -)
{
//dosomething.....
} zmq_msg_close(&msg); QThread::msleep();
} zmq_close(m_pSktMsgControl);
zmq_close(m_pSktMsgReqCaptureRet);
zmq_close(m_pSktVideoFrame); zmq_ctx_shutdown(context);

最新文章

  1. DevExpress免费公开课,讲解即将发布的16.2新版功能
  2. Sublime Text 3 3126 注册码 + 下载地址
  3. CORS(跨域资源共享)
  4. iOS后台定位,实时向服务器发送最新位置
  5. JavaScript事件---事件绑定和深入
  6. 针对ASP.NET页面实时进行GZIP压缩优化的几款压缩模块的使用简介及应用测试!(附源码)
  7. Hibernate 实体关联关系映射【转】
  8. C++中的链表节点用模板类和用普通类来实现的区别
  9. Grunt 认识
  10. java:比较对象
  11. Topology拓扑
  12. python 读入
  13. angular的数据双向绑定秘密
  14. Appium 服务命令行参数
  15. github部分有意思的库记录
  16. 吐槽CSDN--想钱想疯了--阅读全文需要关闭广告屏蔽
  17. python名片管理
  18. rbac(基于角色权限控制)-------权限管理
  19. Codeforces Round #470 (rated, Div. 2, based on VK Cup 2018 Round 1) C.Producing Snow
  20. (转)mysql百万级以上查询优化

热门文章

  1. react+spring 记录跨域问题的解决方法
  2. 【python】python中的json、字典dict
  3. 从零自学Java-2.初步理解Java程序使如何工作的
  4. python常用模块之string
  5. 使用SQL Server Management Studio操作replication时,要用机器名登录,不要用IP地址
  6. NPOI 导出Excel 数据方式
  7. QT学习2
  8. 【转】Mybatis学习---MyBatis知识、原始Dao开发和mapper代理开发
  9. 3.2Python的循环结构语句:
  10. div设置contenteditable=&quot;true&quot; 光标消失:原因