python mqtt通信(windows)
一、消息队列服务器
这里我用到activemq,可到官网下载 //activemq.apache.org/
1. 若遇到点击apache-activemq-5.16.2\bin\activemq.bat 出现闪退,64位系统请点击apache-activemq-5.16.2\bin\win64\activemq.bat,启动mqtt服务器
ActiveMQ启动闪退的问题可见 //blog.csdn.net/pavel101/article/details/79460672
2. ActiveMQ 默认用户名和密码
用户名:admin 密码:admin
可以在/conf/users.properties中寻找。
默认登录地址://localhost:8161/admin/,这里mqtt默认端口为1883,ip为192.168.1.103
二、封装客户端
1 import paho.mqtt.client as mqtt 2 3 import logging 4 5 class MqttClient(mqtt.Client): 6 7 def initClient(self, mqttServer, mqttPort, username, password, timeout=10000): 8 logging.basicConfig(level=logging.DEBUG) 9 10 self.mqttServer = mqttServer 11 self.mqttPort = mqttPort 12 self.username_pw_set(username, password=password) 13 14 self.connect(self.mqttServer, self.mqttPort, timeout) # keeplive仅为10000秒 15 self.on_connect = self.on_connect 16 17 def getClient(self): 18 return self.client 19 20 def on_connect(self, client, userdata, flags, rc): 21 linkAddr = client.mqttServer + ":"+ str(client.mqttPort) 22 if rc == 0: 23 logging.info("与mqtt服务器:"+ linkAddr +"连接成功") 24 elif rc == 1: 25 logging.error("协议版本错误") 26 elif rc == 2: 27 logging.error("无效的客户端标识") 28 elif rc == 3: 29 logging.error("服务端无法使用") 30 elif rc == 4: 31 logging.error("与mqtt服务器连接失败: 错误的用户名或密码 ") 32 elif rc == 5: 33 logging.error("登录用户未经授权 ") 34 else: 35 logging.error("与mqtt服务器:%s 连接返回异常结果:%s " % (linkAddr, str(rc))) 36 37 def on_subscribe(self, client, userdata, mid, granted_qos): 38 logging.info("订阅成功: " + str(mid) + " " + str(granted_qos)) 39 40 def on_publish(self, client, userdata, mid): 41 logging.info("OnPublish, mid: " + str(mid))
三、下面模拟客户端1和客户端2通信,客户端1发布信息,客户端2接收信息
客户端1代码
1 #!/usr/bin/python 2 import datetime 3 import json 4 import logging 5 import time 6 7 from mqttService.mqttClient import MqttClient 8 9 #------------------客户端1-------------------- 10 # ====================================================== 11 12 13 # 服务器地址 14 mqttServer = "192.168.1.103" 15 # 通信端口 16 mqttPort = 1883 17 # 订阅主题 18 devTopic = '/devices/dev1' 19 responseDevTopic= '/7.0/dev1' 20 21 22 # 接收客户端2响应信息 23 def on_message(client, userdata, message): 24 curtime = datetime.datetime.now() 25 strcurtime = curtime.strftime("%Y-%m-%d %H:%M:%S") 26 logging.info("%s: 接收 %s 响应信息:主题:%s 内容:%s" %(strcurtime, mqttServer+":"+str(mqttPort), message.topic, str(message.payload, encoding = "utf-8"))) 27 28 29 if __name__ == '__main__': 30 logging.basicConfig(level=logging.DEBUG) 31 32 username = "UserName1" 33 password = "PassWord1" 34 35 client = MqttClient() 36 client.initClient(mqttServer, mqttPort, username, password) 37 client.subscribe(responseDevTopic, qos=0) #订阅主题 38 client.on_message = on_message 39 40 msg = 'hello world' 41 for i in range(1000): 42 client.publish(devTopic, payload=msg , qos=0) #发布信息 43 time.sleep(4)
client.loop_forever() # 持续连接
客户端2代码
#!/usr/bin/python import datetime import json import logging from mqttService.mqttClient import MqttClient #------------------客户端2------------------- # 订阅主题 devTopic = '/devices/#'
# 发布主题
responseDevTopicPrefix = '/7.0/' def on_message(client, userdata, message): curtime = datetime.datetime.now() strcurtime = curtime.strftime("%Y-%m-%d %H:%M:%S") recvMsg = message.payload # 获取发送设备的设备ID topicArr = str(message.topic).split("/") topicArr = [item for item in filter(lambda x: x != '', topicArr)] # 去除空串 deviceId = topicArr[1] logging.info("%s: 接收硬件装置 %s 信息:主题: %s 内容: %s" %(strcurtime, deviceId, message.topic, recvMsg)) # 发送响应回硬件终端 client.publish(responseDevTopicPrefix +deviceId, payload="收到数据"+recvMsg, qos=0)if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) mqttAddr = '192.168.1.103' mqttPort = 1883 username = "UserName2" password = "PassWord2" # 与客户端1通信 client = MqttClient() client.initClient(mqttAddr, mqttPort, username, password) client.subscribe(devTopic, qos=0) client.on_message = on_message client.loop_start() # 开始监听 # 阻塞主程序 while True: pass