本文梳理筆者 MQ 知識,從消息中間件的基礎知識講起,在有了基礎知識后,對市面上各主流的消息中間件進行詳細的解析,包括 RabbitMQ、RocketMQ、Kafka、Pulsar,最后再橫向對比這幾款主流的消息中間件。本篇是系列文章第二篇。?第一篇:關于MQ,你了解多少
RocketMQ
基礎概念
Tag
Tag(標簽)可以看作子主題,它是消息的第二級類型,用于為用戶提供額外的靈活性。使用標簽,同一業務模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標識。比如交易消息又可以分為:交易創建消息、交易完成消息等,一條消息可以沒有 Tag 。標簽有助于保持你的代碼干凈和連貫,并且還可以為 RocketMQ 提供的查詢系統提供幫助。
Group
RocketMQ 中,訂閱者的概念是通過消費組(Consumer Group)來體現的。每個消費組都消費主題中一份完整的消息,不同消費組之間消費進度彼此不受影響,也就是說,一條消息被 Consumer Group1 消費過,也會再給 Consumer Group2 消費。消費組中包含多個消費者,同一個組內的消費者是競爭消費的關系,每個消費者負責消費組內的一部分消息。默認情況,如果一條消息被消費者 Consumer1 消費了,那同組的其他消費者就不會再收到這條消息。
Offset
在 Topic 的消費過程中,由于消息需要被不同的組進行多次消費,所以消費完的消息并不會立即被刪除,這就需要? RocketMQ 為每個消費組在每個隊列上維護一個消費位置(Consumer Offset),這個位置之前的消息都被消費過,之后的消息都沒有被消費過,每成功消費一條消息,消費位置就加一。也可以這么說,Queue 是一個長度無限的數組,Offset 就是下標。
RocketMQ 架構
RabbitMQ 類似有生產階段、存儲階段、消費階段,相較 RabbitMQ 的架構,增加了 NameServer 集群,橫向拓展能力較好。參考的 Kafka 做的設計,故也同樣擁有 NIO、PageCache、順序讀寫、零拷貝的技能,單機的吞吐量在十萬級,橫向拓展能力較強,官方聲明集群下能承載萬億級吞吐。??
存儲階段,可以通過配置可靠性優先的 Broker 參數來避免因為宕機丟消息,簡單說就是可靠性優先的場景都應該使用同步。??
1、消息只要持久化到 CommitLog(日志文件)中,即使 Broker 宕機,未消費的消息也能重新恢復再消費。?
2、Broker 的刷盤機制:同步刷盤和異步刷盤,不管哪種刷盤都可以保證消息一定存儲在 Pagecache 中(內存中),但是同步刷盤更可靠,它是 Producer 發送消息后等數據持久化到磁盤之后再返回響應給 Producer。?
Broker 通過主從模式來保證高可用,Broker 支持 Master 和 Slave 同步復制、Master 和 Slave 異步復制模式,生產者的消息都是發送給 Master,但是消費既可以從 Master 消費,也可以從 Slave 消費。同步復制模式可以保證即使 Master 宕機,消息肯定在 Slave 中有備份,保證了消息不會丟失。??
Consumer 的配置文件中,并不需要設置是從 Master 讀還是從 Slave 讀,當 Master 不可用或者繁忙的時候, Consumer 的讀請求會被自動切換到從 Slave。有了自動切換 Consumer 這種機制,當一個 Master 角色的機器出現故障后,Consumer 仍然可以從 Slave 讀取消息,不影響 Consumer 讀取消息,這就實現了讀的高可用。? ??
如何達到發送端寫的高可用性呢?在創建 Topic 的時候,把 Topic 的多個 Message Queue 創建在多個 Broker 組上(相同 Broker 名稱,不同 BrokerId 機器組成 Broker 組),這樣當 Broker 組的 Master 不可用后,其他組Master 仍然可用, Producer 仍然可以發送消息。
此架構下的 RocketMQ 不支持把 Slave 自動轉成 Master ,如果機器資源不足,需要把 Slave 轉成 Master ,則要手動停止 Slave 色的 Broker ,更改配置文件,用新的配置文件啟動 Broker。由此,在高可用場景下此問題變得棘手,故需要引入分布式算法的實現,追求 CAP,但實踐情況是不能同事滿足 CA的,在互聯網場景下較多是在時間 BASE 理論,優先滿足 AP,盡可能去滿足 C。RocketMQ 引入的是實現 Raft 算法的 Dledger,擁有了選舉能力,主從切換,架構拓撲圖是這樣的:
分布式算法中比較常常聽到的是 Paxos 算法,但是由于 Paxos 算法難于理解,且實現比較困難,所以不太受業界歡迎。然后出現新的分布式算法 Raft,其比 Paxos 更容易懂與實現,到如今在實際中運用的也已經很成熟,不同的語言都有對其的實現。Dledger 就是其中一個 Java 語言的實現,其將算法方面的內容全部抽象掉,這樣開發人員只需要關系業務即可,大大降低使用難度。
事務消息
生產者將消息發送至 Apache RocketMQ 服務端。
Apache RocketMQ 服務端將消息持久化成功之后,向生產者返回 Ack 確認消息已經發送成功,此時消息被標記為"暫不能投遞",這種狀態下的消息即為半事務消息。
生產者開始執行本地事務邏輯。
生產者根據本地事務執行結果向服務端提交二次確認結果(Commit 或是 Rollback),服務端收到確認結果后處理邏輯如下:
二次確認結果為 Commit:服務端將半事務消息標記為可投遞,并投遞給消費者。
二次確認結果為 Rollback:服務端將回滾事務,不會將半事務消息投遞給消費者。
在斷網或者是生產者應用重啟的特殊情況下,若服務端未收到發送者提交的二次確認結果,或服務端收到的二次確認結果為 Unknown 未知狀態,經過固定時間后,服務端將對消息生產者即生產者集群中任一生產者實例發起消息回查。說明 服務端回查的間隔時間和最大回查次數,請參見[參數限制](https://rocketmq.apache.org/zh/docs/introduction/03limits/)。
生產者收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
生產者根據檢查到的本地事務的最終狀態再次提交二次確認,服務端仍按照步驟4對半事務消息進行處理。
事務消息生命周期
初始化:半事務消息被生產者構建并完成初始化,待發送到服務端的狀態。
事務待提交:半事務消息被發送到服務端,和普通消息不同,并不會直接被服務端持久化,而是會被單獨存儲到事務存儲系統中,等待第二階段本地事務返回執行結果后再提交。此時消息對下游消費者不可見。
消息回滾:第二階段如果事務執行結果明確為回滾,服務端會將半事務消息回滾,該事務消息流程終止。
提交待消費:第二階段如果事務執行結果明確為提交,服務端會將半事務消息重新存儲到普通存儲系統中,此時消息對下游消費者可見,等待被消費者獲取并消費。
消費中:消息被消費者獲取,并按照消費者本地的業務邏輯進行處理的過程。此時服務端會等待消費者完成消費并提交消費結果,如果一定時間后沒有收到消費者的響應,Apache RocketMQ 會對消息進行重試處理。具體信息,請參見消費重試。
消費提交:消費者完成消費處理,并向服務端提交消費結果,服務端標記當前消息已經被處理(包括消費成功和失敗)。Apache RocketMQ 默認支持保留所有消息,此時消息數據并不會立即被刪除,只是邏輯標記已消費。消息在保存時間到期或存儲空間不足被刪除前,消費者仍然可以回溯消息重新消費。
消息刪除:Apache RocketMQ 按照消息保存機制滾動清理最早的消息數據,將消息從物理文件中刪除。更多信息,請參見消息存儲和[清理機制](https://rocketmq.apache.org/zh/docs/featureBehavior/11messagestorepolicy/)。
RocketMQ 新發展
在過去“分”往往是技術實現的妥協,而現在“合”才是用戶的真正需求。RocketMQ 5.0 基于統一 Commitlog 擴展多元化索引,包括時間索引、百萬隊列索引、事務索引、KV索引、批量索引、邏輯隊列等技術。在場景上同時支撐了 RabbitMQ、Kafka、MQTT、邊緣輕量計算等產品能力,努力實現“消息、事件、流”的擴展支持,云原生是主流。
更多信息可查看官網 [Apache RocketMQ](https://rocketmq.apache.org/zh/)。
Kafka
Kafka 是一個分布式系統,由通過高性能 TCP 網絡協議進行通信的服務器和客戶端組成。它可以部署在本地和云環境中的裸機硬件、虛擬機和容器上。
服務器:Kafka 作為一個或多個服務器集群運行,可以跨越多個數據中心或云區域。其中一些服務器形成存儲層,稱為代理。其他服務器運行 Kafka Connect 以事件流的形式持續導入和導出數據,以將 Kafka 與您現有的系統(例如關系數據庫以及其他 Kafka 集群)集成。為了讓您實現關鍵任務用例,Kafka 集群具有高度可擴展性和容錯性:如果其中任何一臺服務器發生故障,其他服務器將接管它們的工作以確保連續運行而不會丟失任何數據。
客戶端:它們允許您編寫分布式應用程序和微服務,即使在出現網絡問題或機器故障的情況下,也能以容錯的方式并行、大規模地讀取、寫入和處理事件流。Kafka 附帶了一些這樣的客戶端,這些客戶端由 Kafka 社區提供的 數十個客戶端進行了擴充:客戶端可用于 Java 和 Scala,包括更高級別的 Kafka Streams 庫,用于 Go、Python、C/C++ 和許多其他編程語言以及 REST API。
架構
與前面兩個 MQ 類似有生產階段、存儲階段、消費階段,相比 RocketMQ 這里的注冊中心是用的 Zookeeper,Kafka 的諸多事件都依賴于 ZK,元數據管理、各個角色的注冊、心跳、選舉、狀態維護,這里的角色包括 Boker、 Topic、 Partition、 消費者組等。??
所以這里也會帶來 ZK Watch 事件壓力過大的問題,大量的 ZK 節點事件阻塞在隊列中, 導致自旋鎖, 導致 CPU 上升, 由于大量數量事件對象導致占用了大量的內存。
圖中的 Controller 是 Kakfa 服務端 Broker 的概念,Broker 集群有多臺,但只有一臺 Broker 可以扮演控制器的角色;某臺 Broker 一旦成為 Controller,它用于以下權力:完成對集群成員管理、主題維護和分區的管理,如集群 Broker 信息、Topic 維護、Partition 維護、分區選舉 ISR、同步元信息給其他 Broker 等。
存儲
Topic 是邏輯上的概念,而 Partition 是物理上的概念,即一個 Topic 劃分為多個 Partition,每個 Partition 對應一個Log文件。?
.log文件:存儲消息數據的文件。? ??
.index文件:索引文件,記錄一條消息在log文件中的位置。??
.snapshot文件:記載著生產者最新的offset。??
.timeindex時間索引文件:當前日志分段文件中建立索引的消息的時間戳,是在 0.10.0 版本后增加的,用于根據時間戳快速查找特定消息的位移值,優化 Kafka 讀取歷史消息緩慢的問題。為了保證時間戳的單調遞增,可以將log.message.timestamp.type 設置成 logApendTime,而 CreateTime 不能保證是消息寫入時間。??
上圖是三個 Broker、兩個 Topic、兩個 Partition 的 Broker ?的存儲情況,可以延伸想象一下百萬級 Topic 的存儲情況會很復雜。
Rebalnce 問題
為了解決強依賴 Zookeeper 進行 Rebalance 帶來的問題,Kafka 引入了 Coordinator 機制。??
首先,觸發 Rebalance (再均衡)操作的場景目前分為以下幾種:消費者組內消費者數量發生變化,包括:?
有新消費者加入
有消費者宕機下線,包括真正宕機,或者長時間 GC、網絡延遲導致消費者未在超時時間內向 GroupCoordinator 發送心跳,也會被認為下線。??
有消費者主動退出消費者組(發送 LeaveGroupRequest 請求) 比如客戶端調用了 unsubscrible() 方法取消對某些主題的訂閱
消費者組對應的 GroupCoordinator 節點發生了變化。??
消費者組訂閱的主題發生變化(增減)或者主題分區數量發生了變化。??
節點擴容?
更多信息可查看 Kafka 官網 [Apache Kafka](https://kafka.apache.org/)
Pulsar
在最高層,一個 Pulsar 實例由一個或多個 Pulsar 集群組成。一個實例中的集群可以在它們之間復制數據。
在 Pulsar 集群中:
一個或多個 Broker 處理和負載平衡來自生產者的傳入消息,將消息分派給消費者,與 Pulsar 配置存儲通信以處理各種協調任務,將消息存儲在 BookKeeper 實例(又名 bookies)中,依賴于特定集群的 ZooKeeper 集群用于某些任務等等。
由一個或多個 Bookie 組成的 BookKeeper 集群處理消息的持久存儲。
特定于該集群的 ZooKeeper 集群處理 Pulsar 集群之間的協調任務。
下圖展示了一個 Pulsar 集群:
?
Pulsar 用 Apache BookKeeper 作為持久化存儲,Broker 持有 BookKeeper client,把未確認的消息發送到 BookKeeper 進行保存。
BookKeeper 是一個分布式的 WAL(Write Ahead Log)系統,Pulsar 使用 BookKeeper 有下面幾個便利:
可以為 Topic 創建多個 Ledgers:Ledger 是一個只追加的數據結構,并且只有一個 Writer,這個 Writer 負責多個 BookKeeper 存儲節點(就是 Bookies)的寫入。Ledger 的條目會被復制到多個 Bookies;
Broker 可以創建、關閉和刪除 Ledger,也可以追加內容到 Ledger;
Ledger 被關閉后,只能以只讀狀態打開,除非要明確地寫數據或者是因為 Writer 掛掉導致的關閉;
Ledger 只能有 Writer 這一個進程寫入,這樣寫入不會有沖突,所以寫入效率很高。如果 Writer 掛了,Ledger 會啟動恢復進程來確定 Ledger 最終狀態和最后提交的日志,保證之后所有 Ledger 進程讀取到相同的內容;??
除了保存消息數據外,還會保存 Cursors,也就是消費端訂閱消費的位置。這樣所有 Cursors 消費完一個 Ledger 的消息后這個 Ledger 就可以被刪除,這樣可以實現 Ledgers 的定期翻滾從頭寫。
節點對等
從架構圖可以看出,Broker 節點不保存數據,所有 Broker 節點都是對等的。如果一個 Broker 宕機了,不會丟失任何數據,只需要把它服務的 Topic 遷移到一個新的 Broker 上就行。??
Broker 的 Topic 擁有多個邏輯分區,同時每個分區又有多個 Segment。??
Writer 寫數據時,首先會選擇 Bookies,比如圖中的 Segment1。選擇了 Bookie1、Bookie2、Bookie4,然后并發地寫下去。這樣這 3 個節點并沒有主從關系,協調完全依賴于 Writer,因此它們也是對等的。
擴展和擴容
在遇到雙十一等大流量的場景時,必須增加 Consumer。
這時因為 Broker 不存儲任何數據,可以方便的增加 Broker。Broker 集群會有一個或多個 Broker 做消息負載均衡。當新的 Broker 加入后,流量會自動從壓力大的 Broker 上遷移過來。??
對于 BookKeeper,如果對存儲要求變高,比如之前存儲 2 個副本現在需要存儲 4 個副本,這時可以單獨擴展 Bookies 而不用考慮 Broker。因為節點對等,之前節點的 Segment 又堆放整齊,加入新節點并不用搬移數據。Writer 會感知新的節點并優先選擇使用。
容錯機制
對于 Broker,因為不保存任何數據,如果節點宕機了就相當于客戶端斷開,重新連接其他的 Broker 就可以了。
對于 BookKeeper,保存了多份副本并且這些副本都是對等的。因為沒有主從關系,所以當一個節點宕機后,不用立即恢復。后臺有一個線程會檢查宕機節點的數據備份進行恢復。
在遇到雙十一等大流量的場景時,必須增加 Consumer。
這時因為 Broker 不存儲任何數據,可以方便的增加 Broker。Broker 集群會有一個或多個 Broker 做消息負載均衡。當新的 Broker 加入后,流量會自動從壓力大的 Broker 上遷移過來。??
對于 BookKeeper,如果對存儲要求變高,比如之前存儲 2 個副本現在需要存儲 4 個副本,這時可以單獨擴展 Bookies 而不用考慮 Broker。因為節點對等,之前節點的 Segment 又堆放整齊,加入新節點并不用搬移數據。Writer 會感知新的節點并優先選擇使用。
Pulsar 可以使用多租戶來管理大集群。Pulsar 的租戶可以跨集群分布,每個租戶都可以有單獨的認證和授權機制。租戶也是存儲配額、消息 TTL 和隔離策略的管理單元。
在和其他組件或者生態對接方面,Pulsar 可以支持很多種消息協議,對于存量系統的MQ首次接入、切換MQ都很方便。
更多信息可查看 Pulsar 官網?[Apache Pulsar](https://pulsar.apache.org/)
對比
此圖摘抄自《面渣逆襲:RocketMQ二十三問》
這個圖沒有 Pulsar 的信息,從網上看到的壓測報告來看,Pulsar 吞吐量大概是 Kafka 的兩倍左右,延遲表現比 Kafka 低不少,Pulsar 的 I/O 隔離顯著優于 Kafka。比較詳實的 Pulsar 和 Kafka 的比對可以查閱 StreamNative 的文章《Pulsar和Kafka基準測試:Pulsar 性能精準解析(完整版)》,StreamNative 作為 Apache Pulsar 的商業化公司,數據和結果還是比較可靠的。
進階
常言道,最好的學習方法是帶著問題去尋找答案,在路上撿拾更多果實,增加經驗值,快速升級。很多人推薦費曼學習法,以教代學,按可以教別人的標準來學習,最終產出教學內容為目的來學習一個知識,能讓自己高效學習。在我看來這很像績效考核用的 OKR 工具,為項目設定關鍵成果,實現成功應該做什么?怎么做?而我寫這篇文章是在實踐費曼學習法。??
所以,在這里我給出幾個問題,讀者可以根據自己的興趣愛好帶著問題去尋找答案吧。??
如何保證消息的可用性/可靠性/不丟失呢?
如何處理消息重復的問題呢?
順序消息如何實現?
怎么處理消息積壓?
怎么實現分布式消息事務的?半消息?
如何實現消息過濾?
如果自己平時想到的問題太多,不知道先看哪一個,那么自己想清楚為什么要學這些知識點,哪個問題對于當前的自己收益最大。
編輯:黃飛
?
評論