1. 架構
我(wo)們先來看(kan)下(xia)整體架(jia)構。
Producer,消(xiao)息(xi)(xi)生(sheng)產者,即消(xiao)息(xi)(xi)的(de)發布方, 生(sheng)產者生(sheng)產的(de)消(xiao)息(xi)(xi),首(shou)先發布到指定的(de)交(jiao)換(huan)機上(shang),交(jiao)換(huan)機通過路由鍵(jian)(RoutingKey)的(de)匹配,選擇對應的(de)隊(dui)列(lie)進行(xing)投遞(di)。消(xiao)息(xi)(xi)者訂閱隊(dui)列(lie),消(xiao)費隊(dui)列(lie)的(de)消(xiao)息(xi)(xi)。
Connectiton,客(ke)戶端與Broker間的TCP連接(jie)。
Channel,信道,每個連接采(cai)用多路復用,包(bao)含(han)多個信道。producer與Broker間采(cai)用信道傳遞數據。
Broker,Rabbit服務節點。
Vhost,虛機(ji)機(ji),一個節點下包含多(duo)(duo)個vhost,vhost間的exchange,queue相互(hu)隔離(li)。就(jiu)好(hao)比一臺(tai)(tai)物理機(ji)上(Broker)部署多(duo)(duo)臺(tai)(tai)虛機(ji)(vhost),虛擬機(ji)采用不同的用戶名密碼登錄,實(shi)現多(duo)(duo)租戶。
Exchange,交換機,消(xiao)息(xi)首(shou)先會(hui)傳(chuan)遞到交換機,由(you)交換機匹配(pei)路由(you)鍵(RoutingKey)決定投遞到哪個queue。類比郵政局。
Queue,隊列,存儲(chu)消息的(de)數據結(jie)構。類比(bi)小區(qu)的(de)快(kuai)遞柜(ju)。
Binding,綁(bang)(bang)定(ding)(ding),交換機與隊列(lie)間通過(guo)路由鍵(RoutingKey)進行綁(bang)(bang)定(ding)(ding)起來(lai)。RoutingKey類比目的地址(zhi)。
Consumer,消費者,即消息的接受方(fang)。
2. 消息發布
生產(chan)者(zhe)創建(jian)和啟(qi)動(dong)信道(dao)連接,聲明交(jiao)換(huan)機機,隊列,路由鍵等信息(xi)(xi),消(xiao)息(xi)(xi)封裝(zhuang)成幀(zhen),通過(guo)信道(dao)發(fa)送到(dao)Broker。
2.1. 創建連接和信道
一個(ge)(ge)應用程序(生產者)與RabbitMQ服(fu)務節點,維(wei)護一個(ge)(ge)TCP連接(Connect),在(zai)(zai)這(zhe)個(ge)(ge)連接中可以(yi)創建多(duo)個(ge)(ge)信道(dao)用于信息的傳(chuan)遞,各個(ge)(ge)信道(dao)間相互隔離。這(zhe)樣做的目的就是避(bi)免在(zai)(zai)多(duo)線程的情(qing)況(kuang)下,頻繁(fan)開啟和關(guan)閉多(duo)個(ge)(ge)TCP連接,帶(dai)來的性能消耗(每個(ge)(ge)TCP經歷三次(ci)握手,四(si)次(ci)揮手),實際就是TCP的多(duo)路(lu)復用,類似HTTP2.0的原理(li)。
Connection connection = factory.newConnection() ; //創建連接
Channel channel = connection.createChannel() ; //創(chuang)建(jian)信(xin)道
生產者與Broker之間完成(cheng)了8次交(jiao)互,分(fen)別為Connect的啟(qi)動,調整(zheng),打開以及Channel的開啟(qi),最(zui)終打開信道。
后續所有(you)與Broker通(tong)訊都是基于該信道,從(cong)代碼(ma)層面看,都是基于channel對象操(cao)作(zuo)。
2.2. 創建交換機,隊列
生產者將消(xiao)息(xi)發送到交換機,交換機根據(ju)(ju)路(lu)由鍵(jian),投遞到對應(ying)的(de)目的(de)隊列保(bao)存。這(zhe)個過(guo)程和(he)郵(you)(you)件(jian)信(xin)息(xi)非常類似(si),寄件(jian)人將信(xin)件(jian)送到郵(you)(you)政(zheng)局,郵(you)(you)政(zheng)局根據(ju)(ju)信(xin)件(jian)的(de)地(di)址投遞到目的(de)地(di)的(de)郵(you)(you)箱(xiang)中。
RabbitMQ提供了(le)通過(guo)管理平臺或(huo)者命名(ming)行(xing)預(yu)先(xian)(xian)創(chuang)(chuang)建(jian)交換(huan)機和隊(dui)列(lie)。采(cai)用預(yu)先(xian)(xian)定義還是客(ke)戶端(duan)創(chuang)(chuang)建(jian),就看業務(wu)的需要,比如我們(men)對交互(hu)機以(yi)及隊(dui)列(lie)預(yu)先(xian)(xian)做了(le)充分的規劃,那就采(cai)用預(yu)先(xian)(xian)創(chuang)(chuang)建(jian);如果需要創(chuang)(chuang)建(jian)一些臨時的交換(huan)機或(huo)者隊(dui)列(lie),客(ke)戶端(duan)創(chuang)(chuang)建(jian)更(geng)適合(he)。
要想創建(jian)一個可用的交互機和隊列,需要完(wan)成以下步驟(zou)
(1)創建交換機
以Java客戶端(duan)為例,調用exchangeDeclare方法。
exchangeDeclare(String exchange ,String type , boolean durable ,boolean autoDelete , boolean internal ,Map<String, Object> arguments)
該方法有很(hen)多重(zhong)載的方法,我們不一(yi)一(yi)介(jie)(jie)紹(shao),只介(jie)(jie)紹(shao)下幾個重(zhong)要的參數
exchange,定義該交換器的名稱
type,定義該交換器(qi)的路由類型,分類fanout,direct,topic,headers等(后面我們(men)重點介紹)
autoDelete,是否自動(dong)刪(shan)除(chu),當設(she)置為true,所有與(yu)這個交(jiao)(jiao)換器(qi)綁定的隊(dui)列或(huo)者交(jiao)(jiao)換器(qi)都與(yu)此解綁,就會自動(dong)刪(shan)除(chu)。
(2)創建隊列
以(yi)Java客戶端為例,調用exchangeDeclare方法。
queueDeclare (String queue , boolean durable , boolean exclusive,boolean autoDelete, Map<String,Object> arguments)
其中重要的參數如下:
queue,隊列名稱
durable,隊列是(shi)·否持(chi)久化(注意(yi),與消(xiao)息(xi)是(shi)否持(chi)久化不是(shi)一(yi)回事,后面再介紹(shao))
exclusive,是否(fou)排他(后(hou)面介紹)
autoDelete,是(shi)否自(zi)動刪除,當設置為true,所有與(yu)這(zhe)個(ge)隊列連接的消(xiao)費(fei)者(zhe)都斷開時,才會自(zi)動刪除。
(3)綁定
交(jiao)(jiao)換機(ji)(ji)和隊列創建好(hao)了(le),接下來就(jiu)需要(yao)將隊列綁定到(dao)交(jiao)(jiao)換機(ji)(ji)上,告訴交(jiao)(jiao)換機(ji)(ji),什么樣的(de)消息(xi)需要(yao)投遞給該交(jiao)(jiao)換機(ji)(ji)。
queueBind(String queue , String exchange , String routingKey)
queue,待綁定的隊列名。
exchange,需要綁定到的交(jiao)換(huan)機(ji)。
routingKey,用來綁定隊列(lie)和交換機的(de)路由鍵(jian)(有些地(di)方為了區別發(fa)送的(de)時候(hou)的(de)路由鍵(jian),將(jiang)綁定時候(hou)的(de)鍵(jian)稱之為綁定鍵(jian)Bindkey)
如果交換機和隊(dui)列已經存在,再(zai)次聲明,Broker不會(hui)重新創(chuang)建,直(zhi)接返回成功(gong)。
2.3. 創建消息
RabbitMQ的(de)消(xiao)息分為消(xiao)息頭與(yu)消(xiao)息體,類(lei)似http協(xie)議的(de)消(xiao)息結構(gou)。
消(xiao)息頭定了消(xiao)息的相關屬性(xing),主要的有以下:
content-type,傳(chuan)輸消息體的MIMEl類型,比如:application/json
content-encoding,消息體的編(bian)碼(ma),如(ru)gzip
expiration,消息的過期時間
delivery-mode,消息是(shi)否(fou)持久化(hua)(hua)到(dao)磁盤,1表示(shi)非持久化(hua)(hua),2表示(shi)持久化(hua)(hua)。
priority,消(xiao)息(xi)的優先(xian)(xian)級(ji),數值越(yue)大表示該消(xiao)息(xi)的優先(xian)(xian)級(ji)越(yue)高(gao),優先(xian)(xian)消(xiao)費(fei)。
消息體(ti)主要是用戶自定的(de)消息,可以是json,也可以是xml,通過(guo)序(xu)列化成二(er)進制數據。
以Java客戶端為例,
//創建消息頭
Map<String , Object> headers = new HashMap<String , Object>() ;
headers.put( " localtion" , "here " );
headers . put( " time " , " today " );
//創建消息
byte[] messageBodyBytes = "H ello , world! ". getBytes();
2.4. 發布消息
 消息頭和消息體都創建完成,接下來就是向指定的交換機發布消息,聲明routingKey,告知交換機投遞到那個隊列上。
以Java客戶端(duan)為例,
channe1 .basi cPublish(exchangeName,routingKey,mandatory,immediate,
  new AMQP.BasicProperti es.Buil der ()
.headers(headers)
 .build()) ,
  messageBodyBytes) ;
exchangeName,交換機的名稱
routingKey,路由鍵
mandatory,immediate,這兩(liang)個(ge)參數后面(mian)再介紹(shao)。
客戶端會將消息(xi)封裝方法幀,消息(xi)頭幀,消息(xi)體幀。
每(mei)個(ge)(ge)消(xiao)息(xi)體幀(zhen)最大是131Kb,如果超過需(xu)(xu)要分割成多(duo)個(ge)(ge)。所以一個(ge)(ge)完(wan)整消(xiao)息(xi),至少需(xu)(xu)要三個(ge)(ge)幀(zhen)(方法幀(zhen),消(xiao)息(xi)頭幀(zhen),消(xiao)息(xi)體幀(zhen))。
2.5. 消息響應
當生(sheng)產(chan)者(zhe)(zhe)將(jiang)消(xiao)息發送到交換機(ji)(ji)后,交互(hu)機(ji)(ji)是否將(jiang)消(xiao)息正確(que)路(lu)由,并存入隊(dui)列(lie)呢?對于可靠性要求高的系統,這個信息對于生(sheng)產(chan)者(zhe)(zhe)很重要。RabbitMQ提供了兩種方式,事(shi)務和生(sheng)產(chan)確(que)認(ren)機(ji)(ji)制(zhi)(pulisher comfirm)。這里我們(men)介紹下生(sheng)產(chan)者(zhe)(zhe)確(que)認(ren)機(ji)(ji)制(zhi),事(shi)務放到后面介紹。
生產(chan)者將(jiang)信(xin)道(dao)設(she)置成comfirm模式,所(suo)有(you)通(tong)過該信(xin)道(dao)發(fa)送的消(xiao)息(xi)(xi)都會被(bei)分配一個(ge)唯一的ID,一旦消(xiao)息(xi)(xi)被(bei)投送到(dao)正確的隊列后,將(jiang)會返回(hui)確認信(xin)息(xi)(xi)(Basic.Ack),生產(chan)者接受信(xin)息(xi)(xi)后確認消(xiao)息(xi)(xi)已發(fa)布成功;當然,如果投送失(shi)敗,會返回(hui)Basic.Nack信(xin)息(xi)(xi),生產(chan)者根據(ju)業務(wu)場景判斷是否需要重發(fa)。
確認(ren)信息(xi)是通過(guo)異步回調返(fan)回,不會阻塞生(sheng)產者發送其(qi)他消息(xi),這(zhe)種模式性能較高(gao),但(dan)是在重試(shi)的情況下,無法確保消息(xi)的順(shun)序寫入(ru)。
3. 消息路由與存儲
3.1. 消息路由
消息發送到節點的(de)交換(huan)機(ji)(ji)后,交換(huan)機(ji)(ji)需要通過路由鍵投遞到對應的(de)隊列上(shang),前面介紹過交換(huan)機(ji)(ji)有四種路由類(lei)型,分別(bie)為(wei)direct,fanout,topic,headers。
(1)direct
任何綁定(ding)在交換機的(de)隊列,只要它的(de)路(lu)由鍵和發(fa)布消(xiao)(xiao)息的(de)路(lu)郵建一(yi)致,就能收到消(xiao)(xiao)息。一(yi)般用于需要將一(yi)個(ge)消(xiao)(xiao)息投遞到一(yi)個(ge)或者多個(ge)確定(ding)的(de)目標隊列。
(2)fanout
所(suo)有發往(wang)fanout交換機的(de)消息被投遞到(dao)所(suo)有綁定到(dao)該交換機的(de)隊列中,這(zhe)個(ge)應用(yong)于"廣(guang)播"模式,該模式下,路(lu)由鍵不起作用(yong)。
(3)topic
采用句點分(fen)隔的形式(shi),隊列可以通(tong)過使用基于通(tong)配符(fu)(*和#)的模式(shi)匹配的方式(shi)來綁定(ding)到路(lu)由鍵 上(shang),發送的消(xiao)息攜帶的路(lu)由鍵匹配上(shang)就會投遞到該隊列。
4、headers
在隊列綁定時,定義匹配headers的參數(shu)的值,并設(she)(she)置x-match參數(shu)。發布(bu)消息時,定義header的屬性值,當x-match設(she)(she)置為(wei)(wei)all表(biao)示(shi)所有的消息的header屬性值都(dou)匹配才能路由到隊列,設(she)(she)置為(wei)(wei)any表(biao)示(shi),任何(he)一個header的屬性值匹配就可以(yi)路由到隊列。
headers類型的(de)靈活性以(yi)及效率都要比其(qi)他的(de)類型差,所以(yi)用的(de)比較少。
3.2. 消息存儲
隊列(lie)其實還是(shi)(shi)個(ge)邏輯概念(nian),它包含rabbit_amqqueue_process(隊列(lie)進程)和backing_queue(維護5個(ge)狀態(tai)棧)。通過rabbit_amqqueue_process負責(ze)接受(shou)生產者發布(bu)的消(xiao)(xiao)(xiao)息(xi)(xi),向消(xiao)(xiao)(xiao)費者交付消(xiao)(xiao)(xiao)息(xi)(xi),處理消(xiao)(xiao)(xiao)息(xi)(xi)的確認等;backing_queue是(shi)(shi)消(xiao)(xiao)(xiao)息(xi)(xi)存儲的具體形式和引擎,并向rabbit_amqqueue_process提供相關(guan)的接口。
delivery-mode參(can)數,1表示非持(chi)(chi)久化,2表示持(chi)(chi)久化。非持(chi)(chi)久化的消息(xi)(xi)置(zhi)于內(nei)存,當內(nei)存達到設定閥(fa)值(zhi)(vm memory high watermark paging ratio ),逐步寫入(ru)磁(ci)盤;持(chi)(chi)久化消息(xi)(xi)則直接寫入(ru)磁(ci)盤。磁(ci)盤的寫入(ru)速(su)度要(yao)(yao)遠小(xiao)于內(nei)存(都是順序寫入(ru)的前提下),并且(qie)需要(yao)(yao)等待落(luo)盤后(hou)才返回(hui)生(sheng)產者確認信息(xi)(xi),所以持(chi)(chi)久化模(mo)式的吞吐量要(yao)(yao)低(di)于非持(chi)(chi)久化模(mo)式,但是可靠性強。
對于非持(chi)久化需(xu)要注意一點,如果大(da)量的(de)消息堆積內(nei)存(cun),來不及(ji)進行(xing)落盤釋放內(nei)存(cun),一旦達到內(nei)存(cun)的(de)告警(jing)閥值(zhi)(vffi memory high watermark),就會(hui)產(chan)生告警(jing),為確保(bao)節點的(de)可用性(xing),會(hui)阻(zu)塞所有生產(chan)者的(de)連接,直(zhi)到內(nei)存(cun)恢復到正常狀態,這種情況(kuang)下,會(hui)嚴重影響吞(tun)吐的(de)。
持久化層分為兩部分,隊列索引(rabbit_queue_index)和消息存儲(rabbit_msg_store)。隊列索引負責維護隊列中落盤消息的信息,包 括消息的存儲地點、是否己被交付給消費者、是否己被消費者 ack 等,每個隊列都有一個對應的隊列索引,以".idx"為文件后綴。消息存儲以鍵值隊的形式存儲消息,被所有的隊列共享,每個(ge)節點有且(qie)只(zhi)有一(yi)個(ge),以(yi)".rdq"為(wei)文件后(hou)綴,順序寫入。
除了消息(xi)可以設(she)(she)置(zhi)持(chi)久化,交換機和隊(dui)列(lie)也可以設(she)(she)置(zhi)持(chi)久化,三者(zhe)需要同時設(she)(she)置(zhi),才能(neng)確保真正的持(chi)久化。如果隊(dui)列(lie)沒有設(she)(she)置(zhi)持(chi)久化,消息(xi)是(shi)(shi)持(chi)久化的,那么就會(hui)導致隊(dui)列(lie)刪除了,但是(shi)(shi)消息(xi)還在(zai),無法消費。
創建消(xiao)息的(de)時(shi)候,可以(yi)通過(guo)消(xiao)息頭的(de)expiration設(she)置消(xiao)息的(de)過(guo)期(qi)時(shi)間,如果消(xiao)息到達了過(guo)期(qi)時(shi)間,沒有被消(xiao)息,將會(hui)被投遞到死(si)信隊列(lie)(見后(hou)面章節)
3.3. 消息刪除
消息(xi)存儲時,會(hui)(hui)在ETS(Erlang Term Storage)表中記錄消息(xi)在文(wen)件(jian)(jian)中的位(wei)置映射和文(wen)件(jian)(jian)的相關信息(xi)。消息(xi)被(bei)正確的消費(fei)后,即(ji)(ji)會(hui)(hui)被(bei)刪(shan)除,首先刪(shan)除該表的記錄,但是不會(hui)(hui)立即(ji)(ji)刪(shan)除文(wen)件(jian)(jian),而(er)是僅(jin)標記待刪(shan)除數據(ju),待一個(ge)文(wen)件(jian)(jian)都是垃圾數據(ju)時可以將這個(ge)文(wen)件(jian)(jian)刪(shan)除。
4. 消息消費
4.1. 消費模式
與生(sheng)產(chan)者(zhe)(zhe)一(yi)樣,消(xiao)(xiao)(xiao)費者(zhe)(zhe)需要與節點建(jian)立連接,打開信道,實現通訊(xun)。消(xiao)(xiao)(xiao)費者(zhe)(zhe)需要訂閱隊列進行消(xiao)(xiao)(xiao)費,消(xiao)(xiao)(xiao)費的模式(shi)分為推(push)和拉(pull)兩種.
(1)推模式
  這個模式(shi)RabbitMQ推薦的(de),隊列(lie)會將消息推送給(gei)消費者,流程如下:
為了提高消(xiao)費(fei)速度,消(xiao)費(fei)者(zhe)可(ke)以(yi)評估自身(shen)的消(xiao)費(fei)能(neng)力,預設(she)(she)Qos,也就是設(she)(she)置一(yi)次(ci)可(ke)以(yi)獲取消(xiao)息的個數(shu)(類似(si)TCP的滑(hua)塊窗口),隊列(lie)批量推(tui)送(song)消(xiao)息到(dao)消(xiao)費(fei)端,消(xiao)費(fei)者(zhe)獲取消(xiao)息后(hou)恢復確(que)認(ren)信息。
(2)拉模式
消費者每次(ci)(ci)去隊列拉(la)消息,每次(ci)(ci)只能拉(la)一條。
如(ru)果(guo)只是(shi)(shi)想(xiang)獲取單條信息,而不是(shi)(shi)持(chi)續訂閱,可以采用拉模式。但是(shi)(shi)不建(jian)議使用循(xun)環(huan)來(lai)替代推模式,這樣(yang)會(hui)嚴重影響RabbitMQ的(de)性能。
4.2. 消息確認和拒絕
消費者獲取消息后,可以(yi)設(she)置不回(hui)復確認(ren),自動(dong)回(hui)復確認(ren),以(yi)及手(shou)動(dong)回(hui)復確認(ren)三(san)種(zhong)模式。
1、不回(hui)復確認,隊列會默認發出去的消息都(dou)會被正確的處理,無需等待確認,這個方式吞吐最高,但(dan)是可靠性性最差。
2、自動回(hui)復(fu)確認,有(you)可能(neng)消息沒有(you)正確處理(li),也被回(hui)復(fu),隊(dui)列無法識別,導致消息丟失(shi)。
3、手動(dong)回復(fu)確認,可以根據業(ye)務(wu)(wu)處理的(de)邏輯,把握回復(fu)的(de)時機,會(hui)帶(dai)來一定的(de)業(ye)務(wu)(wu)復(fu)雜度,但是可靠性(xing)最好。
對于設置Qos,可以進(jin)行(xing)批量回復,無需(xu)單次回復,這(zhe)個(ge)也是提(ti)高(gao)吞(tun)吐的重要手段(duan)。
消息(xi)(xi)者也可以拒絕消息(xi)(xi),比如消息(xi)(xi)解析(xi)錯誤(wu)等情況,通(tong)過(guo)設置(zhi)requeue,來告訴隊(dui)列是(shi)否需要(yao)重(zhong)新投遞還是(shi)丟棄該消息(xi)(xi)(如果配置(zhi)了死信隊(dui)列,那么丟失(shi)的消息(xi)(xi)會進入死信隊(dui)列,等待處(chu)理(li))。
如果選擇了重新(xin)投遞(di),那么(me)消(xiao)費(fei)的(de)順序是無法得到保證的(de)。
4.3. 隊列處理
為(wei)(wei)了(le)提升消費的(de)速度,生產(chan)發布的(de)消息(xi)到(dao)(dao)達隊(dui)列后(hou),如(ru)(ru)果隊(dui)列為(wei)(wei)空(kong),且有(you)消息(xi)者(zhe)(zhe)等待消息(xi),則直(zhi)接(jie)發送給消費者(zhe)(zhe),異步(bu)放入內存或者(zhe)(zhe)磁盤,提升消費速度。這(zhe)種標(biao)記已(yi)投遞的(de)方式與Kafka的(de)標(biao)記offset比較,邏(luo)輯上處理簡單了(le),確保了(le)消息(xi)不(bu)會被重復消費,但是也(ye)(ye)(ye)導(dao)致消息(xi)無法回(hui)溯。隊(dui)列創(chuang)建時,可以設(she)置(zhi)exclusive屬性,如(ru)(ru)果這(zhe)是為(wei)(wei)true,表示排他隊(dui)列,也(ye)(ye)(ye)就是當時創(chuang)建隊(dui)列連(lian)接(jie)(包括所有(you)的(de)信道)的(de)客(ke)戶端可以消費,其他的(de)無法消息(xi)。通俗些,只有(you)你當前這(zhe)個程序(或進程)進行消費處理,不(bu)希望別(bie)的(de)客(ke)戶端讀取(qu)到(dao)(dao)這(zhe)個隊(dui)列,一般用(yong)在RPC模(mo)式。一旦(dan)連(lian)接(jie)中(zhong)斷,排他隊(dui)列也(ye)(ye)(ye)將刪(shan)除,無論(lun)是否設(she)置(zhi)為(wei)(wei)持久化隊(dui)列。
5. 集群
單點無法確保高(gao)可用,同時一臺I/O能力有限,無法滿足高(gao)吞吐,在企業級應用中,一般(ban)都會部署集群(qun),提供服務。
5.1. 集群拓撲
集群節(jie)點間(jian)呈網狀(zhuang)連接,節(jie)點間(jian)相互通(tong)訊,每個節(jie)點上保(bao)留(liu)所有的(de)元數據,包括:
1、交換機,交換機的名稱和屬性。
2、隊列(lie)(lie)元數據,隊列(lie)(lie)的的名稱和(he)屬性。
3、綁定關(guan)系,交(jiao)(jiao)(jiao)換機與交(jiao)(jiao)(jiao)換機以(yi)及交(jiao)(jiao)(jiao)換機與隊列的綁定關(guan)系元數據(ju)。
4、vhost,vhost內的隊列,交換(huan)機和綁定的命名空間(jian)以及安全屬性(xing)。
所有節(jie)點全量(liang)保(bao)留元數據,會有以下的影(ying)響:
1、當(dang)客(ke)戶端連接(jie)某(mou)個節(jie)點創建這些(xie)元數(shu)據的(de)(de)時候(比如隊列,交換機),需要同步到所(suo)有節(jie)點上,并等待(dai)所(suo)有的(de)(de)節(jie)點的(de)(de)完成(cheng)后,才答復成(cheng)功,所(suo)以(yi)會有一定(ding)的(de)(de)延(yan)遲。
2、客戶(hu)端連接其中某個節(jie)點,都(dou)能獲取(qu)到(dao)所有(you)的(de)元數據(這(zhe)點和Kafka類(lei)似)。
消(xiao)息(xi)內容會不會所有節(jie)(jie)點(dian)都(dou)備份呢?答案是否。因為這會造成(cheng)大(da)量的(de)空(kong)間浪(lang)費(fei)(fei)(鏡像隊(dui)列(lie)(lie)除外,后面介紹)。如果一個(ge)生產(chan)者(消(xiao)費(fei)(fei)者)連接到(dao)某個(ge)節(jie)(jie)點(dian)發布(消(xiao)費(fei)(fei))隊(dui)列(lie)(lie)消(xiao)息(xi),但該(gai)(gai)隊(dui)列(lie)(lie)不在該(gai)(gai)節(jie)(jie)點(dian)上,那需要通過該(gai)(gai)節(jie)(jie)點(dian)路由。如下:
消息(xi)的(de)發布和消費鏈(lian)路加長,會導(dao)致延遲加長,吞吐量降低,為了減少(shao)影響(xiang),集群的(de)節(jie)點建議部署(shu)到(dao)在一個匯聚(ju)下,不要跨可用區部署(shu)(后面會介紹跨可用區的(de)方法)。我們希望(wang)客(ke)戶端對接的(de)節(jie)點上部署(shu)所需(xu)要的(de)隊(dui)列(lie),這個需(xu)要規劃得當。
5.2. 負載均衡
在多節(jie)點情況下,客(ke)戶端的請求通過負載均衡(heng),將流量均勻分攤(tan)到各節(jie)點,RabbitMQ集(ji)群可以通過HAProxy,LVS+keepalived等LB實(shi)現,如(ru)下圖所示(shi)。
5.3. 鏡像隊列
集群(qun)多(duo)節(jie)點(dian)能確保整(zheng)體(ti)服務的可(ke)(ke)用性,但是對于單個(ge)隊(dui)列(lie)來(lai)說,如果做不(bu)了(le)多(duo)節(jie)點(dian)部(bu)署,還是有(you)單節(jie)點(dian)故(gu)障的可(ke)(ke)能,ActiveMQ中采用主從模式保證了(le)高可(ke)(ke)用,在RabbitMQ中,也(ye)有(you)類似機制,稱之(zhi)為鏡像隊(dui)列(lie)(或者HA隊(dui)列(lie))。
(1)創建鏡像隊列
在(zai)創建隊(dui)列時,通過ha-mode,ha-params,ha-sync-mode來定義鏡像隊(dui)列個數,分布(bu),以(yi)及(ji)同步模式。
ha-mode,有(you)效值為,all,exactly,nodes。alls表示在(zai)所有(you)的(de)(de)節(jie)點上(shang)創(chuang)建(jian)鏡(jing)像隊列(lie),exactly表示指定(ding)個數(shu)的(de)(de)節(jie)點上(shang)創(chuang)建(jian),ha-params設置個數(shu);nodes指定(ding)在(zai)哪些節(jie)點上(shang)創(chuang)建(jian),ha-params指定(ding)節(jie)點名。
ha-sync-mode,有效值為automatic,manual。automatic表示(shi)新(xin)節(jie)點(dian)加(jia)入時(shi),默認自動同步鏡(jing)像隊(dui)列(lie)(lie)消息;manual表示(shi)新(xin)節(jie)點(dian)加(jia)入時(shi),不(bu)會自動同步鏡(jing)像隊(dui)列(lie)(lie)消息。因(yin)為同步操作會導致隊(dui)列(lie)(lie)的阻(zu)塞(sai),建議(yi)使用manual模式。
鏡像(xiang)隊列(lie)有一個master和多個slave組成(cheng)(cheng),創建(jian)完成(cheng)(cheng)后,直接連(lian)接的(de)(de)節(jie)點上的(de)(de)隊列(lie)為(wei)master,其他的(de)(de)為(wei)slave。
(2)發布消息
可以認(ren)為有(you)個隱藏的(de)fanout交換機(ji),向(xiang)所有(you)的(de)鏡(jing)像隊(dui)列進行廣播(bo)(這里與Kafka不(bu)大一(yi)樣(yang),它是通過follower向(xiang)master請求(qiu)同(tong)步內容)。當(dang)所有(you)的(de)鏡(jing)像隊(dui)列確認(ren)完成后(hou),才向(xiang)發布者回復publish-comfirm(所以鏡(jing)像個數不(bu)能太(tai)多(duo),否則影(ying)響(xiang)發布吞吐量,一(yi)般2-3個為宜),這樣(yang)能確保所有(you)的(de)鏡(jing)像隊(dui)列的(de)消息都是同(tong)步的(de)。
(3)消費
除了發布(bu)需要向master和(he)slave同(tong)時投(tou)遞(di)消(xiao)息(xi),其他的(de)(de)(de)都(dou)是(shi)由master負責向slave同(tong)步(bu),包(bao)括(kuo)消(xiao)費(fei),ack等,如果(guo)消(xiao)息(xi)者(zhe)(zhe)(zhe)連接(jie)master隊(dui)(dui)列(lie)所(suo)在(zai)的(de)(de)(de)節(jie)(jie)(jie)點(dian)(如消(xiao)費(fei)者(zhe)(zhe)(zhe)2),則(ze)消(xiao)息(xi)隊(dui)(dui)列(lie)信(xin)息(xi)即(ji)可;如果(guo)消(xiao)費(fei)者(zhe)(zhe)(zhe)連接(jie)的(de)(de)(de)是(shi)slave隊(dui)(dui)列(lie)所(suo)在(zai)的(de)(de)(de)節(jie)(jie)(jie)點(dian)(如消(xiao)費(fei)者(zhe)(zhe)(zhe)1),slave節(jie)(jie)(jie)點(dian)需要將(jiang)消(xiao)費(fei)指(zhi)令發送(song)給master節(jie)(jie)(jie)點(dian),master節(jie)(jie)(jie)點(dian)將(jiang)數據準(zhun)備好,發送(song)給到(dao)slave節(jie)(jie)(jie)點(dian),再投(tou)遞(di)給消(xiao)費(fei)者(zhe)(zhe)(zhe)。slave隊(dui)(dui)列(lie)為何不類似(si)于mysql提供讀服務(wu)呢,這(zhe)個(ge)和(he)Kafka的(de)(de)(de)設計(ji)類似(si),RabbitMQ的(de)(de)(de)負載(zai)粒度在(zai)隊(dui)(dui)列(lie)上,而不是(shi)整個(ge)節(jie)(jie)(jie)點(dian),只需要將(jiang)master隊(dui)(dui)列(lie)均衡(heng)(heng)分布(bu),就是(shi)平衡(heng)(heng)整個(ge)節(jie)(jie)(jie)點(dian)的(de)(de)(de)壓力(li)。
如(ru)圖,節(jie)點(dian)1,節(jie)點(dian)2,節(jie)點(dian)3分(fen)別分(fen)擔隊(dui)(dui)列1,隊(dui)(dui)列2,隊(dui)(dui)列3的壓力。
(4)失效轉移
當(dang)(dang)slave所(suo)在節點掛掉后,除了與(yu)slave相連(lian)的客戶端全部(bu)斷開(kai)連(lian)接,其他的沒有影響。但是當(dang)(dang)master所(suo)在的節點宕(dang)機后,就會產生連(lian)鎖(suo)反應。
1)與(yu)master節(jie)點所有的客戶端連(lian)接(jie)斷開。
2)選舉最(zui)老的(de)slave節點(dian)為master節點(dian),因為最(zui)老的(de)slave與master之間的(de)同步狀態是(shi)最(zui)好的(de),但是(shi)也存在(zai)未同步的(de)信息丟失。
3)新的(de)master節點重(zhong)續入隊所有(you)unack信息(xi)(xi)(xi)。消費(fei)者獲取信息(xi)(xi)(xi)進行消費(fei),還(huan)沒來得及(ji)ack,或者ack信息(xi)(xi)(xi)沒有(you)同步到新的(de)master上(shang),新master無法確(que)認這部分(fen)消息(xi)(xi)(xi)是否被正確(que)消費(fei),為了安全起(qi)見,所有(you)的(de)unack都重(zhong)新入隊,此時客戶端會有(you)重(zhong)復消費(fei)的(de)可能。
4)如果消費端直連master所在節點(如上圖中的消費者2),master節點宕機后,TCP連接斷開,重入附加并監聽新的master節點;如果是連接slave所在節點(如上圖中的消費者1),就無法感知master節點宕機了,只認為隊列沒有消息。此時,在basicComsume消費時需要指定x-cancel-on-ha-failover參數,監聽master節點斷開的通知,接受到通知后重入附加并監聽新的master。這點非常重要,否則會導致消息大量積壓,但是消息端又無消息消費的問題。這個(ge)問題(ti)在(zai)(zai)Kafka中是不(bu)存在(zai)(zai)的,分(fen)區(qu)選舉(ju)的結果保存在(zai)(zai)所(suo)有的節點(dian)上,客戶端(duan)通過元數據更新,獲取最(zui)新分(fen)區(qu)leader信息(xi)。
6. 跨集群
現在大型的(de)網(wang)站系統,為了實現異地(di)容災,一般采用多機房或者混合云部署,以下(xia)(xia)分析單(dan)個集群在這(zhe)種場景下(xia)(xia)的(de)集中方(fang)案(an)
1、僅部署一個機房
單個(ge)集群僅部署(shu)在北京機房(fang),南京機房(fang)的發布者和消(xiao)費者跨機房(fang)訪(fang)問。這個(ge)方案有以下幾(ji)個(ge)問題。
(1)無法做到容災,一旦北京機房掛了(le),整個集群不可用。
(2)降(jiang)低吞吐(tu)量(liang),南京(jing)機房的客戶(hu)端發(fa)送請(qing)求后(hou),會阻(zu)塞住,直到節點回(hui)復(fu)確(que)認,由于兩個機房的存在延遲(chi)(假設北京(jing)到南京(jing)機房的RTT在30ms),導致每次請(qing)求時間增加,降(jiang)低了吞吐(tu)量(liang)。
2、延展機房部署
單(dan)個(ge)集群(qun)跨機房(fang)部(bu)署(shu),節點(dian)1,節點(dian)2部(bu)署(shu)到(dao)北京機房(fang),節點(dian)3部(bu)署(shu)到(dao)南京機房(fang)。這個(ge)方(fang)案通過鏡像(xiang)隊列在節點(dian)1,節點(dian)2與節點(dian)3間互備(bei),能做(zuo)到(dao)部(bu)分(fen)的容災。但是也有如下問題:
(1)跨(kua)機房(fang)(fang)請求,部分(fen)消費者(zhe)和發(fa)布者(zhe)還是需(xu)要(yao)跨(kua)機房(fang)(fang)請求,與(yu)方(fang)案(an)1類似,導致請求時間增加,降低(di)了吞吐(tu)量。
(2)腦裂(lie),異(yi)地間的網(wang)絡環(huan)境復雜,網(wang)絡波動(dong)會(hui)導(dao)致分區,進而"腦裂(lie)",單個集群是無法做到跨可用區的。
單個集(ji)群(qun)是無法滿足(zu)跨機(ji)房(fang)場景,需要(yao)采用多集(ji)群(qun)部署解決這(zhe)個問題(ti),跨集(ji)群(qun)間的"橋(qiao)接"可以通過Federation,Shovel兩(liang)種方式。
6.1. Federation(聯邦)
Federation,可(ke)以翻譯為"聯(lian)(lian)邦(bang)"。Federation 可(ke)以通過AMQP 協議讓原(yuan)本發(fa)(fa)送到(dao)某(mou)個Broker(或集群)中(zhong)的(de)(de)(de)交(jiao)換(huan)器(或隊(dui)列)上的(de)(de)(de)消息能夠轉發(fa)(fa)到(dao)另一個Broker(或集群)中(zhong)的(de)(de)(de)交(jiao)換(huan)器(或隊(dui)列)上,兩(liang)方(fang)的(de)(de)(de)交(jiao)換(huan)器(或隊(dui)列〉看起(qi)來是(shi)以一種"聯(lian)(lian)邦(bang)"的(de)(de)(de)形式(shi)在運(yun)作(zuo)。又分為聯(lian)(lian)邦(bang)交(jiao)換(huan)機和(he)聯(lian)(lian)邦(bang)隊(dui)列兩(liang)種模式(shi)。
1、聯邦交換機
假(jia)設發(fa)布者1位于北京(jing)機(ji)房(fang)(fang),需(xu)要發(fa)布消(xiao)(xiao)息到(dao)南京(jing)機(ji)房(fang)(fang)的(de)(de)(de)Broker2節點(dian)上,首先在Broker2節點(dian)的(de)(de)(de)ExchangeA上建(jian)立(li)(li)到(dao)Broker1節點(dian)的(de)(de)(de)federation link,Broker1上會(hui)建(jian)立(li)(li)一(yi)個同(tong)名的(de)(de)(de)交互機(ji)ExchangeA,同(tong)時建(jian)立(li)(li)一(yi)個內部的(de)(de)(de)交換(huan)機(ji)ExchangeA->Broker2, 并(bing)通過路由(you)鍵"rkA"將這(zhe)兩個交換(huan)器(qi)綁(bang)定(ding)(ding),同(tong)時,還會(hui)創建(jian)一(yi)個Exchange->Broker2隊(dui)列,并(bing)與Exchange->Broker2交換(huan)機(ji)綁(bang)定(ding)(ding),Federation插件會(hui)在Broker1的(de)(de)(de)ExchangeA->Broker2隊(dui)列與Broker2的(de)(de)(de)ExchangeA建(jian)立(li)(li)AMQP連接,實時消(xiao)(xiao)費Exchange->Broker2隊(dui)列的(de)(de)(de)消(xiao)(xiao)息。發(fa)布者1僅(jin)需(xu)要把消(xiao)(xiao)息發(fa)送了ExchangeA上,保存(cun)到(dao)隊(dui)列ExchangeA->Broker2上即可,剩下的(de)(de)(de)時就交由(you)Federation插件搞定(ding)(ding)。
2、聯邦隊列
聯邦交(jiao)(jiao)換(huan)機(ji)(ji)是在兩個(ge)節點的交(jiao)(jiao)換(huan)機(ji)(ji)間(jian)建立(li)連接(jie),聯邦隊(dui)列就(jiu)是在兩個(ge)隊(dui)列間(jian)建立(li)連接(jie)
聯邦(bang)隊(dui)列(lie)(lie)(lie)可以(yi)看(kan)做(zuo)互為擴展隊(dui)列(lie)(lie)(lie),如圖中的Queue1與(yu)Queue3建立聯邦(bang)隊(dui)列(lie)(lie)(lie),當Queue3有消息(xi)堆積,消費(fei)者1優先(xian)將Queue3的消費(fei)完,此時則(ze)會(hui)從Queue1拉(la)取(qu)消息(xi);反(fan)之亦(yi)然,如果Queue1的隊(dui)列(lie)(lie)(lie)消費(fei)完成,將會(hui)從Queue3中拉(la)取(qu)消息(xi),消費(fei)者1繼續消息(xi),所以(yi)Queue1與(yu)Queue3間的消息(xi)是(shi)可以(yi)"漂移(yi)"的。聯邦(bang)隊(dui)列(lie)(lie)(lie)會(hui)讓消費(fei)能力強的一側多消費(fei)些,確保隊(dui)列(lie)(lie)(lie)間的平衡。聯邦(bang)交換機的消息(xi)流向(xiang)(xiang)是(shi)單向(xiang)(xiang)的,聯邦(bang)隊(dui)列(lie)(lie)(lie)消息(xi)流向(xiang)(xiang)是(shi)雙向(xiang)(xiang)。
6.2. Shovel
以(yi)Broker1的(de)(de)(de)(de)Queue為源,Broker2的(de)(de)(de)(de)Exchange為目標,建立shovel link,發布者發送的(de)(de)(de)(de)Msg1消(xiao)息,存入(ru)queue,Shovel消(xiao)費后存入(ru)Broker2的(de)(de)(de)(de)queue。當Broker1的(de)(de)(de)(de)queue產(chan)生消(xiao)息堆積時,通過shovel link轉移消(xiao)息到(dao)其他集(ji)群進行消(xiao)息,減少broke1的(de)(de)(de)(de)壓力。
7. 可靠性
可靠性(xing)是衡量消(xiao)息組(zu)件的(de)重要(yao)因素,但可靠性(xing)都是相對的(de),沒有任何(he)組(zu)件確(que)保百(bai)分百(bai)可靠。我(wo)們看(kan)下RabbitMQ有哪些(xie)措施(shi)保證高可靠,將整個消(xiao)息的(de)生命周期分為四個階段來逐一分析
7.1. 第一階段:消息投遞
前面介紹了生產(chan)者確(que)認的(de)機(ji)制(zhi),通(tong)過對(dui)信(xin)道(dao)設置(zhi)comfirm,對(dui)于(yu)每(mei)條投遞(di)的(de)消(xiao)息,都(dou)會(hui)返回確(que)認信(xin)息。除此之外(wai),生產(chan)者還可以(yi)通(tong)過事務機(ji)制(zhi)確(que)保消(xiao)息投遞(di)的(de)可靠性。
- 事務機制
生(sheng)產者(zhe)通過二(er)階段提交方式,確報批量消息的事務一致性。
1、客戶端(duan)發送Tx.select,將信道置為(wei)事(shi)務(wu)模式(shi)。
2、Broker回復Tx.selectOk,確認已將信道置(zhi)為(wei)事務模式(shi)。
3、客戶(hu)端發(fa)送批量消息(xi)。
4、根據結(jie)果(guo)(捕獲(huo)異常),客(ke)戶(hu)端確定(ding)是否提交還是回滾。
5、Broker確認(ren)提交或者回滾指令。
事務機制能確保批量消息投遞的一致性,由于其同步操作,導致性能大大下降。事務與生產者確認機制是互斥的,一般情況下我們采用輕量級的生產者確認機制。
7.2. 第二階段:消息路由
1、mandatory
當消息(xi)發送到交換機,有可(ke)能(neng)由于路由配置(zhi)錯誤,導致(zhi)消息(xi)無法正確投遞。此時,有兩種處理方式,一種是直(zhi)接丟棄,一種是反饋(kui)給生產者。第一章節(jie)在介紹調(diao)用(yong)publish的時候(hou),說到mandatory參(can)數,該(gai)參(can)數如果設(she)(she)置(zhi)為(wei)true,會回(hui)調(diao)生產端(duan)的監聽接口,返(fan)回(hui) 消息(xi);如果設(she)(she)置(zhi)為(wei)false,則直(zhi)接丟棄。顯然設(she)(she)置(zhi)為(wei)true,將提(ti)升(sheng)可(ke)靠性。
2、備份交換機
保留路由(you)(you)錯(cuo)誤的消息(xi),可以設置(zhi)備份(fen)交(jiao)換(huan)機,路由(you)(you)失敗的消息(xi)會丟到備份(fen)交(jiao)換(huan)機,保存到特定的隊(dui)列。通過該隊(dui)列監控(kong)路由(you)(you)錯(cuo)誤的消息(xi)。
7.3. 第三階段:消息存儲
消息存(cun)儲階段,介紹(shao)了(le)以下幾種可靠性(xing)機(ji)制
1、持久化,隊列和消息(xi)設置為(wei)持久化,寫入到磁(ci)盤,確(que)保Broker宕(dang)機重(zhong)啟后,消息(xi)不丟失(shi)。但(dan)是消息(xi)先寫入頁(ye)面緩(huan)存(cun),再批(pi)量寫入磁(ci)盤,如果在(zai)這期(qi)間宕(dang)機,還是會存(cun)在(zai)丟失(shi)風險。
2、鏡像(xiang)隊(dui)列(lie),采用(yong)msater-slave多副本機制,最大程度確保可用(yong)性。
3、死信隊列
死信(xin)隊(dui)列(lie)(lie)是一(yi)種特殊隊(dui)列(lie)(lie),存儲死信(xin)消(xiao)息(xi)(xi),有以下幾種情況下的消(xiao)息(xi)(xi)會(hui)變成死信(xin)消(xiao)息(xi)(xi)。任何隊(dui)列(lie)(lie)都(dou)可以設置一(yi)個死信(xin)交換機,將(jiang)符(fu)合條件的消(xiao)息(xi)(xi)路由到死信(xin)隊(dui)列(lie)(lie)。死信(xin)隊(dui)列(lie)(lie)也(ye)是一(yi)個普通隊(dui)列(lie)(lie),里面(mian)的消(xiao)息(xi)(xi)也(ye)可被訂閱。監控死信(xin)隊(dui)列(lie)(lie),及時處(chu)理死信(xin)消(xiao)息(xi)(xi),確保消(xiao)息(xi)(xi)的可靠性。
1)消(xiao)息被拒絕(Basic.reject,并設置(zhi)request為false)
2)消息過期(超過expiration時(shi)間)
3)隊(dui)列達到(dao)最(zui)大長度。
7.4. 第四階段:消息消費
消(xiao)(xiao)費(fei)(fei)者獲取消(xiao)(xiao)息后,通過手動確(que)認,能最(zui)大程度消(xiao)(xiao)息消(xiao)(xiao)費(fei)(fei)的可靠(kao)性,也可以(yi)拒(ju)絕(jue)消(xiao)(xiao)息,并設置(zhi)request為true,該消(xiao)(xiao)息會重新入隊。
8. 高吞吐
高可用,高吞吐在有些場(chang)景中(zhong)是互斥的,所以需要(yao)根據自身(shen)的業(ye)務(wu)進行抉擇,以下分析提高吞吐率的一(yi)些措施。
1、生產者不設置信道確(que)認,能大幅提高吞吐量,但是可靠性(xing)較差,可以用(yong)在允許部(bu)分消息丟失的場景(jing),比(bi)如統計用(yong)戶點擊(ji)量。
2、按照(zhao)業務規則(ze)將隊列拆(chai)分多個,分布到不同的(de)節點(dian)上。
3、一(yi)個節點(dian)上設置(zhi)合理的(de)(de)隊(dui)列個數,一(yi)個隊(dui)列對(dui)應一(yi)個線程,在一(yi)個多核的(de)(de)節點(dian),使用多個隊(dui)列與消費者可以獲得更好的(de)(de)吞吐量,將(jiang)(jiang)隊(dui)列數量設置(zhi)為等(deng)于服務器cpu核數將(jiang)(jiang)獲得最(zui)佳(jia)吞吐量。
4、限制(zhi)隊列(lie)的大小,設置TTL或(huo)者max-length限制(zhi)隊列(lie)大小。
5、自動刪(shan)除(chu)不需要的(de)隊列(lie),設置隊列(lie)的(de)TTL,配置auto-delete以及排它隊列(lie),刪(shan)除(chu)無用的(de)隊列(lie)。
6、設置盡可能大(da)的預取個數(shu),不(bu)過(guo)這個是需要綜(zong)合網絡帶寬,客戶端緩存(cun),消息處理速度等(deng)各方面的因素評估(gu)。