一、消息队列服务器

这里我用到activemq,可到官网下载 http://activemq.apache.org/

1. 若遇到点击apache-activemq-5.16.2\bin\activemq.bat 出现闪退,64位系统请点击apache-activemq-5.16.2\bin\win64\activemq.bat,启动mqtt服务器
ActiveMQ启动闪退的问题可见 https://blog.csdn.net/pavel101/article/details/79460672
2. ActiveMQ 默认用户名和密码
用户名:admin 密码:admin
可以在/conf/users.properties中寻找。
默认登录地址:http://localhost:8161/admin/,这里mqtt默认端口为1883,ip为192.168.1.103

二、封装客户端

 1 import paho.mqtt.client as mqtt
2
3 import logging
4
5 class MqttClient(mqtt.Client):
6
7 def initClient(self, mqttServer, mqttPort, username, password, timeout=10000):
8 logging.basicConfig(level=logging.DEBUG)
9
10 self.mqttServer = mqttServer
11 self.mqttPort = mqttPort
12 self.username_pw_set(username, password=password)
13
14 self.connect(self.mqttServer, self.mqttPort, timeout) # keeplive仅为10000秒
15 self.on_connect = self.on_connect
16
17 def getClient(self):
18 return self.client
19
20 def on_connect(self, client, userdata, flags, rc):
21 linkAddr = client.mqttServer + ":"+ str(client.mqttPort)
22 if rc == 0:
23 logging.info("与mqtt服务器:"+ linkAddr +"连接成功")
24 elif rc == 1:
25 logging.error("协议版本错误")
26 elif rc == 2:
27 logging.error("无效的客户端标识")
28 elif rc == 3:
29 logging.error("服务端无法使用")
30 elif rc == 4:
31 logging.error("与mqtt服务器连接失败: 错误的用户名或密码 ")
32 elif rc == 5:
33 logging.error("登录用户未经授权 ")
34 else:
35 logging.error("与mqtt服务器:%s 连接返回异常结果:%s " % (linkAddr, str(rc)))
36
37 def on_subscribe(self, client, userdata, mid, granted_qos):
38 logging.info("订阅成功: " + str(mid) + " " + str(granted_qos))
39
40 def on_publish(self, client, userdata, mid):
41 logging.info("OnPublish, mid: " + str(mid))

三、下面模拟客户端1和客户端2通信,客户端1发布信息,客户端2接收信息

客户端1代码

 1 #!/usr/bin/python
2 import datetime
3 import json
4 import logging
5 import time
6
7 from mqttService.mqttClient import MqttClient
8
9 #------------------客户端1--------------------
10 # ======================================================
11
12
13 # 服务器地址
14 mqttServer = "192.168.1.103"
15 # 通信端口
16 mqttPort = 1883
17 # 订阅主题
18 devTopic = '/devices/dev1'
19 responseDevTopic= '/7.0/dev1'
20
21
22 # 接收客户端2响应信息
23 def on_message(client, userdata, message):
24 curtime = datetime.datetime.now()
25 strcurtime = curtime.strftime("%Y-%m-%d %H:%M:%S")
26 logging.info("%s: 接收 %s 响应信息:主题:%s 内容:%s" %(strcurtime, mqttServer+":"+str(mqttPort), message.topic, str(message.payload, encoding = "utf-8")))
27
28
29 if __name__ == '__main__':
30 logging.basicConfig(level=logging.DEBUG)
31
32 username = "UserName1"
33 password = "PassWord1"
34
35 client = MqttClient()
36 client.initClient(mqttServer, mqttPort, username, password)
37 client.subscribe(responseDevTopic, qos=0) #订阅主题
38 client.on_message = on_message
39
40 msg = 'hello world'
41 for i in range(1000):
42 client.publish(devTopic, payload=msg , qos=0) #发布信息
43 time.sleep(4)
    client.loop_forever()  # 持续连接

客户端2代码

 #!/usr/bin/python
import datetime
import json
import logging from mqttService.mqttClient import MqttClient #------------------客户端2------------------- # 订阅主题
devTopic = '/devices/#'
# 发布主题

responseDevTopicPrefix = '/7.0/' def on_message(client, userdata, message):
curtime = datetime.datetime.now()
strcurtime = curtime.strftime("%Y-%m-%d %H:%M:%S")
recvMsg = message.payload # 获取发送设备的设备ID
topicArr = str(message.topic).split("/")
topicArr = [item for item in filter(lambda x: x != '', topicArr)] # 去除空串
deviceId = topicArr[1] logging.info("%s: 接收硬件装置 %s 信息:主题: %s 内容: %s" %(strcurtime, deviceId, message.topic, recvMsg)) # 发送响应回硬件终端
client.publish(responseDevTopicPrefix +deviceId, payload="收到数据"+recvMsg, qos=0)if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG) mqttAddr = '192.168.1.103'
mqttPort = 1883 username = "UserName2"
password = "PassWord2"
# 与客户端1通信
client = MqttClient()
client.initClient(mqttAddr, mqttPort, username, password)
client.subscribe(devTopic, qos=0)
client.on_message = on_message
client.loop_start() # 开始监听
# 阻塞主程序
while True:
pass

最新文章

  1. CSS3 Flexbox轻松实现元素的水平居中和垂直居中
  2. Python几种常用的测试框架
  3. highchart 添加新的series
  4. C#语法知识
  5. [BZOJ 3218]a + b Problem
  6. MySQL 通过idb文件恢复Innodb 数据【转】
  7. 【转】 strcpy和memcpy的区别
  8. uva753 A Plug for UNIX
  9. Linux系统下如何查看CPU个数
  10. C++拾遗(一)关于main()函数
  11. poj2262---素数(质数)的判断
  12. jQuery+CSS实现的图片滚动效果
  13. SAP 条形码
  14. JDownload: 一款可以从网络上下载文件的小程序第四篇(整体架构描述)
  15. Java基础-常用的String方法
  16. 面试被问烂的 Spring IOC(求求你别再问了)
  17. excel数据导入mysql
  18. 【PAT】B1070 结绳(25 分)
  19. VMware Workstation Pro 14.1.1 正式版
  20. Git 藏匿操作

热门文章

  1. js--手动实现一个常见的短信验证码输入框
  2. GO文件读写03---使用缓冲读写实现视频文件的拷贝
  3. pytest - 打标记:mark功能
  4. Fine-Tuning微调原理
  5. 基于C语言文件操作的学生成绩管理系统
  6. CArray CList CMap 插入与遍历效率对比
  7. 实验3、Flask数据库操作-如何使用Flask与数据库
  8. UF_LAYER 图层操作
  9. 【NX二次开发】常用的标准对话框
  10. 合宙模块LUA相关资料汇总