1. 架構
RocketMQ由四部分組成,如下:
NameServer,名字服務,每個NameServer維護全量的Broker,topic路由的相關信息。NameServer可集群化部署,本身是無狀態的(元數據存在內存中,不落盤),相互之間獨立。為客戶端(生產者和消費者)提供路由發現,同時接受Broker的心跳信息,維護Broker在線狀態。其作為類似Kafka的zookeeper。
Producer cluster,生產者集群,包含一組生產者,負責生產消息,通過MQ的負載均衡模塊選擇相應的Broker集群隊列進行消息投遞。提供多種發送方式,同步發送、異步發送、單向發送。投遞的過程支持快速失敗并且低延遲。
Broker cluster,節點服務器集群,接受生產者發送的消息,將消息持久化(CommitLog文件),并發送給消費者消費。節點服務器也存儲相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。Broker節點間采用主從模式,實現高可用。
Consumer cluster,消費者集群,包含一組消費者,提供拉和推兩種方式,從Broker獲取消息并提供給應用程序消費。同時也支持集群方式和廣播方式的消費。
除了這幾個部分外,還需要了解一下兩個概念
Topic,主題,表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。
Message Queue,消息隊列,消息邏輯存儲單位,一個Topic下的消息,順序保存多個消息隊列中(默認為創建4個讀,4個寫隊列,這個類似于Kafka的分區)
2. 生產消息
2.1. 創建消息
消息由消息主題(Topic),消息Flag,消息屬性,消息體組成。
? 主題,該消息所屬的topic。
? 消息Flag,RocketMQ不做處理。
? 消息屬性,包括tag,用于消息的過濾;key,消息的索引,多個用空格隔開,用于消息的快速索引或者追蹤;waitStoreMsgOk,消息發送是否等消息存儲完成后再返回;delayTimeLevel,消息延遲級別,用于定時消息或者消息重試。
? 消息體,消息實際內容,字節數組形式存儲。
對消息采用自定義的編解碼格式,如下:
(1) 消息長度:總長度,四個字節存儲,占用一個int類型;
(2) 序列化類型&消息頭長度:同樣占用一個int類型,第一個字節表示序列化類型,后面三個字節表示消息頭長度;
(3) 消息頭數據:經過序列化后的消息頭數據;
(4) 消息主體數據:消息主體的二進制字節數據內容;
2.2. 選擇消息隊列
消息的邏輯存儲單位是消息隊列,一個Topic下有多個MessageQueue,并分布在不同的Broker上。所以發送消息前,需要獲取Topic路由信息。如圖,TopicA主題下有8個消息隊列(需要注意是,Broker-a的queue0與Broker-b的queue-0是兩個隊列,并不是備份)。
首先生產者查看本地是否緩存了Topic的路由信息(主要是MessageQueue列表,每個MessageQueue對象包括BrokerName,queueId等信息);如果本地沒有查到,則需要向NameServer查詢,并本地組裝該Topic的路由信息。
下一步就是要選擇往哪個消息隊列發送了,RocketMQ有兩種負載均衡測策略,默認策略和集群超時容忍策略。默認策略實際是一種輪詢策略,通過自增隨機數對MessageQueue列表大小取余,并獲取MessageQueue的位置信息,但獲得的MessageQueue所在的集群不能是上次的失敗集群;集群超時容忍策略,先隨機選擇一個MessageQueue,如果因為超時等異常發送失敗,會優先選擇該Broker集群下其他的MessageQueue進行發送,是一種高可用處理策略。也可以自定義發送規則,選擇特定的MessageQueue發送,比如sharding key 進行區塊分區,發送到對應的MessageQueue。
2.3. 發送消息
消息創建完成,并且選擇了待發送的隊列,萬事俱備,只待發送。RocketMQ發送模式有三種,同步發送,異步發送,單向發送(oneway)。
? 同步發送,發送后,阻塞當前的線程,等待Broker的響應。這種模式,可靠性高,但是會影響吞吐量。
? 異步發送,配置回調方法,發送后立即返回,Broker調用回調方法響應發送結果,如同步模式相比,性能顯著提高。
? 單向發送,僅將消息寫入socket即可,不關注發送是否成功,也不會有重試機制。這種吞吐量最高,但是會有丟失消息的風險。
除了支持單條消息的發送,RocketMQ還支持批量消息的發,多條消息的消息體合并一個。
2.4. 消息重試
在同步和異步模式下,如果消息發送異常,可以進行重試,重試的策略課設置如下:
? retryTimesWhenSendFailed:同步發送失敗重投次數,默認為2,因此生產者會最多嘗試發送retryTimesWhenSendFailed + 1次。不會選擇上次失敗的Broker,嘗試向其他Broker發送,最大程度保證消息不丟。超過重投次數,拋出異常,由客戶端保證消息不丟。當出現RemotingException、MQClientException和部分MQBrokerException時會重投。
? retryTimesWhenSendAsyncFailed:異步發送失敗重試次數,異步重試不會選擇其他Broker,僅在同一個Broker上做重試,不保證消息不丟。
? retryAnotherBrokerWhenNotStoreOK:消息刷盤(主或備)超時或slave不可用(返回狀態非SEND_OK),是否嘗試發送到其他Broker,默認false。十分重要消息可以開啟。
3. 存儲消息
3.1. 刷盤與主從復制
RocketMQ采用主從架構,以下是消息的數據流走向,如圖。
生產者生產消息后,發送到消息隊列所在Master節點(僅在master節點寫)的頁面緩存中,一方面,master節點上數據繼續落盤到磁盤上,實現持久化;另一方面,數據同步到Slave節點上,然后落盤到Slave磁盤上。同步和異步發送的模式下,Broker會返回寫入響應結果。從上圖看,返回的時機有以下幾種可能
? 寫入Master節點頁面緩存
? Master節點落盤
? 同步到Slave節點的頁面緩存
? 同步到Slave節點,并落盤。
其中第一種效率最高,但是可靠性較差,第四種效率是最低的,但是可靠性最強。
RocketMQ不支持對單個消息進行策略設置,而是在Broker節點上進行全局設置。
? flushDiskType,可選值為SYNC_FLUSH,ASYNC_FLUSH。SYNC_FLUSH表示同步刷盤,即等待刷盤完成后,再通知返回;ASYNC_FLUSH表示異步刷盤,寫入頁面緩存后即可返回,當緩存的消息積累到一定程度,統一刷盤。
? BrokerRole,可選值為ASYNC_MASTER,SYNC_MASTER,SLAVE。其中ASYNC_MASTER表示異步復制到從節點,master節點寫入成功,即可返回;SYNC_MASTER表示同步復制,Slaver寫入成功后才返回;
鑒于性能和可靠性權衡考慮,一般建議flushDiskType選擇ASYNC_FLUSH,BrokerRole選擇SYNC_MASTER。
3.2. 存儲模式
消息最終要落盤到磁盤上,以下是消息的(CommitLog、ConsumerQueue、Index)存儲協議:
如圖所示,與消息相關的文件有三個,分別是CommitLog,ConsumerQueue(注意,不是MessageQueue),indexFile。
? CommitLog
Broker上所有topic的消息都會保存在CommitLog文件中,該文件的路徑為$HOME/store/${CommitLog}/${fileName},單個文件的大小為1G,fileName文件名長度為20,左邊補零,剩余為起始偏移量,比如00000000001073741824,表示該文件保存偏移量從1073741824起始的消息。
還記得Kafka是以每個partition作為一個文件,這個弊端就是打開文件的句柄會很多,從而限制了每臺Broker上partition的個數。
RocketMQ從設計之初,就避免了這個"坑",讓所有的消息順序寫入一個文件,同時也導致了一個問題,如果從這個大文件中查找某個消息,那將非常耗性能。為了解決這個問題,RocketMQ引入了ConsumerQueue和indexFile。
? ConsumerQueue
為了快速的查到并讀出消息,在消息到達CommitLog后,會異步轉發到消息隊列,進行消息檢索。消息隊列中并不保存消息的實際內容,而是消息的索引,每個消息包含20個字節,其中CommitLog offset為8個字節,消息長度4個字節,消息的tag hashcode為8個字節。消息隊列文件保存在HOME/store/consumequeue/{topic}/{queueId}/{fileName},單個文件保存30W個條目。通過消息隊列的索引,就能快速定位到消息在CommitLog文件的位置。
ConsumerQueue并不持有消息的內容,僅保存消息的索引信息,注意與MessageQueue的區別,可以看做是MessageQueue的索引隊列。
? IndexFile
IndexFile保存了key與offset的關系,通過key快速檢索出對應的消息。indexFile文件的保存位置$HOME \store\index${fileName},文件名fileName是以創建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統中實現HashMap結構,故RocketMQ的索引文件其底層實現為hash索引,消息最先存入CommitLog,然后將索引寫入ConsumerQueue。
3.3. 消息刪除機制
消息不會永久保存在Broker服務器上,對于CommitLog和ConsumerQueue文件,一旦文件達到大小的閥值,就會創建新的文件,那么老的文件就不會再有更新,這些老文件都是可以被認為是過期文件,理論上都是可以被刪除的。RocketMQ可以通過配置過期時間(fileReservedTime,默認為72小時),一旦達到這個過期時間,就是允許刪除,但不是馬上刪除,而是在以下幾個場景中。
? 指定刪除文件的時間點,RocketMQ通過deleteWhenRocketMQ 通過delete When 設置一天的固定時間執行一次刪除過期文件操作, 默認為凌晨4 點。
? 磁盤空間是否充足,如果磁盤空間不充足,則返回true ,表示應該觸發過期文件刪除操作。
? 預留,手工觸發,可以通過調用excuteDeleteFilesManualy 方法手工觸發過期文件刪除,刪除消息文件時,不會判斷這些消息有沒有被消費。
4. 消費消息
4.1. 拉/推模式請求消息
與RabbitMQ類似,RocketMQ的消費端也支持pull和push兩種模式
? 拉(pull)
pull(拉)模式是消費端主動從Broker上獲取消息,獲取請求的間隔時間,每次獲取的條數,偏移量等都是有消費端決定,消費端可以根據自身的消費能力進行控制。
pull模式客戶端需要實現消息隊列的遍歷,以及每個隊列offset的保存,控制代碼較為復雜,實際應用的較少。
? 推(push)
事實上,RocketMQ并沒有實現真正意義上的推模式,而是通過pull的"長輪詢"實現。消費端不間斷的發起請求,當Broker有消息堆積時,則返回消費端,當沒有新消息時,并不著急返回,Broker會將這個請求"掛起",通過線程不斷掃描消息隊列(請求的偏移量小于消息隊列的偏移量,則表示有新消息),直到有新消息,再返回給消費端。
無論pull還是push模式,獲取到消息,都會先放入到消費端的緩存,再進行消費處理。
4.2. 消息發送
Broker是主從模式,主從節點都可讀(這個和Kafka以及RabbitMQ不同)。請求到達Broker后,根據ConsumerQueue索引消息的位置,然后再從CommitLog文件中獲取實際消息內容。ConsumerQueue以及CommitLog都是以文件的模式存儲,并且CommitLog的文件大小為1G,傳統的I/O操作,需要經過兩次拷貝操作,如圖。
對于大文件來說,兩次拷貝的性能低下,影響讀寫的吞吐量。RocketMQ采用內存映射機制,通過將應用程序的邏輯內存地址直接映射到Linux操作系統的內核緩沖區,減少一次拷貝。
內存映射的基本原理,RocketMQ通過MappedByteBuffer的map()函數將文件映射到虛擬內存,讀取操作時,如果文件已經加載到頁面緩存,則直接從內存中讀取;如果沒有,需要發生一次缺頁中斷重新拷貝到內存頁中,再被讀取。應用程序通過讀寫自己的邏輯內存,達到實際操作操作系統內核緩沖區的效果。內存映射機制適合大文件的讀寫操作,整體性能提升2-3倍。
4.3. 消息確認(ACK)
消息消費完成后,需要進行確認,以便Broker知道消息隊列的消費的進度,并進行管理。再均衡時,知道從哪個offset繼續消費。
? 消費成功,消息處理完成后,返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,表示消息成功。Broker記錄下offset。
? 消費失敗,如果由于數據庫異常,余額不足等情況,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,表示消息失敗,那么重新發回Broker,構建RETRY topic,延遲一定時間(業務可配置,默認10s),重新發送,當重試一定次數后(可配置,默認16次),則投遞到死信隊列,人工干預。
消息是通過批量獲取,且批量確認的,存在一定的重復消費風險。比如100條消息,其中99條已消費,還有一條沒有消費,此時消費端宕機或者故障,沒有來得及ack,Broker無法知曉實際的消費情況。當重新負載后,這99條將會重新投遞消費。這種情況,需要消費端做好冪等性保護(Kafka,RabbitMQ都有類似的情況)。
4.4. 消費者負載均衡
與Kafka類似,消費者也是集群模式,集群中的消息者通訊模型分為兩種,廣播模式和集群模式。
? 廣播模式,相同消費集群的每個Consumer實例都接收全量的消息(發布-訂閱模式)。
? 集群模式,相同消費集群的每個Consumer實例平均分攤消息(隊列模式)。
集群模式下,一個消息消費隊列在同一時間只允許被同一消費組內的一個消費者消費,一個消息消費者能同時消費多個消息隊列(與Kafka的設計理念一致)。當消費集群中擴容或者縮容消費者實例,或者消費隊列所在主從Broker宕機,都需要進行重新負載,使得流量重新分配。我們看下再均衡的過程。
(1)消費者通過定時任務,每個一段時間,向所有的Broker發送心跳包,包含消息消費分組名稱、訂閱關系集合、消息通信模式和客戶端id的值等信息。Broker在接受到心跳包后,本地保存這些信息,為后面的再均衡提供數據準備。
(2)消費者啟動再均衡線程,每隔20s執行一次。首先根據Topic向Broker獲取消費隊列集合(mqSet)。
(3)根據消費者,Topic向Broker獲取消費者id列表(comsumerIdList,第一步Broker已經保存了這些信息)。
(4)根據mqSet,consumerIdList,通過均衡算法,計算當前消費者分配到的消費隊列集合。比如mqSet(q0,q1,q2,q3,q4,q5,q6,q7),consumerIdList(c0,c1,c2)。常用有以下兩種算法。
• 平均分配,結果如下:
c0:q0,q1,q2
c1:q3,q4,q5
c2:q6,q7
• 平均輪詢分配,結果如下;
c0:q0,q3,q6
c1:q1,q4,q7
c2:q2,q5,
(5)新分配的隊列集合,與當前消費者原有的隊列集合進行比較。
如果新隊列集合中不包含原有的隊列,則停止原有隊列消息并移除。
如果原有隊列集合中不包含新分配的隊列集合,則創建pullquest,根據offset,開始從該消費隊列消費消息。
如果新隊列集合與原有集合隊列重合,則保持消費。
5. NameServer
NameServer的主要職責就是路由注冊、路由找查,好比RocketMQ的大腦,統一協調各部位的工作,以下這些都是NameServer(協調者)需要完成的工作。
1、生產者和消費者如何知道所要操作的Topic有哪些隊列,這些隊列都分布在哪些Broker上
2、當Broker出現故障,或者topic以及消費隊列發生變化,生產者和消費者如何能快速的獲悉。
3、Broker之間如何了解彼此的信息,如topic,隊列,主從等。
5.1. 狀態存儲結構
NameServer保存Broker集群狀態主要有以下5個HashMap變量
? topicQueueTable< String/ * topic */, List<QueueData>>
key值是topic名稱,value為List<QueueData>,QueueData 里存儲著Broker 的名稱、讀寫queue 的數量、同步標識等。
? BrokerAddrTable< String/* BrokerName*/, BrokerData>
key值為BrokerName,相同名稱的Broker可能存在多個Broker節點,包括一個Master和多個slaver。BrokerData包含BrokerName,所屬集群,主備Broker地址等信息。
? ClusterAddrTable< String/* ClusterName*/, Set<String/* BrokerName */>>
key值為集群名,value為集群下包含所有節點的BrokerName。
? BrokerLiveTable< String/ * BrokerAddr */, BrokerLivelnfo >
key值為Broker的地址,表示一臺服務器,value為該節點服務器的實時狀態,包括更新狀態的時間。
? filterServerTable<String/* BrokerAddr */,List <String >/* FilterServer*/>
key值為Broker的地址,filterServer是與這個Broker關聯的多個filterServer的地址。
所有的信息都可以由這5個變量組合而成。
5.2. 狀態維護
Broker與nameServer間建立長鏈,心跳保活,nameServer每隔10s檢查一次,如果超過2分鐘沒有更新,則認為Broker失效。從BrokerLivelnfo中剔除。注意:這樣會導致生產者和消費者最長120s后發現Broker不可用。
Broker新增和刪除,topic新增和刪除,消費隊列的變化,都會向NameServer進行注冊,及時維護狀態。
NameServer是無狀態的,就意味著存儲的變量不會落盤保存,只會保存在內存中。Broker和所有的NameServer進行注冊,每個NameServer保持全量的數據,NameServer之間相互獨立,無需同步數據。生產者和消費者可以配置多個NameServer,當連接的NameServer故障后,轉移另一臺NameServer上。
6. 可靠性
從消息發送,存儲,消費三個階段來分析下RocketMQ的可靠性。
6.1. 發送階段
前面我們介紹了消費發送時,會有三種模式,同步和異步模式下,都會返回發送的狀態,如果發送失敗,會進行消息重發,減少了在發送階段的消息丟失。我們看下在發送階段的其他的可靠性保障措施。
6.2. 容錯機制
當某個Broker節點宕機后,由于心跳檢查的延后性(前面介紹,最長需要120s),生產者還認為該Broker可用,還會向該Broker發送消息,發送的結果肯定失敗。失敗后會觸發重發機制,按照前面講的兩種策略進行負載,無論哪一種都需要"過濾"掉失敗的Broker。
RocketMQ設計了latencyFaultTolerance機制,sendLatencyFaultEnable開關開啟,對之前失敗的,按一定的時間做退避。例如,如果上次請求的latency超過550ms,就退避3000ms;超過1000ms,就退避60000ms。
6.3. 事務消息
與RabbitMQ類似,RocketMQ也支持事務消息,采用的是兩階段提交方式。
1)生產者發送"待確認"消息,RocketMQ接受到后,放入到特定的topic,該topic對消息者不可見。
2)RocketMQ回復發送成功,第一階段結束。
3)生產者開始執行本地的邏輯。
4)生產者根據本地邏輯的執行結果,向RocketMQ發送commit或者rollback消息。
5)RocketMQ接受commit消息后,將消息放入原有的topic,訂閱方將能夠接受到消息;如果接受到rollback消息,則刪除第一階段的消息,訂閱方無法接受消息。
6) 如果出現異常情況,沒有接受到4)的消息,經過固定的時間后,對"待確認"消息進行回查(不一定是原來的生產者實例,同組的即可),根據回查的結果按照5)處理。
6.4. 存儲階段
對于單臺Broker的flushDiskType有同步落盤和異步落盤兩種配置,異步落盤的可靠性高。同時,RocketMQ的Broker是主從模式,主從之間的BrokerRole可設置異步復制和同步復制,同步復制的可靠性高。我們重點看下主從和切換。
6.5. 主從同步
RocketMQ的Master-Slaver模式是高可用的保障之一,支持讀寫分離,master可讀可寫,slaver只讀。當slaver啟動后,就會從master同步信息,注意是"信息",不僅指的是消息體(CommitLog),還還包括一些元數據,如consumerOffset,SubscriptionGroupConfig等。對于這兩種數據,采用不同的同步策略。對于元數據,采用的是定時同步,因為元數據的實時性和可靠性相比較而言,沒有太高要求。對于消息體,創建TCP連接,不間斷發送同步請求。消息內容一旦沒有同步,master磁盤故障后,可能會導致消息的徹底丟失。所以對于消息體,實時性和可靠性要求高。其同步的流程圖如下:
6.6. 主從切換
對于Master-slave架構,由于Master負責寫入,如果某臺slave宕機后,不會影響消息的寫入,但是會影響從該slave上讀取消息,自動轉移到其他的節點。
當Master節點宕機后,該節點上的消息隊列將無法寫入,但不會影響其他slave節點的消息讀取,連接這臺master節點的生產者通過容錯機制,將選取其他的消息隊列進行寫入。slave無法自動切換成master,需要手動干預。RocketMQ on Dledger版本可以基于raft算法進行leader節點選舉,從而實現自動切換。
6.7. 消費階段
消費階段,通過消息確認機制,確保消息的不丟失。。
6.8. 順序消息
有些場景中,比如訂單的生成,付款,發貨3個消息,需要確保消息的順序。順序消息又分為全局順序消息和部分順序消息。無論哪種,都需要生產和消費配合。
? 全局順序消費
默認的情況下,RocketMQ的一個topic,會創建8個寫隊列,8個讀隊列,每個消息可能被寫入任意一個隊列里,而消費者有多個,每個消費者可能啟動多個線程并行處理,也無法保證順序。
如果要實現全局順序消費,需要把Topic的讀寫隊列數設置為1,然后Producer和Consumer的并發也設置為1,此時雖然能達到全局順序消息的目的,但是會犧牲高吞吐,高并發的特性。
? 局部順序消費
對于指定的Topic,根據業務的ID或者sharding key,分發到對應的MessageQueue。在消費的過程,通過加鎖的方式,控制并發消費,確保順序消費。
6.9. 定時消息
RocketMQ支持定時任務的發送,考慮到性問題,RocketMQ不支持任意精讀的延遲時間,僅支持按特定級別的延遲消息,默認為”1s 5s 10s 30s lm 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,delayLevel=1表示延遲1s,delayLevel=2表示延遲5s,依次類推。 當delayLevel為1的消息到Broker后,我們看下處理流程:
1)改變topic為SCHEDULE_TOPIC_XXXX,備份原有的主題和消息隊列到消息屬性中,放入到消息隊列為delay減1(本例為queue0)。改變topic是RocketMQ常用的方式,前面介紹的事務提交也是如此。
2)每個延遲級別對應一個消息隊列,并啟動一個定時任務,按照延遲級別對應的延遲時間進行掃描,比如delay=1,為1s掃描一次。
3) 根據上次拉取的偏移量從消費隊列中獲取所有的信息。
4)根據消息的物理偏移量和消息大小,從CommitLog拉取消息。
5)消息重新創建,恢復消息原有的主題,消息隊列,清除delaylevel屬性,存入CommitLog文件。
6)轉發到原有主題的消息 隊列,供消費者消費。
由于定時任務按照延遲級別對應的延遲時間定期掃描,所以無法做到精確的延遲。
6.10. 消息回溯
消息被消費后,并不立即被刪除,根據配置(fileReservedTime,默認為72小時)會保留一定的時間。在這段時間內,如果消費端下游系統(比如數據庫),由于故障,導致消息丟失,可以通過時間維度進行消息回溯,重新消費。但此時可能會有重復,需要做好冪等性保護。
6.11. 消息過濾
RocketMQ采用的是服務端過濾模式,避免不必要的消息傳遞到消費端。提供了三種方式進行消息過濾
? Tag過濾
在消息屬性中包含了Tag,消息隊列的后8個字節是Taghashcode, 根據Tag可以進行簡單的過濾,不需要讀取CommitLog的消息體內容,性能最高。存儲taghashcode而不是tag,是為了保持定長。
? SQL表達式過濾
Tag雖然高效,但是支持的邏輯較簡單,有時需要采用比較復雜的過濾邏輯,RocketMQ提供了類似SQL表達式的方式進行過濾。這種方式需要讀取CommitLog的內容,會增加磁盤的讀取壓力,效率較低。
? Filter Server過濾
Filter Server是比SQL更靈活的過濾方式,用戶通過自定義Java函數,根據Java函數的邏輯對消息進行過濾。
Filter Server類似一個comsumer進程,從本機的Broker獲取消息,在根據用戶上傳過來的Java函數進行過濾,過濾后的消息在傳給遠端的comsumer。這種方式雖然靈活,但是會占用很多的CPU資源,要根據實際情況選擇。
7. 高吞吐
高吞吐和高可靠性在多數場景下是一對矛盾體,這就需要我們根據應用的場景進行不同的取舍選擇。與可靠性一樣,我們也按照發送,存儲,消費三個階段來進行分析。
7.1. 發送階段
生產者發送消息有三種模式,其中單向模式,將消息發送socket后立即返回,不關心消息是否真正到達borker,這種模式的吞吐量是最高的,但是會導致消息的丟失,在某些場景下,比如日志收集是可以使用的。另一種提高發送速度的方法就是增加Producer并發量,使用多個Producer實例進行發送,由于是順序寫入CommitLog,所以能保持較高的寫入性能。
7.2. 存儲階段
數據索引和數據實體分離,既能確保較高的寫入性能,又能實現快速的讀取。內存映射機制,減少一次拷貝,提升了寫入和讀取的性能。另外,I/O的調度算法推薦使用deadline。
7.3. 消費階段
某些業務場景下,多條消息同時處理的時間會大大小于逐個處理的時間總和,比如消費消息中涉及update 某個數據庫, 一次update IO 條的時間會大大小于十次updatel 條數據的時間。這時可以通過批量方式消費來提高消費的吞吐量。批量消費消息的個數可以通過設置Consumer 的consumeMessageBatchMaxSize 這個參數。另一種方式通過增加consumer的并行量,不過在集群模式下,comsumer的個數不能超過Topic下read queue的總量。
8. 總結
RocketMQ由Producer,consumer,NameService,Broker四部分組成。其中Producer為生產者,Consumer為消費者,NameService作為協調器,管理Broker,topic相關信息;Broker存儲消息,并提供讀寫服務。
Producer從NameService獲取topic的路由信息,封裝消息后,通過負載均衡策略選擇相應的消息隊列,發送消息。發送的模式有三種,分別為同步,異步,單向。
Broker采用主從模式,主節點讀寫,從節點只讀。發送的消息到達主節點后,可選擇異步或者同步刷盤方式,同時通過同步或者異步復制到從節點,確保主從一致。同一個Broker節點上所有Topic信息寫入到同一個CommitLog,采用數據和索引分別保存,保證寫入和讀取的速度。
Consumer通過pull或者push的方式獲取消息,消息處理后,進行消費ACK,確認消費成功;對于消費不成功的消息,可進行消息重試。
RocketMQ支持事務消(xiao)(xiao)(xiao)息(xi)(xi),順序消(xiao)(xiao)(xiao)息(xi)(xi),定(ding)時消(xiao)(xiao)(xiao)息(xi)(xi),消(xiao)(xiao)(xiao)息(xi)(xi)回溯,消(xiao)(xiao)(xiao)息(xi)(xi)重試,消(xiao)(xiao)(xiao)息(xi)(xi)過濾等(deng)功能。