在MQTT中發(fā)布和訂閱實體(主題)是MQTT通信的核心操作,下面將詳細介紹其原理、步驟以及示例代碼,幫助你全面理解這一過程。
一、MQTT發(fā)布與訂閱的基本概念
發(fā)布(Publish):客戶端將消息發(fā)送到MQTT代理(Broker)上的特定主題(Topic)。發(fā)布者無需知道有哪些客戶端訂閱了該主題,只需將消息發(fā)送到代理即可。
訂閱(Subscribe):客戶端向MQTT代理注冊感興趣的主題,當(dāng)有消息發(fā)布到這些主題時,代理會將消息推送給訂閱者。訂閱者可以訂閱一個或多個主題。
主題(Topic):主題是MQTT中的消息分類標識,使用類似文件路徑的字符串表示,例如"home/livingroom/temperature"。主題支持通配符,#表示多級通配符,+表示單級通配符。
二、發(fā)布和訂閱的步驟
(一)建立MQTT連接
在發(fā)布或訂閱消息之前,客戶端需要先與MQTT代理建立連接。這通常涉及以下參數(shù):
代理地址和端口:例如tcp://broker.hivemq.com:1883。
客戶端ID:用于唯一標識客戶端。
用戶名和密碼(可選):用于身份驗證。
QoS等級:消息質(zhì)量服務(wù)等級,分為0(最多一次)、1(至少一次)、2(恰好一次)。
(二)訂閱主題
客戶端通過訂閱特定主題來接收相關(guān)消息。訂閱操作可以指定QoS等級,代理會按照該等級向訂閱者推送消息。
(三)發(fā)布消息
客戶端將消息發(fā)布到指定的主題。發(fā)布時需要指定主題名稱、消息內(nèi)容和QoS等級。
(四)消息接收與處理
訂閱者接收到代理推送的消息后,可以對消息進行處理,例如解析JSON數(shù)據(jù)、更新UI界面等。
(五)斷開連接
當(dāng)客戶端不再需要通信時,可以斷開與MQTT代理的連接。
三、代碼示例
以下是使用Python的paho-mqtt庫實現(xiàn)MQTT發(fā)布和訂閱的示例代碼:
(一)訂閱者代碼
pythonimport paho.mqtt.client as mqtt # 定義連接回調(diào)函數(shù)def on_connect(client, userdata, flags, rc): print("Connected with result code " + str(rc)) # 訂閱主題 client.subscribe("home/livingroom/#") # 使用通配符訂閱多級主題 # 定義消息接收回調(diào)函數(shù)def on_message(client, userdata, msg): print(f"Received message on topic {msg.topic}: {msg.payload.decode()}") # 創(chuàng)建MQTT客戶端實例client = mqtt.Client() # 設(shè)置回調(diào)函數(shù)client.on_connect = on_connectclient.on_message = on_message # 連接到MQTT代理client.connect("broker.hivemq.com", 1883, 60) # 啟動網(wǎng)絡(luò)循環(huán),處理網(wǎng)絡(luò)消息和回調(diào)函數(shù)client.loop_forever()
(二)發(fā)布者代碼
pythonimport paho.mqtt.client as mqttimport time # 創(chuàng)建MQTT客戶端實例client = mqtt.Client() # 連接到MQTT代理client.connect("broker.hivemq.com", 1883, 60) # 發(fā)布消息的函數(shù)def publish_message(topic, payload, qos=0): result = client.publish(topic, payload, qos) # result是一個元組,包含消息ID和中間隊列對象 status = result[0] if status == 0: print(f"Message '{payload}' published to topic '{topic}'") else: print(f"Failed to publish message to topic '{topic}'") # 發(fā)布多條消息topics = ["home/livingroom/temperature", "home/livingroom/humidity", "home/kitchen/temperature"]payloads = ["25.5", "60", "22.0"] for topic, payload in zip(topics, payloads): publish_message(topic, payload, qos=1) # 使用QoS等級1發(fā)布消息 time.sleep(1) # 等待1秒,模擬不同時間點的發(fā)布 # 斷開連接client.disconnect()
四、關(guān)鍵點說明
主題設(shè)計
層次結(jié)構(gòu):使用斜杠(/)分隔主題的各個層級,例如"home/livingroom/temperature"。這種層次結(jié)構(gòu)便于對消息進行分類和管理。
通配符使用:+表示單級通配符,匹配任意一級主題;#表示多級通配符,必須放在主題末尾,匹配任意多級主題。例如,"home/+/temperature"可以匹配"home/livingroom/temperature"和"home/kitchen/temperature",而"home/#"可以匹配"home/livingroom/temperature"、"home/kitchen/light"等所有以"home/"開頭的主題。
QoS等級選擇
QoS 0:最多一次。消息發(fā)送后不等待確認,可能會丟失,適用于對消息可靠性要求不高的場景,如實時性要求較高但允許少量數(shù)據(jù)丟失的環(huán)境監(jiān)測。
QoS 1:至少一次。消息發(fā)送后會等待確認,如果未收到確認會重發(fā),可能會重復(fù)接收消息,適用于對消息可靠性有一定要求但允許少量重復(fù)的場景,如智能家居中的設(shè)備控制指令。
QoS 2:恰好一次。消息發(fā)送和接收會經(jīng)過多次握手確認,確保消息只被接收一次,但開銷較大,適用于對消息可靠性要求極高的場景,如金融交易數(shù)據(jù)傳輸。
錯誤處理
在實際應(yīng)用中,需要處理連接失敗、消息發(fā)布失敗等情況。例如,在連接代理時,可以檢查返回的連接結(jié)果碼,如果連接失敗則進行重試或報警。
通過以上步驟和示例代碼,你可以在MQTT中實現(xiàn)實體(主題)的發(fā)布和訂閱,構(gòu)建高效、可靠的物聯(lián)網(wǎng)通信系統(tǒng)。
審核編輯 黃宇
-
MQTT
+關(guān)注
關(guān)注
5文章
666瀏覽量
23431
發(fā)布評論請先 登錄
KaihongOS操作系統(tǒng):MQTT物聯(lián)網(wǎng)通訊協(xié)議
KaihongOS多實例MQTT接口
《DNESP32S3使用指南-IDF版_V1.6》第五十五章 基于MQTT協(xié)議連接阿里云服務(wù)器
MQTT物聯(lián)網(wǎng)平臺有哪些?有哪些功能?

基于MQTT協(xié)議的車云通信設(shè)計

百問MQTT協(xié)議分析 - MQTT簡述及協(xié)議報文格式組成
低功耗4G模組:MQTT通信功能
給我兩分鐘,搞懂發(fā)布-訂閱模式很輕松!

物聯(lián)網(wǎng)行業(yè)中MQTT通信協(xié)議詳解以及使用

MQTT協(xié)議網(wǎng)關(guān)的工作原理及功能特性

基于ArkTS語言的OpenHarmony APP應(yīng)用開發(fā):公共事件的訂閱和發(fā)布
MQTT網(wǎng)關(guān):物聯(lián)網(wǎng)中的關(guān)鍵橋梁

知識科普 MQTT Broker 代理 是什么

IG902如何連接公有MQTT測試平臺?
來了解一下MQTT Broker代理

評論