RocketMQ 架構簡析

Apache RocketMQ 是阿里開源的一款高

效能、高吞吐量的分散式訊息中介軟體。

RocketMQ 架構簡析

- 整體架構 -

RocketMQ 架構簡析

RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責生產訊息,Consumer 負責消費訊息,Broker 負責儲存訊息。每個 Broker 可以儲存多個Topic的訊息,每個Topic的訊息也可以分片儲存於叢集中的不同的Broker Group。

RocketMQ 架構簡析

-

說道Namesrv首先會想到服務註冊與發現。分散式服務SOA架構體系中會有服務註冊與發現中心。主要作用是指導服務呼叫方找到服務提供者提供的服務例項。RocketMQ體系中Namesrv主要作用是:為producer和consumer提供關於topic的路由資訊。管理broker節點:監控更新broker的實時狀態。路由註冊、路由刪除(故障剔除)。

Namesrv充當路由訊息的提供者。Namesrv是一個幾乎無狀態節點,多個Namesrv例項組成叢集,但相互獨立,沒有資訊交換。

路由元資訊

topicQueueTable:topic 訊息佇列路由資訊。

brokerAddrTable:broker基礎資訊。包含broker name,所屬叢集名稱,主broker地址等。

clusterAddrTable:broker叢集資訊,儲存叢集中所有broker的名稱。

brokerLiveTable:broker狀態資訊。

filterServerTable:broker上的filterServer列表。filterServer用於訊息過濾。

路由註冊 RocketMQ路由註冊是透過broker與Namesrv的心跳功能實現的。broker啟動時向叢集中所有Namesrv傳送心跳包,之後每隔30秒向叢集中所有Namesrv傳送心跳包。心跳包中包含:broker叢集資訊、broker資訊、topic配置資訊、broker關聯的FilterServer列表等。如果brokerA為Master。並且brokerA上的topic1的配置資訊發生變化或初次註冊,Namesrv會根據報文建立或更新Topic路由元資料,填充topicQueueTable。

路由刪除 Namesrv收到brokerA的心跳包會更新brokerLiveTable中的brokerA對應的BrokerLiveInfo中的lastUpdateTimestamp。Namesrv每隔10秒掃描brokerLiveTable一次。如果brokerA對應的BrokerLiveInfo 中 lastUpdateTimestamp距當前時間超過 120秒,Namesrv認為brokerA失效,會將brokerA的路由資訊移除並關閉與broker的socket連線。更新:topicQueueInfo、brokerAddrTable、brokerLiveTable、filterServerTable等。

路由發現 RocketMQ路由發現是非實時的。當Topic路由資訊發生變化是,Namesrv不會主動推送給客戶端(Producer、Consumer)。而是由客戶端定時到Namesrv拉去最新的路由資訊並快取(包含Topic路由資訊)。

與kafka對比

kafka 由zookeeper叢集提供命名服務(Naming Service)。

Kafka透過 ZooKeeper 管理叢集配置、選舉 Leader 以及在 consumer g

RocketMQ 架構簡析

整體架構

訊息中轉角色,負責儲存訊息、轉發訊息。代理伺服器在RocketMQ系統中負責接收從生產者傳送來的訊息並存儲、同時為消費者的拉取請求作準備。代理伺服器也儲存訊息相關的元資料,包括消費者組、消費進度偏移和主題和佇列訊息等。

Broker是以group為單位提供服務。一個group裡面分Master和Slave。Master和Slave儲存的資料一樣,slave從master同步資料(同步雙寫或非同步複製看配置)。一個Master可以對應多個Slave,一個Slave只能對應一個Master。Master與Slave的對應關係透過指定相同的BrokerName、不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。broker不必須是物理機或虛擬機器:

RocketMQ 架構簡析

每個Broker與Namesrv叢集中的所有節點建立長連線,定時傳送心跳包到所有Namesrv,更新broker資訊、topic路由資訊等。一個Topic的不同queue(分割槽)可分佈到叢集中不同的broker group上。

與kafka對比:

kafka和RocketMQ的broker都可以容納多個一個或多個分割槽資料(kafka分割槽:partition;RocketMQ分割槽:queue)。

kafka基於partition(分割槽) 做備份/高可用(partition follower)。

RocketMQ增加了broker group的概念,基於broker(可能包含多個分割槽)。

RocketMQ 架構簡析

-

(訊息)生產者。Producer與Namesrv叢集中的其中一個節點(隨機選擇)建立長連線,定期從Name Server取Topic路由資訊,並向提供Topic服務的broker master建立長連線,且定時向broker master傳送心跳。Producer完全無狀態,可叢集部署。

Producer負責生產訊息,一般由業務系統負責生產訊息。一個訊息生產者會把業務應用系統裡產生的訊息傳送到broker伺服器。RocketMQ提供多種傳送方式,同步傳送、非同步傳送、順序傳送、單向傳送。同步和非同步方式均需要Broker返回確認資訊,單向傳送不需要。

RocketMQ 架構簡析

- Namesrv -

(訊息)消費者 Consumer與Namesrv叢集中的其中一個節點(隨機選擇)建立長連線,定期從Name Server取Topic路由資訊,並向提供Topic服務的Master、Slave建立長連線,且定時向Master、Slave傳送心跳。Consumer既可以從Master訂閱訊息,也可以從Slave訂閱訊息,訂閱規則由Broker配置決定。

Consumer負責消費訊息,一般是後臺系統負責非同步消費。一個訊息消費者會從Broker伺服器拉取訊息、並將其提供給應用程式。從使用者應用的角度而言提供了兩種消費形式:拉取式消費、推動式消費。

叢集模式下:相同Consumer Group的每個Consumer例項平均分攤訊息。一個條訊息僅能被一個Consumer Group消費一次。

Producer、Consumer都只需要和叢集中一個Namesrv建立長連線。Broker需要向叢集中所有的Namesrv傳送心跳包。

其實很好理解:

Namesrv叢集提供高可用的命名服務。

Producer、Consumer只需要從其中一臺定期同步路由資訊。

如果Broker只隨機調一臺傳送心跳包。那麼不同的Namesrv儲存的路由資訊會出現

消費者型別:

拉取式消費(Pull Consumer) Consumer消費的一種型別,應用通常主動呼叫Consumer的拉訊息方法從Broker伺服器拉訊息、主動權由應用控制。一旦獲取了批次訊息,應用就會啟動消費過程。Pull方式裡,取訊息的過程需要使用者自己寫(包括提交offset等操作)。

推動式消費(Push Consumer) Consumer消費的一種型別,該模式下Broker收到資料後會主動推送給消費端,該消費模式一般實時性較高。Push Consumer原理上也是採取pull模式。實際上就是長輪詢的pull模式。

RocketMQ 架構簡析

-

主題(Topic) 表示一類訊息的集合,每個主題包含若干條訊息,每條訊息只能屬於一個主題,是RocketMQ進行訊息訂閱的基本單位。每個topic可分為若干個分割槽(queue)。

生產者組(Producer Group) 同一類Producer的集合,這類Producer傳送同一類訊息且傳送邏輯一致。如果傳送的是事務訊息且原始生產者在傳送之後崩潰,則Broker伺服器會聯絡同一生產者組的其他生產者例項以提交或回溯消費。

消費者組(Consumer Group) 同一類Consumer的集合,這類Consumer通常消費同一類訊息且消費邏輯一致。消費者組使得在訊息消費方面,實現負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者例項必須訂閱完全相同的Topic。RocketMQ 支援兩種訊息模式:叢集消費(Clustering)和廣播消費(Broadcasting)。

普通順序訊息(Normal Ordered Message) 普通順序消費模式下,消費者透過同一個消費佇列收到的訊息是有順序的,不同訊息佇列收到的訊息則可能是無順序的。

嚴格順序訊息(Strictly Ordered Message) 嚴格順序訊息模式下,消費者收到的所有訊息均是有順序的。

訊息(Message) 訊息系統所傳輸資訊的物理載體,生產和消費資料的最小單位,每條訊息必須屬於一個主題。RocketMQ中每個訊息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了透過Message ID和Key查詢訊息的功能。

標籤(Tag) 為訊息設定的標誌,用於同一主題下區分不同型別的訊息。來自同一業務單元的訊息,可以根據不同業務目的在同一主題下設定不同標籤。標籤能夠有效地保持程式碼的清晰度和連貫性,並最佳化RocketMQ提供的查詢系統。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴充套件性。

RocketMQ 架構簡析

Namesrv

訊息中介軟體需要解決的問題:非同步化、削峰填谷。

訊息中介軟體應具備的基礎能力是:訊息釋出、訂閱、消費。概念相對簡單這裡不過多描述。

訊息中介軟體的一些重要的機制:

-

優先順序是指在一個訊息佇列中,每條訊息都有不同的優先順序,一般用整數來描述,優先順序高的訊息先投遞,如果訊息完全在一個記憶體佇列中,那麼在投遞前可以按照優先順序排序,令優先順序高的先投遞。由於RocketMQ所有訊息都是持久化的,所以如果按照優先順序來排序,開銷會非常大,因此RocketMQ沒有特意支援訊息優先順序,但是可以透過變通的方式實現類似功能,即單獨配置一個優先順序高的佇列,和一個普通優先順序的佇列,將不同優先順序傳送到不同佇列即可。

- Broker -

訊息有序指的是一類訊息消費時,能按照發送的順序來消費。例如:一個訂單產生了3條訊息,分別是訂單建立,訂單付款,訂單完成。消費時,要按照這個順序消費才能有意義。但是同時訂單之間是可以並行消費的。RocketMQ可以嚴格的保證訊息有序。

投遞訊息的順序性:投遞訊息的順序性可透過將一組訊息投遞到同一分割槽實現。例如:藉助MessageQueueSelector將對相同訂單的操作訊息投放到同一分割槽。

消費訊息的順序性:RoctetMQ特性保障:特定分割槽(queue)中的訊息不能同時被同一個消費者組中的多個Consumer消費,以避免重複消費。透過自定義或使用預置的AllocateQueueStrategy可設定分割槽的分配策略(哪些分割槽分配給哪個消費者消費)。

-

3。1 訊息持久化

RocketMQ、Kafka 以檔案記錄形式持久化。

RocketMQ採用了單一的日誌檔案,即把同1個broker上面所有topic的所有queue的訊息,存放在一個檔案裡面,從而避免了隨機的磁碟寫入。

RocketMQ 架構簡析

如上圖所示,所有訊息都存在一個單一的CommitLog檔案裡面,然後有後臺執行緒非同步的同步到ConsumeQueue,再由Consumer進行消費。

TODO 同步、非同步刷盤。

RocketMQ 架構簡析

TODO RocketMQ充分利用Linux檔案系統記憶體cache來提高效能。

TODO CommitLog index Commitlog segment的大小與頁快取一致。

RocketMQ訊息儲存機制會在後面的文章詳細說明。

3。2 broker master/salve

TODO broker group master/salve

TODO Async/Sync Master;

Broker

提高併發效率 => 提高生產、消費並行度=>提高分割槽數量。

RocketMQ、kafka都支援topic資料分割槽存放、動態擴充套件。

以RocketMQ為例:

topic建立的時候可以用叢集模式去建立(這樣集群裡面每個broker的queue的數量相同),也可以用單個broker模式去建立(這樣每個broker的queue數量可以不一致)。

4。1 生產並行度

RocketMQ的生產並行度是由其自身機制及broker的數量決定的。這塊後面的文章會詳細分析。

4。2 消費並行度

廣播模式下所有消費者會接受並消費當前topic下所有Queue的訊息。

叢集模式下,一個queue只分配給一個consumer例項:這是由於拉取訊息是consumer主動控制的,如果多個例項同時消費一個queue的訊息,會導致同一個訊息在不同的例項下被消費多次,所以演算法上都是一個queue只分給一個consumer例項,一個consumer例項可以允許同時分到不同的queue。

Kafka的消費並行度依賴Topic配置的分割槽數,如分割槽數為10,那麼最多10臺機器來並行消費(每臺機器只能開啟一個執行緒),或者一臺機器消費(10個執行緒並行消費)。即消費並行度和分割槽數一致。RocketMQ消費並行度分兩種情況:順序消費方式並行度同卡夫卡完全一致;亂序方式並行度取決於Consumer的執行緒數,如Topic配置10個佇列,10臺機器消費,每臺機器100個執行緒,那麼並行度為1000。

4。3 訊息佇列分配策略

Producer使用MessageQueueSelector選擇將訊息投放到哪個分割槽 使用AllocateMessageQueueStrategy將不同分割槽分配給Consumer Group中的不同Consumer。一個分割槽(queue)僅允許分配給同一個Consumer Group下的一個Consumer(防止重複消費)。

MessageQueueSelector

RocketMQ 架構簡析

內建實現類:SelectMessageQueueByMachineRoom SelectMessageQueueByHash SelectMessageQueueByRandom

可以透過實現MessageQueueSelector介面,來自定義Producer投遞訊息時選擇分割槽的演算法。

AllocateMessageQueueStrategy

RocketMQ 架構簡析

內建實現類:

AllocateMessageQueueAveragely:平均分配演算法

AllocateMessageQueueAveragelyByCircle:基於環形平均分配演算法

AllocateMachineRoomNearby:基於機房臨近原則演算法

AllocateMessageQueueByMachineRoom:基於機房分配演算法

AllocateMessageQueueConsistentHash:基於一致性hash演算法

AllocateMessageQueueByConfig:基於配置分配演算法

可以透過實現AllocateMessageQueueStrategy來自定義queue 分配給特定Consumer Group下不同Consumer的策略。

參考:

https://github。com/apache/rocketmq/blob/master/docs/cn/

https://juejin。im/post/6844903589819875336

https://jaskey。github。io/blog/2016/12/19/rocketmq-rebalance/

http://objcoding。com/2019/09/13/kafka-partition-and-rmq-queue/

http://www。itmuch。com/books/rocketmq

作者

:RyanLee86799