亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

消息選型-Rabbitmq篇

2023-09-21 10:46:56
18
0

 

1.   架構

我(wo)們先來看(kan)下(xia)整體架(jia)構。

 

Producer,消(xiao)息(xi)(xi)生(sheng)產者,即消(xiao)息(xi)(xi)的(de)發布方, 生(sheng)產者生(sheng)產的(de)消(xiao)息(xi)(xi),首(shou)先發布到指定的(de)交(jiao)換(huan)機上(shang),交(jiao)換(huan)機通過路由鍵(jian)(RoutingKey)的(de)匹配,選擇對應的(de)隊(dui)列(lie)進行(xing)投遞(di)。消(xiao)息(xi)(xi)者訂閱隊(dui)列(lie),消(xiao)費隊(dui)列(lie)的(de)消(xiao)息(xi)(xi)。

Connectiton,客(ke)戶端與Broker間的TCP連接(jie)。

Channel,信道,每個連接采(cai)用多路復用,包(bao)含(han)多個信道。producer與Broker間采(cai)用信道傳遞數據。

Broker,Rabbit服務節點。

Vhost,虛機(ji)機(ji),一個節點下包含多(duo)(duo)個vhost,vhost間的exchange,queue相互(hu)隔離(li)。就(jiu)好(hao)比一臺(tai)(tai)物理機(ji)上(Broker)部署多(duo)(duo)臺(tai)(tai)虛機(ji)(vhost),虛擬機(ji)采用不同的用戶名密碼登錄,實(shi)現多(duo)(duo)租戶。

Exchange,交換機,消(xiao)息(xi)首(shou)先會(hui)傳(chuan)遞到交換機,由(you)交換機匹配(pei)路由(you)鍵(RoutingKey)決定投遞到哪個queue。類比郵政局。

Queue,隊列,存儲(chu)消息的(de)數據結(jie)構。類比(bi)小區(qu)的(de)快(kuai)遞柜(ju)。

Binding,綁(bang)(bang)定(ding)(ding),交換機與隊列(lie)間通過(guo)路由鍵(RoutingKey)進行綁(bang)(bang)定(ding)(ding)起來(lai)。RoutingKey類比目的地址(zhi)。

Consumer,消費者,即消息的接受方(fang)。

2.   消息發布

生產(chan)者(zhe)創建(jian)和啟(qi)動(dong)信道(dao)連接,聲明交(jiao)換(huan)機機,隊列,路由鍵等信息(xi)(xi),消(xiao)息(xi)(xi)封裝(zhuang)成幀(zhen),通過(guo)信道(dao)發(fa)送到(dao)Broker。

 

2.1.  創建連接和信道

 一個(ge)(ge)應用程序(生產者)與RabbitMQ服(fu)務節點,維(wei)護一個(ge)(ge)TCP連接(Connect),在(zai)(zai)這(zhe)個(ge)(ge)連接中可以(yi)創建多(duo)個(ge)(ge)信道(dao)用于信息的傳(chuan)遞,各個(ge)(ge)信道(dao)間相互隔離。這(zhe)樣做的目的就是避(bi)免在(zai)(zai)多(duo)線程的情(qing)況(kuang)下,頻繁(fan)開啟和關(guan)閉多(duo)個(ge)(ge)TCP連接,帶(dai)來的性能消耗(每個(ge)(ge)TCP經歷三次(ci)握手,四(si)次(ci)揮手),實際就是TCP的多(duo)路(lu)復用,類似HTTP2.0的原理(li)。

Connection connection = factory.newConnection() ; //創建連接
Channel channel = connection.createChannel() ; //創(chuang)建(jian)信(xin)道

生產者與Broker之間完成(cheng)了8次交(jiao)互,分(fen)別為Connect的啟(qi)動,調整(zheng),打開以及Channel的開啟(qi),最(zui)終打開信道。

 

后續所有(you)與Broker通(tong)訊都是基于該信道,從(cong)代碼(ma)層面看,都是基于channel對象操(cao)作(zuo)。

2.2.  創建交換機,隊列

 

生產者將消(xiao)息(xi)發送到交換機,交換機根據(ju)(ju)路(lu)由鍵(jian),投遞到對應(ying)的(de)目的(de)隊列保(bao)存。這(zhe)個過(guo)程和(he)郵(you)(you)件(jian)信(xin)息(xi)非常類似(si),寄件(jian)人將信(xin)件(jian)送到郵(you)(you)政(zheng)局,郵(you)(you)政(zheng)局根據(ju)(ju)信(xin)件(jian)的(de)地(di)址投遞到目的(de)地(di)的(de)郵(you)(you)箱(xiang)中。

 RabbitMQ提供了(le)通過(guo)管理平臺或(huo)者命名(ming)行(xing)預(yu)先(xian)(xian)創(chuang)(chuang)建(jian)交換(huan)機和隊(dui)列(lie)。采(cai)用預(yu)先(xian)(xian)定義還是客(ke)戶端(duan)創(chuang)(chuang)建(jian),就看業務(wu)的需要,比如我們(men)對交互(hu)機以(yi)及隊(dui)列(lie)預(yu)先(xian)(xian)做了(le)充分的規劃,那就采(cai)用預(yu)先(xian)(xian)創(chuang)(chuang)建(jian);如果需要創(chuang)(chuang)建(jian)一些臨時的交換(huan)機或(huo)者隊(dui)列(lie),客(ke)戶端(duan)創(chuang)(chuang)建(jian)更(geng)適合(he)。

要想創建(jian)一個可用的交互機和隊列,需要完(wan)成以下步驟(zou)

(1)創建交換機

以Java客戶端(duan)為例,調用exchangeDeclare方法。

exchangeDeclare(String exchange ,String type , boolean durable ,boolean autoDelete , boolean internal ,Map<String, Object> arguments)

該方法有很(hen)多重(zhong)載的方法,我們不一(yi)一(yi)介(jie)(jie)紹(shao),只介(jie)(jie)紹(shao)下幾個重(zhong)要的參數

exchange,定義該交換器的名稱

type,定義該交換器(qi)的路由類型,分類fanout,direct,topic,headers等(后面我們(men)重點介紹)

autoDelete,是否自動(dong)刪(shan)除(chu),當設(she)置為true,所有與(yu)這個交(jiao)(jiao)換器(qi)綁定的隊(dui)列或(huo)者交(jiao)(jiao)換器(qi)都與(yu)此解綁,就會自動(dong)刪(shan)除(chu)。

(2)創建隊列

以(yi)Java客戶端為例,調用exchangeDeclare方法。

queueDeclare (String queue , boolean durable , boolean exclusive,boolean autoDelete, Map<String,Object> arguments)

其中重要的參數如下:

queue,隊列名稱

durable,隊列是(shi)·否持(chi)久化(注意(yi),與消(xiao)息(xi)是(shi)否持(chi)久化不是(shi)一(yi)回事,后面再介紹(shao))

exclusive,是否(fou)排他(后(hou)面介紹)

autoDelete,是(shi)否自(zi)動刪除,當設置為true,所有與(yu)這(zhe)個(ge)隊列連接的消(xiao)費(fei)者(zhe)都斷開時,才會自(zi)動刪除。

(3)綁定

交(jiao)(jiao)換機(ji)(ji)和隊列創建好(hao)了(le),接下來就(jiu)需要(yao)將隊列綁定到(dao)交(jiao)(jiao)換機(ji)(ji)上,告訴交(jiao)(jiao)換機(ji)(ji),什么樣的(de)消息(xi)需要(yao)投遞給該交(jiao)(jiao)換機(ji)(ji)。

queueBind(String queue , String exchange , String routingKey)

queue,待綁定的隊列名。

exchange,需要綁定到的交(jiao)換(huan)機(ji)。

routingKey,用來綁定隊列(lie)和交換機的(de)路由鍵(jian)(有些地(di)方為了區別發(fa)送的(de)時候(hou)的(de)路由鍵(jian),將(jiang)綁定時候(hou)的(de)鍵(jian)稱之為綁定鍵(jian)Bindkey)

如果交換機和隊(dui)列已經存在,再(zai)次聲明,Broker不會(hui)重新創(chuang)建,直(zhi)接返回成功(gong)。

2.3.  創建消息

RabbitMQ的(de)消(xiao)息分為消(xiao)息頭與(yu)消(xiao)息體,類(lei)似http協(xie)議的(de)消(xiao)息結構(gou)。

消(xiao)息頭定了消(xiao)息的相關屬性(xing),主要的有以下:

content-type,傳(chuan)輸消息體的MIMEl類型,比如:application/json

content-encoding,消息體的編(bian)碼(ma),如(ru)gzip

expiration,消息的過期時間

delivery-mode,消息是(shi)否(fou)持久化(hua)(hua)到(dao)磁盤,1表示(shi)非持久化(hua)(hua),2表示(shi)持久化(hua)(hua)。

priority,消(xiao)息(xi)的優先(xian)(xian)級(ji),數值越(yue)大表示該消(xiao)息(xi)的優先(xian)(xian)級(ji)越(yue)高(gao),優先(xian)(xian)消(xiao)費(fei)。

消息體(ti)主要是用戶自定的(de)消息,可以是json,也可以是xml,通過(guo)序(xu)列化成二(er)進制數據。

以Java客戶端為例,

//創建消息頭

Map<String , Object> headers = new HashMap<String , Object>() ;

headers.put( " localtion"&nbsp;, "here " );

headers . put( " time ;" , " today " );

//創建消息

byte[] messageBodyBytes = "H ello , world! ". getBytes();

2.4.  發布消息

 消息頭和消息體都創建完成,接下來就是向指定的交換機發布消息,聲明routingKey,告知交換機投遞到那個隊列上。
以Java客戶端(duan)為例,

channe1 .basi cPublish(exchangeName,routingKey,mandatory,immediate,

        &nbsp;       new AMQP.BasicProperti es.Buil der ()

                  .headers(headers)

                &nbsp;.build()) ,

              &nbsp;  messageBodyBytes) ;

exchangeName,交換機的名稱

routingKey,路由鍵

mandatoryimmediate,這兩(liang)個(ge)參數后面(mian)再介紹(shao)。

客戶端會將消息(xi)封裝方法幀,消息(xi)頭幀,消息(xi)體幀。

 

每(mei)個(ge)(ge)消(xiao)息(xi)體幀(zhen)最大是131Kb,如果超過需(xu)(xu)要分割成多(duo)個(ge)(ge)。所以一個(ge)(ge)完(wan)整消(xiao)息(xi),至少需(xu)(xu)要三個(ge)(ge)幀(zhen)(方法幀(zhen),消(xiao)息(xi)頭幀(zhen),消(xiao)息(xi)體幀(zhen))。

 

2.5.  消息響應

當生(sheng)產(chan)者(zhe)(zhe)將(jiang)消(xiao)息發送到交換機(ji)(ji)后,交互(hu)機(ji)(ji)是否將(jiang)消(xiao)息正確(que)路(lu)由,并存入隊(dui)列(lie)呢?對于可靠性要求高的系統,這個信息對于生(sheng)產(chan)者(zhe)(zhe)很重要。RabbitMQ提供了兩種方式,事(shi)務和生(sheng)產(chan)確(que)認(ren)機(ji)(ji)制(zhi)(pulisher comfirm)。這里我們(men)介紹下生(sheng)產(chan)者(zhe)(zhe)確(que)認(ren)機(ji)(ji)制(zhi),事(shi)務放到后面介紹。

     生產(chan)者將(jiang)信(xin)道(dao)設(she)置成comfirm模式,所(suo)有(you)通(tong)過該信(xin)道(dao)發(fa)送的消(xiao)息(xi)(xi)都會被(bei)分配一個(ge)唯一的ID,一旦消(xiao)息(xi)(xi)被(bei)投送到(dao)正確的隊列后,將(jiang)會返回(hui)確認信(xin)息(xi)(xi)(Basic.Ack),生產(chan)者接受信(xin)息(xi)(xi)后確認消(xiao)息(xi)(xi)已發(fa)布成功;當然,如果投送失(shi)敗,會返回(hui)Basic.Nack信(xin)息(xi)(xi),生產(chan)者根據(ju)業務(wu)場景判斷是否需要重發(fa)。

   

確認(ren)信息(xi)是通過(guo)異步回調返(fan)回,不會阻塞生(sheng)產者發送其(qi)他消息(xi),這(zhe)種模式性能較高(gao),但(dan)是在重試(shi)的情況下,無法確保消息(xi)的順(shun)序寫入(ru)。     

3.   消息路由與存儲

3.1.  消息路由

消息發送到節點的(de)交換(huan)機(ji)(ji)后,交換(huan)機(ji)(ji)需要通過路由鍵投遞到對應的(de)隊列上(shang),前面介紹過交換(huan)機(ji)(ji)有四種路由類(lei)型,分別(bie)為(wei)direct,fanout,topic,headers。

(1)direct

任何綁定(ding)在交換機的(de)隊列,只要它的(de)路(lu)由鍵和發(fa)布消(xiao)(xiao)息的(de)路(lu)郵建一(yi)致,就能收到消(xiao)(xiao)息。一(yi)般用于需要將一(yi)個(ge)消(xiao)(xiao)息投遞到一(yi)個(ge)或者多個(ge)確定(ding)的(de)目標隊列。

 

(2)fanout

所(suo)有發往(wang)fanout交換機的(de)消息被投遞到(dao)所(suo)有綁定到(dao)該交換機的(de)隊列中,這(zhe)個(ge)應用(yong)于"廣(guang)播"模式,該模式下,路(lu)由鍵不起作用(yong)。

 

(3)topic

     采用句點分(fen)隔的形式(shi),隊列可以通(tong)過使用基于通(tong)配符(fu)(*和#)的模式(shi)匹配的方式(shi)來綁定(ding)到路(lu)由鍵 上(shang),發送的消(xiao)息攜帶的路(lu)由鍵匹配上(shang)就會投遞到該隊列。

 

 

 4headers

在隊列綁定時,定義匹配headers的參數(shu)的值,并設(she)(she)置x-match參數(shu)。發布(bu)消息時,定義header的屬性值,當x-match設(she)(she)置為(wei)(wei)all表(biao)示(shi)所有的消息的header屬性值都(dou)匹配才能路由到隊列,設(she)(she)置為(wei)(wei)any表(biao)示(shi),任何(he)一個header的屬性值匹配就可以(yi)路由到隊列。

headers類型的(de)靈活性以(yi)及效率都要比其(qi)他的(de)類型差,所以(yi)用的(de)比較少。

3.2.  消息存儲

隊列(lie)其實還是(shi)(shi)個(ge)邏輯概念(nian),它包含rabbit_amqqueue_process(隊列(lie)進程)和backing_queue(維護5個(ge)狀態(tai)棧)。通過rabbit_amqqueue_process負責(ze)接受(shou)生產者發布(bu)的消(xiao)(xiao)(xiao)息(xi)(xi),向消(xiao)(xiao)(xiao)費者交付消(xiao)(xiao)(xiao)息(xi)(xi),處理消(xiao)(xiao)(xiao)息(xi)(xi)的確認等;backing_queue是(shi)(shi)消(xiao)(xiao)(xiao)息(xi)(xi)存儲的具體形式和引擎,并向rabbit_amqqueue_process提供相關(guan)的接口。

delivery-mode參(can)數,1表示非持(chi)(chi)久化,2表示持(chi)(chi)久化。非持(chi)(chi)久化的消息(xi)(xi)置(zhi)于內(nei)存,當內(nei)存達到設定閥(fa)值(zhi)(vm memory high watermark paging ratio ),逐步寫入(ru)磁(ci)盤;持(chi)(chi)久化消息(xi)(xi)則直接寫入(ru)磁(ci)盤。磁(ci)盤的寫入(ru)速(su)度要(yao)(yao)遠小(xiao)于內(nei)存(都是順序寫入(ru)的前提下),并且(qie)需要(yao)(yao)等待落(luo)盤后(hou)才返回(hui)生(sheng)產者確認信息(xi)(xi),所以持(chi)(chi)久化模(mo)式的吞吐量要(yao)(yao)低(di)于非持(chi)(chi)久化模(mo)式,但是可靠性強。

對于非持(chi)久化需(xu)要注意一點,如果大(da)量的(de)消息堆積內(nei)存(cun),來不及(ji)進行(xing)落盤釋放內(nei)存(cun),一旦達到內(nei)存(cun)的(de)告警(jing)閥值(zhi)(vffi memory high watermark),就會(hui)產(chan)生告警(jing),為確保(bao)節點的(de)可用性(xing),會(hui)阻(zu)塞所有生產(chan)者的(de)連接,直(zhi)到內(nei)存(cun)恢復到正常狀態,這種情況(kuang)下,會(hui)嚴重影響吞(tun)吐的(de)。

持久化層分為兩部分,隊列索引(rabbit_queue_index)和消息存儲(rabbit_msg_store)。隊列索引負責維護隊列中落盤消息的信息,包 括消息的存儲地點、是否己被交付給消費者、是否己被消費者 ack 等,每個隊列都有一個對應的隊列索引,以".idx"為文件后綴。消息存儲以鍵值隊的形式存儲消息,被所有的隊列共享,每個(ge)節點有且(qie)只(zhi)有一(yi)個(ge),以(yi)".rdq"為(wei)文件后(hou)綴,順序寫入。

除了消息(xi)可以設(she)(she)置(zhi)持(chi)久化,交換機和隊(dui)列(lie)也可以設(she)(she)置(zhi)持(chi)久化,三者(zhe)需要同時設(she)(she)置(zhi),才能(neng)確保真正的持(chi)久化。如果隊(dui)列(lie)沒有設(she)(she)置(zhi)持(chi)久化,消息(xi)是(shi)(shi)持(chi)久化的,那么就會(hui)導致隊(dui)列(lie)刪除了,但是(shi)(shi)消息(xi)還在(zai),無法消費。

     創建消(xiao)息的(de)時(shi)候,可以(yi)通過(guo)消(xiao)息頭的(de)expiration設(she)置消(xiao)息的(de)過(guo)期(qi)時(shi)間,如果消(xiao)息到達了過(guo)期(qi)時(shi)間,沒有被消(xiao)息,將會(hui)被投遞到死(si)信隊列(lie)(見后(hou)面章節)

3.3.  消息刪除

消息(xi)存儲時,會(hui)(hui)在ETS(Erlang Term Storage)表中記錄消息(xi)在文(wen)件(jian)(jian)中的位(wei)置映射和文(wen)件(jian)(jian)的相關信息(xi)。消息(xi)被(bei)正確的消費(fei)后,即(ji)(ji)會(hui)(hui)被(bei)刪(shan)除,首先刪(shan)除該表的記錄,但是不會(hui)(hui)立即(ji)(ji)刪(shan)除文(wen)件(jian)(jian),而(er)是僅(jin)標記待刪(shan)除數據(ju),待一個(ge)文(wen)件(jian)(jian)都是垃圾數據(ju)時可以將這個(ge)文(wen)件(jian)(jian)刪(shan)除。

4.   消息消費

4.1.  消費模式

      與生(sheng)產(chan)者(zhe)(zhe)一(yi)樣,消(xiao)(xiao)(xiao)費者(zhe)(zhe)需要與節點建(jian)立連接,打開信道,實現通訊(xun)。消(xiao)(xiao)(xiao)費者(zhe)(zhe)需要訂閱隊列進行消(xiao)(xiao)(xiao)費,消(xiao)(xiao)(xiao)費的模式(shi)分為推(push)和拉(pull)兩種.

   (1)推模式

&nbsp; 這個模式(shi)RabbitMQ推薦的(de),隊列(lie)會將消息推送給(gei)消費者,流程如下:

 

為了提高消(xiao)費(fei)速度,消(xiao)費(fei)者(zhe)可(ke)以(yi)評估自身(shen)的消(xiao)費(fei)能(neng)力,預設(she)(she)Qos,也就是設(she)(she)置一(yi)次(ci)可(ke)以(yi)獲取消(xiao)息的個數(shu)(類似(si)TCP的滑(hua)塊窗口),隊列(lie)批量推(tui)送(song)消(xiao)息到(dao)消(xiao)費(fei)端,消(xiao)費(fei)者(zhe)獲取消(xiao)息后(hou)恢復確(que)認(ren)信息。

(2)拉模式

 

消費者每次(ci)(ci)去隊列拉(la)消息,每次(ci)(ci)只能拉(la)一條。

 

如(ru)果(guo)只是(shi)(shi)想(xiang)獲取單條信息,而不是(shi)(shi)持(chi)續訂閱,可以采用拉模式。但是(shi)(shi)不建(jian)議使用循(xun)環(huan)來(lai)替代推模式,這樣(yang)會(hui)嚴重影響RabbitMQ的(de)性能。

4.2.  消息確認和拒絕

消費者獲取消息后,可以(yi)設(she)置不回(hui)復確認(ren),自動(dong)回(hui)復確認(ren),以(yi)及手(shou)動(dong)回(hui)復確認(ren)三(san)種(zhong)模式。

1、不回(hui)復確認,隊列會默認發出去的消息都(dou)會被正確的處理,無需等待確認,這個方式吞吐最高,但(dan)是可靠性性最差。

2、自動回(hui)復(fu)確認,有(you)可能(neng)消息沒有(you)正確處理(li),也被回(hui)復(fu),隊(dui)列無法識別,導致消息丟失(shi)。

3、手動(dong)回復(fu)確認,可以根據業(ye)務(wu)(wu)處理的(de)邏輯,把握回復(fu)的(de)時機,會(hui)帶(dai)來一定的(de)業(ye)務(wu)(wu)復(fu)雜度,但是可靠性(xing)最好。

對于設置Qos,可以進(jin)行(xing)批量回復,無需(xu)單次回復,這(zhe)個(ge)也是提(ti)高(gao)吞(tun)吐的重要手段(duan)。

    消息(xi)(xi)者也可以拒絕消息(xi)(xi),比如消息(xi)(xi)解析(xi)錯誤(wu)等情況,通(tong)過(guo)設置(zhi)requeue,來告訴隊(dui)列是(shi)否需要(yao)重(zhong)新投遞還是(shi)丟棄該消息(xi)(xi)(如果配置(zhi)了死信隊(dui)列,那么丟失(shi)的消息(xi)(xi)會進入死信隊(dui)列,等待處(chu)理(li))。

  如果選擇了重新(xin)投遞(di),那么(me)消(xiao)費(fei)的(de)順序是無法得到保證的(de)。

4.3.  隊列處理

 

為(wei)(wei)了(le)提升消費的(de)速度,生產(chan)發布的(de)消息(xi)到(dao)(dao)達隊(dui)列后(hou),如(ru)(ru)果隊(dui)列為(wei)(wei)空(kong),且有(you)消息(xi)者(zhe)(zhe)等待消息(xi),則直(zhi)接(jie)發送給消費者(zhe)(zhe),異步(bu)放入內存或者(zhe)(zhe)磁盤,提升消費速度。這(zhe)種標(biao)記已(yi)投遞的(de)方式與Kafka的(de)標(biao)記offset比較,邏(luo)輯上處理簡單了(le),確保了(le)消息(xi)不(bu)會被重復消費,但是也(ye)(ye)(ye)導(dao)致消息(xi)無法回(hui)溯。隊(dui)列創(chuang)建時,可以設(she)置(zhi)exclusive屬性,如(ru)(ru)果這(zhe)是為(wei)(wei)true,表示排他隊(dui)列,也(ye)(ye)(ye)就是當時創(chuang)建隊(dui)列連(lian)接(jie)(包括所有(you)的(de)信道)的(de)客(ke)戶端可以消費,其他的(de)無法消息(xi)。通俗些,只有(you)你當前這(zhe)個程序(或進程)進行消費處理,不(bu)希望別(bie)的(de)客(ke)戶端讀取(qu)到(dao)(dao)這(zhe)個隊(dui)列,一般用(yong)在RPC模(mo)式。一旦(dan)連(lian)接(jie)中(zhong)斷,排他隊(dui)列也(ye)(ye)(ye)將刪(shan)除,無論(lun)是否設(she)置(zhi)為(wei)(wei)持久化隊(dui)列。

5.   集群

單點無法確保高(gao)可用,同時一臺I/O能力有限,無法滿足高(gao)吞吐,在企業級應用中,一般(ban)都會部署集群(qun),提供服務。

5.1.  集群拓撲

 

集群節(jie)點間(jian)呈網狀(zhuang)連接,節(jie)點間(jian)相互通(tong)訊,每個節(jie)點上保(bao)留(liu)所有的(de)元數據,包括:

1、交換機,交換機的名稱和屬性。

2、隊列(lie)(lie)元數據,隊列(lie)(lie)的的名稱和(he)屬性。

3、綁定關(guan)系,交(jiao)(jiao)(jiao)換機與交(jiao)(jiao)(jiao)換機以(yi)及交(jiao)(jiao)(jiao)換機與隊列的綁定關(guan)系元數據(ju)。

4、vhost,vhost內的隊列,交換(huan)機和綁定的命名空間(jian)以及安全屬性(xing)。

所有節(jie)點全量(liang)保(bao)留元數據,會有以下的影(ying)響:

1、當(dang)客(ke)戶端連接(jie)某(mou)個節(jie)點創建這些(xie)元數(shu)據的(de)(de)時候(比如隊列,交換機),需要同步到所(suo)有節(jie)點上,并等待(dai)所(suo)有的(de)(de)節(jie)點的(de)(de)完成(cheng)后,才答復成(cheng)功,所(suo)以(yi)會有一定(ding)的(de)(de)延(yan)遲。

2、客戶(hu)端連接其中某個節(jie)點,都(dou)能獲取(qu)到(dao)所有(you)的(de)元數據(這(zhe)點和Kafka類(lei)似)。

消(xiao)息(xi)內容會不會所有節(jie)(jie)點(dian)都(dou)備份呢?答案是否。因為這會造成(cheng)大(da)量的(de)空(kong)間浪(lang)費(fei)(fei)(鏡像隊(dui)列(lie)(lie)除外,后面介紹)。如果一個(ge)生產(chan)者(消(xiao)費(fei)(fei)者)連接到(dao)某個(ge)節(jie)(jie)點(dian)發布(消(xiao)費(fei)(fei))隊(dui)列(lie)(lie)消(xiao)息(xi),但該(gai)(gai)隊(dui)列(lie)(lie)不在該(gai)(gai)節(jie)(jie)點(dian)上,那需要通過該(gai)(gai)節(jie)(jie)點(dian)路由。如下:

 

     消息(xi)的(de)發布和消費鏈(lian)路加長,會導(dao)致延遲加長,吞吐量降低,為了減少(shao)影響(xiang),集群的(de)節(jie)點建議部署(shu)到(dao)在一個匯聚(ju)下,不要跨可用區部署(shu)(后面會介紹跨可用區的(de)方法)。我們希望(wang)客(ke)戶端對接的(de)節(jie)點上部署(shu)所需(xu)要的(de)隊(dui)列(lie),這個需(xu)要規劃得當。

 

5.2.  負載均衡

在多節(jie)點情況下,客(ke)戶端的請求通過負載均衡(heng),將流量均勻分攤(tan)到各節(jie)點,RabbitMQ集(ji)群可以通過HAProxy,LVS+keepalived等LB實(shi)現,如(ru)下圖所示(shi)。

5.3.  鏡像隊列

集群(qun)多(duo)節(jie)點(dian)能確保整(zheng)體(ti)服務的可(ke)(ke)用性,但是對于單個(ge)隊(dui)列(lie)來(lai)說,如果做不(bu)了(le)多(duo)節(jie)點(dian)部(bu)署,還是有(you)單節(jie)點(dian)故(gu)障的可(ke)(ke)能,ActiveMQ中采用主從模式保證了(le)高可(ke)(ke)用,在RabbitMQ中,也(ye)有(you)類似機制,稱之(zhi)為鏡像隊(dui)列(lie)(或者HA隊(dui)列(lie))。

(1)創建鏡像隊列

在(zai)創建隊(dui)列時,通過ha-mode,ha-params,ha-sync-mode來定義鏡像隊(dui)列個數,分布(bu),以(yi)及(ji)同步模式。

ha-mode,有(you)效值為,all,exactly,nodes。alls表示在(zai)所有(you)的(de)(de)節(jie)點上(shang)創(chuang)建(jian)鏡(jing)像隊列(lie),exactly表示指定(ding)個數(shu)的(de)(de)節(jie)點上(shang)創(chuang)建(jian),ha-params設置個數(shu);nodes指定(ding)在(zai)哪些節(jie)點上(shang)創(chuang)建(jian),ha-params指定(ding)節(jie)點名。

ha-sync-mode,有效值為automatic,manual。automatic表示(shi)新(xin)節(jie)點(dian)加(jia)入時(shi),默認自動同步鏡(jing)像隊(dui)列(lie)(lie)消息;manual表示(shi)新(xin)節(jie)點(dian)加(jia)入時(shi),不(bu)會自動同步鏡(jing)像隊(dui)列(lie)(lie)消息。因(yin)為同步操作會導致隊(dui)列(lie)(lie)的阻(zu)塞(sai),建議(yi)使用manual模式。

鏡像(xiang)隊列(lie)有一個master和多個slave組成(cheng)(cheng),創建(jian)完成(cheng)(cheng)后,直接連(lian)接的(de)(de)節(jie)點上的(de)(de)隊列(lie)為(wei)master,其他的(de)(de)為(wei)slave。

(2)發布消息

 

可以認(ren)為有(you)個隱藏的(de)fanout交換機(ji),向(xiang)所有(you)的(de)鏡(jing)像隊(dui)列進行廣播(bo)(這里與Kafka不(bu)大一(yi)樣(yang),它是通過follower向(xiang)master請求(qiu)同(tong)步內容)。當(dang)所有(you)的(de)鏡(jing)像隊(dui)列確認(ren)完成后(hou),才向(xiang)發布者回復publish-comfirm(所以鏡(jing)像個數不(bu)能太(tai)多(duo),否則影(ying)響(xiang)發布吞吐量,一(yi)般2-3個為宜),這樣(yang)能確保所有(you)的(de)鏡(jing)像隊(dui)列的(de)消息都是同(tong)步的(de)。

(3)消費

除了發布(bu)需要向master和(he)slave同(tong)時投(tou)遞(di)消(xiao)息(xi),其他的(de)(de)(de)都(dou)是(shi)由master負責向slave同(tong)步(bu),包(bao)括(kuo)消(xiao)費(fei),ack等,如果(guo)消(xiao)息(xi)者(zhe)(zhe)(zhe)連接(jie)master隊(dui)(dui)列(lie)所(suo)在(zai)的(de)(de)(de)節(jie)(jie)(jie)點(dian)(如消(xiao)費(fei)者(zhe)(zhe)(zhe)2),則(ze)消(xiao)息(xi)隊(dui)(dui)列(lie)信(xin)息(xi)即(ji)可;如果(guo)消(xiao)費(fei)者(zhe)(zhe)(zhe)連接(jie)的(de)(de)(de)是(shi)slave隊(dui)(dui)列(lie)所(suo)在(zai)的(de)(de)(de)節(jie)(jie)(jie)點(dian)(如消(xiao)費(fei)者(zhe)(zhe)(zhe)1),slave節(jie)(jie)(jie)點(dian)需要將(jiang)消(xiao)費(fei)指(zhi)令發送(song)給master節(jie)(jie)(jie)點(dian),master節(jie)(jie)(jie)點(dian)將(jiang)數據準(zhun)備好,發送(song)給到(dao)slave節(jie)(jie)(jie)點(dian),再投(tou)遞(di)給消(xiao)費(fei)者(zhe)(zhe)(zhe)。slave隊(dui)(dui)列(lie)為何不類似(si)于mysql提供讀服務(wu)呢,這(zhe)個(ge)和(he)Kafka的(de)(de)(de)設計(ji)類似(si),RabbitMQ的(de)(de)(de)負載(zai)粒度在(zai)隊(dui)(dui)列(lie)上,而不是(shi)整個(ge)節(jie)(jie)(jie)點(dian),只需要將(jiang)master隊(dui)(dui)列(lie)均衡(heng)(heng)分布(bu),就是(shi)平衡(heng)(heng)整個(ge)節(jie)(jie)(jie)點(dian)的(de)(de)(de)壓力(li)。

 

如(ru)圖,節(jie)點(dian)1,節(jie)點(dian)2,節(jie)點(dian)3分(fen)別分(fen)擔隊(dui)(dui)列1,隊(dui)(dui)列2,隊(dui)(dui)列3的壓力。

(4)失效轉移

當(dang)(dang)slave所(suo)在節點掛掉后,除了與(yu)slave相連(lian)的客戶端全部(bu)斷開(kai)連(lian)接,其他的沒有影響。但是當(dang)(dang)master所(suo)在的節點宕(dang)機后,就會產生連(lian)鎖(suo)反應。

1)與(yu)master節(jie)點所有的客戶端連(lian)接(jie)斷開。

2)選舉最(zui)老的(de)slave節點(dian)為master節點(dian),因為最(zui)老的(de)slave與master之間的(de)同步狀態是(shi)最(zui)好的(de),但是(shi)也存在(zai)未同步的(de)信息丟失。

3)新的(de)master節點重(zhong)續入隊所有(you)unack信息(xi)(xi)(xi)。消費(fei)者獲取信息(xi)(xi)(xi)進行消費(fei),還(huan)沒來得及(ji)ack,或者ack信息(xi)(xi)(xi)沒有(you)同步到新的(de)master上(shang),新master無法確(que)認這部分(fen)消息(xi)(xi)(xi)是否被正確(que)消費(fei),為了安全起(qi)見,所有(you)的(de)unack都重(zhong)新入隊,此時客戶端會有(you)重(zhong)復消費(fei)的(de)可能。

4)如果消費端直連master所在節點(如上圖中的消費者2),master節點宕機后,TCP連接斷開,重入附加并監聽新的master節點;如果是連接slave所在節點(如上圖中的消費者1),就無法感知master節點宕機了,只認為隊列沒有消息。此時,在basicComsume消費時需要指定x-cancel-on-ha-failover參數,監聽master節點斷開的通知,接受到通知后重入附加并監聽新的master。這點非常重要,否則會導致消息大量積壓,但是消息端又無消息消費的問題。這個(ge)問題(ti)在(zai)(zai)Kafka中是不(bu)存在(zai)(zai)的,分(fen)區(qu)選舉(ju)的結果保存在(zai)(zai)所(suo)有的節點(dian)上,客戶端(duan)通過元數據更新,獲取最(zui)新分(fen)區(qu)leader信息(xi)。

6.   跨集群

現在大型的(de)網(wang)站系統,為了實現異地(di)容災,一般采用多機房或者混合云部署,以下(xia)(xia)分析單(dan)個集群在這(zhe)種場景下(xia)(xia)的(de)集中方(fang)案(an)

1僅部署一個機房

 

單個(ge)集群僅部署(shu)在北京機房(fang),南京機房(fang)的發布者和消(xiao)費者跨機房(fang)訪(fang)問。這個(ge)方案有以下幾(ji)個(ge)問題。

(1)無法做到容災,一旦北京機房掛了(le),整個集群不可用。

(2)降(jiang)低吞吐(tu)量(liang),南京(jing)機房的客戶(hu)端發(fa)送請(qing)求后(hou),會阻(zu)塞住,直到節點回(hui)復(fu)確(que)認,由于兩個機房的存在延遲(chi)(假設北京(jing)到南京(jing)機房的RTT在30ms),導致每次請(qing)求時間增加,降(jiang)低了吞吐(tu)量(liang)。

2、延展機房部署

 

單(dan)個(ge)集群(qun)跨機房(fang)部(bu)署(shu),節點(dian)1,節點(dian)2部(bu)署(shu)到(dao)北京機房(fang),節點(dian)3部(bu)署(shu)到(dao)南京機房(fang)。這個(ge)方(fang)案通過鏡像(xiang)隊列在節點(dian)1,節點(dian)2與節點(dian)3間互備(bei),能做(zuo)到(dao)部(bu)分(fen)的容災。但是也有如下問題:

(1)跨(kua)機房(fang)(fang)請求,部分(fen)消費者(zhe)和發(fa)布者(zhe)還是需(xu)要(yao)跨(kua)機房(fang)(fang)請求,與(yu)方(fang)案(an)1類似,導致請求時間增加,降低(di)了吞吐(tu)量。

(2)腦裂(lie),異(yi)地間的網(wang)絡環(huan)境復雜,網(wang)絡波動(dong)會(hui)導(dao)致分區,進而"腦裂(lie)",單個集群是無法做到跨可用區的。

單個集(ji)群(qun)是無法滿足(zu)跨機(ji)房(fang)場景,需要(yao)采用多集(ji)群(qun)部署解決這(zhe)個問題(ti),跨集(ji)群(qun)間的"橋(qiao)接"可以通過Federation,Shovel兩(liang)種方式。

6.1.  Federation(聯邦)

Federation,可(ke)以翻譯為"聯(lian)(lian)邦(bang)"。Federation 可(ke)以通過AMQP 協議讓原(yuan)本發(fa)(fa)送到(dao)某(mou)個Broker(或集群)中(zhong)的(de)(de)(de)交(jiao)換(huan)器(或隊(dui)列)上的(de)(de)(de)消息能夠轉發(fa)(fa)到(dao)另一個Broker(或集群)中(zhong)的(de)(de)(de)交(jiao)換(huan)器(或隊(dui)列)上,兩(liang)方(fang)的(de)(de)(de)交(jiao)換(huan)器(或隊(dui)列〉看起(qi)來是(shi)以一種"聯(lian)(lian)邦(bang)"的(de)(de)(de)形式(shi)在運(yun)作(zuo)。又分為聯(lian)(lian)邦(bang)交(jiao)換(huan)機和(he)聯(lian)(lian)邦(bang)隊(dui)列兩(liang)種模式(shi)。

1、聯邦交換機

 

假(jia)設發(fa)布者1位于北京(jing)機(ji)房(fang)(fang),需(xu)要發(fa)布消(xiao)(xiao)息到(dao)南京(jing)機(ji)房(fang)(fang)的(de)(de)(de)Broker2節點(dian)上,首先在Broker2節點(dian)的(de)(de)(de)ExchangeA上建(jian)立(li)(li)到(dao)Broker1節點(dian)的(de)(de)(de)federation link,Broker1上會(hui)建(jian)立(li)(li)一(yi)個同(tong)名的(de)(de)(de)交互機(ji)ExchangeA,同(tong)時建(jian)立(li)(li)一(yi)個內部的(de)(de)(de)交換(huan)機(ji)ExchangeA->Broker2,   并(bing)通過路由(you)鍵"rkA"將這(zhe)兩個交換(huan)器(qi)綁(bang)定(ding)(ding),同(tong)時,還會(hui)創建(jian)一(yi)個Exchange->Broker2隊(dui)列,并(bing)與Exchange->Broker2交換(huan)機(ji)綁(bang)定(ding)(ding),Federation插件會(hui)在Broker1的(de)(de)(de)ExchangeA->Broker2隊(dui)列與Broker2的(de)(de)(de)ExchangeA建(jian)立(li)(li)AMQP連接,實時消(xiao)(xiao)費Exchange->Broker2隊(dui)列的(de)(de)(de)消(xiao)(xiao)息。發(fa)布者1僅(jin)需(xu)要把消(xiao)(xiao)息發(fa)送了ExchangeA上,保存(cun)到(dao)隊(dui)列ExchangeA->Broker2上即可,剩下的(de)(de)(de)時就交由(you)Federation插件搞定(ding)(ding)。

2、聯邦隊列

聯邦交(jiao)(jiao)換(huan)機(ji)(ji)是在兩個(ge)節點的交(jiao)(jiao)換(huan)機(ji)(ji)間(jian)建立(li)連接(jie),聯邦隊(dui)列就(jiu)是在兩個(ge)隊(dui)列間(jian)建立(li)連接(jie)

 

聯邦(bang)隊(dui)列(lie)(lie)(lie)可以(yi)看(kan)做(zuo)互為擴展隊(dui)列(lie)(lie)(lie),如圖中的Queue1與(yu)Queue3建立聯邦(bang)隊(dui)列(lie)(lie)(lie),當Queue3有消息(xi)堆積,消費(fei)者1優先(xian)將Queue3的消費(fei)完,此時則(ze)會(hui)從Queue1拉(la)取(qu)消息(xi);反(fan)之亦(yi)然,如果Queue1的隊(dui)列(lie)(lie)(lie)消費(fei)完成,將會(hui)從Queue3中拉(la)取(qu)消息(xi),消費(fei)者1繼續消息(xi),所以(yi)Queue1與(yu)Queue3間的消息(xi)是(shi)可以(yi)"漂移(yi)"的。聯邦(bang)隊(dui)列(lie)(lie)(lie)會(hui)讓消費(fei)能力強的一側多消費(fei)些,確保隊(dui)列(lie)(lie)(lie)間的平衡。聯邦(bang)交換機的消息(xi)流向(xiang)(xiang)是(shi)單向(xiang)(xiang)的,聯邦(bang)隊(dui)列(lie)(lie)(lie)消息(xi)流向(xiang)(xiang)是(shi)雙向(xiang)(xiang)。

6.2.  Shovel

 

以(yi)Broker1的(de)(de)(de)(de)Queue為源,Broker2的(de)(de)(de)(de)Exchange為目標,建立shovel link,發布者發送的(de)(de)(de)(de)Msg1消(xiao)息,存入(ru)queue,Shovel消(xiao)費后存入(ru)Broker2的(de)(de)(de)(de)queue。當Broker1的(de)(de)(de)(de)queue產(chan)生消(xiao)息堆積時,通過shovel link轉移消(xiao)息到(dao)其他集(ji)群進行消(xiao)息,減少broke1的(de)(de)(de)(de)壓力。

7.   可靠性

可靠性(xing)是衡量消(xiao)息組(zu)件的(de)重要(yao)因素,但可靠性(xing)都是相對的(de),沒有任何(he)組(zu)件確(que)保百(bai)分百(bai)可靠。我(wo)們看(kan)下RabbitMQ有哪些(xie)措施(shi)保證高可靠,將整個消(xiao)息的(de)生命周期分為四個階段來逐一分析

 

7.1.  第一階段:消息投遞

前面介紹了生產(chan)者確(que)認的(de)機(ji)制(zhi),通(tong)過對(dui)信(xin)道(dao)設置(zhi)comfirm,對(dui)于(yu)每(mei)條投遞(di)的(de)消(xiao)息,都(dou)會(hui)返回確(que)認信(xin)息。除此之外(wai),生產(chan)者還可以(yi)通(tong)過事務機(ji)制(zhi)確(que)保消(xiao)息投遞(di)的(de)可靠性。

  • 事務機制

生(sheng)產者(zhe)通過二(er)階段提交方式,確報批量消息的事務一致性。

 

1、客戶端(duan)發送Tx.select,將信道置為(wei)事(shi)務(wu)模式(shi)。

2、Broker回復Tx.selectOk,確認已將信道置(zhi)為(wei)事務模式(shi)。

3、客戶(hu)端發(fa)送批量消息(xi)。

4、根據結(jie)果(guo)(捕獲(huo)異常),客(ke)戶(hu)端確定(ding)是否提交還是回滾。

5、Broker確認(ren)提交或者回滾指令。

事務機制能確保批量消息投遞的一致性,由于其同步操作,導致性能大大下降。事務與生產者確認機制是互斥的,一般情況下我們采用輕量級的生產者確認機制

7.2.  第二階段:消息路由

1、mandatory

當消息(xi)發送到交換機,有可(ke)能(neng)由于路由配置(zhi)錯誤,導致(zhi)消息(xi)無法正確投遞。此時,有兩種處理方式,一種是直(zhi)接丟棄,一種是反饋(kui)給生產者。第一章節(jie)在介紹調(diao)用(yong)publish的時候(hou),說到mandatory參(can)數,該(gai)參(can)數如果設(she)(she)置(zhi)為(wei)true,會回(hui)調(diao)生產端(duan)的監聽接口,返(fan)回(hui) 消息(xi);如果設(she)(she)置(zhi)為(wei)false,則直(zhi)接丟棄。顯然設(she)(she)置(zhi)為(wei)true,將提(ti)升(sheng)可(ke)靠性。

 

2、備份交換機

保留路由(you)(you)錯(cuo)誤的消息(xi),可以設置(zhi)備份(fen)交(jiao)換(huan)機,路由(you)(you)失敗的消息(xi)會丟到備份(fen)交(jiao)換(huan)機,保存到特定的隊(dui)列。通過該隊(dui)列監控(kong)路由(you)(you)錯(cuo)誤的消息(xi)。

 

7.3.  第三階段:消息存儲

消息存(cun)儲階段,介紹(shao)了(le)以下幾種可靠性(xing)機(ji)制

1、持久化,隊列和消息(xi)設置為(wei)持久化,寫入到磁(ci)盤,確(que)保Broker宕(dang)機重(zhong)啟后,消息(xi)不丟失(shi)。但(dan)是消息(xi)先寫入頁(ye)面緩(huan)存(cun),再批(pi)量寫入磁(ci)盤,如果在(zai)這期(qi)間宕(dang)機,還是會存(cun)在(zai)丟失(shi)風險。

2、鏡像(xiang)隊(dui)列(lie),采用(yong)msater-slave多副本機制,最大程度確保可用(yong)性。

3、死信隊列

死信(xin)隊(dui)列(lie)(lie)是一(yi)種特殊隊(dui)列(lie)(lie),存儲死信(xin)消(xiao)息(xi)(xi),有以下幾種情況下的消(xiao)息(xi)(xi)會(hui)變成死信(xin)消(xiao)息(xi)(xi)。任何隊(dui)列(lie)(lie)都(dou)可以設置一(yi)個死信(xin)交換機,將(jiang)符(fu)合條件的消(xiao)息(xi)(xi)路由到死信(xin)隊(dui)列(lie)(lie)。死信(xin)隊(dui)列(lie)(lie)也(ye)是一(yi)個普通隊(dui)列(lie)(lie),里面(mian)的消(xiao)息(xi)(xi)也(ye)可被訂閱。監控死信(xin)隊(dui)列(lie)(lie),及時處(chu)理死信(xin)消(xiao)息(xi)(xi),確保消(xiao)息(xi)(xi)的可靠性。

1)消(xiao)息被拒絕(Basic.reject,并設置(zhi)request為false)

2)消息過期(超過expiration時(shi)間)

3)隊(dui)列達到(dao)最(zui)大長度。

7.4.  第四階段:消息消費

消(xiao)(xiao)費(fei)(fei)者獲取消(xiao)(xiao)息后,通過手動確(que)認,能最(zui)大程度消(xiao)(xiao)息消(xiao)(xiao)費(fei)(fei)的可靠(kao)性,也可以(yi)拒(ju)絕(jue)消(xiao)(xiao)息,并設置(zhi)request為true,該消(xiao)(xiao)息會重新入隊。

8.   高吞吐

高可用,高吞吐在有些場(chang)景中(zhong)是互斥的,所以需要(yao)根據自身(shen)的業(ye)務(wu)進行抉擇,以下分析提高吞吐率的一(yi)些措施。

1、生產者不設置信道確(que)認,能大幅提高吞吐量,但是可靠性(xing)較差,可以用(yong)在允許部(bu)分消息丟失的場景(jing),比(bi)如統計用(yong)戶點擊(ji)量。

2、按照(zhao)業務規則(ze)將隊列拆(chai)分多個,分布到不同的(de)節點(dian)上。

3、一(yi)個節點(dian)上設置(zhi)合理的(de)(de)隊(dui)列個數,一(yi)個隊(dui)列對(dui)應一(yi)個線程,在一(yi)個多核的(de)(de)節點(dian),使用多個隊(dui)列與消費者可以獲得更好的(de)(de)吞吐量,將(jiang)(jiang)隊(dui)列數量設置(zhi)為等(deng)于服務器cpu核數將(jiang)(jiang)獲得最(zui)佳(jia)吞吐量。

4、限制(zhi)隊列(lie)的大小,設置TTL或(huo)者max-length限制(zhi)隊列(lie)大小。

5、自動刪(shan)除(chu)不需要的(de)隊列(lie),設置隊列(lie)的(de)TTL,配置auto-delete以及排它隊列(lie),刪(shan)除(chu)無用的(de)隊列(lie)。

6、設置盡可能大(da)的預取個數(shu),不(bu)過(guo)這個是需要綜(zong)合網絡帶寬,客戶端緩存(cun),消息處理速度等(deng)各方面的因素評估(gu)。

 

0條評論
作者已關閉評論
莫****過
7文章(zhang)數
0粉絲數
莫****過
7 文章(zhang) | 0 粉絲
莫****過
7文章數
0粉絲數
莫****過
7 文章 | 0 粉絲
原創(chuang)

消息選型-Rabbitmq篇

2023-09-21 10:46:56
18
0

 

1.   架構

我(wo)們先來(lai)看下整體架(jia)構。

 

Producer,消(xiao)息生產(chan)者,即消(xiao)息的(de)發(fa)布方, 生產(chan)者生產(chan)的(de)消(xiao)息,首(shou)先發(fa)布到指(zhi)定(ding)的(de)交換機上(shang),交換機通過路由鍵(RoutingKey)的(de)匹配,選擇對應的(de)隊(dui)列(lie)進(jin)行投遞。消(xiao)息者訂閱隊(dui)列(lie),消(xiao)費隊(dui)列(lie)的(de)消(xiao)息。

Connectiton,客(ke)戶(hu)端(duan)與(yu)Broker間的TCP連接。

Channel,信道(dao),每個連接采用(yong)多路復用(yong),包含多個信道(dao)。producer與Broker間采用(yong)信道(dao)傳遞數據。

Broker,Rabbit服(fu)務(wu)節(jie)點。

Vhost,虛機(ji)(ji)機(ji)(ji),一(yi)個(ge)節點下包(bao)含多(duo)個(ge)vhost,vhost間的exchange,queue相(xiang)互(hu)隔(ge)離。就好比一(yi)臺物理機(ji)(ji)上(Broker)部署多(duo)臺虛機(ji)(ji)(vhost),虛擬機(ji)(ji)采用不(bu)同的用戶名密碼登(deng)錄(lu),實現(xian)多(duo)租戶。

Exchange,交(jiao)換機,消息首先會傳(chuan)遞到交(jiao)換機,由(you)交(jiao)換機匹配(pei)路由(you)鍵(jian)(RoutingKey)決定(ding)投遞到哪個queue。類(lei)比郵政(zheng)局。

Queue,隊列,存儲消息的數據結構。類比小區的快遞(di)柜。

Binding,綁(bang)定(ding),交(jiao)換機與隊列(lie)間通過路(lu)由鍵(RoutingKey)進行綁(bang)定(ding)起來(lai)。RoutingKey類(lei)比目的地址。

Consumer,消費者,即消息的接受方。

2.   消息發布

生產者創建和啟動信(xin)道連接,聲明(ming)交換機(ji)機(ji),隊列,路由鍵等信(xin)息,消息封裝成幀(zhen),通過信(xin)道發(fa)送(song)到Broker。

 

2.1.  創建連接和信道

 一個(ge)(ge)應用(yong)(yong)程序(生產者)與RabbitMQ服(fu)務(wu)節點(dian),維護一個(ge)(ge)TCP連接(Connect),在這(zhe)個(ge)(ge)連接中(zhong)可以創建多(duo)個(ge)(ge)信道(dao)用(yong)(yong)于信息的(de)(de)傳遞,各(ge)個(ge)(ge)信道(dao)間相互隔離。這(zhe)樣(yang)做的(de)(de)目的(de)(de)就(jiu)是避免在多(duo)線程的(de)(de)情況下,頻繁(fan)開啟和(he)關閉多(duo)個(ge)(ge)TCP連接,帶來的(de)(de)性能消耗(每個(ge)(ge)TCP經歷(li)三次(ci)(ci)握手,四次(ci)(ci)揮手),實際就(jiu)是TCP的(de)(de)多(duo)路復用(yong)(yong),類似HTTP2.0的(de)(de)原理。

Connection connection = factory.newConnection() ; //創建連接
Channel channel = connection.createChannel() ; //創建信道

生產者(zhe)與Broker之間(jian)完成了8次交互(hu),分別為Connect的(de)啟動,調整,打開(kai)以及Channel的(de)開(kai)啟,最終打開(kai)信道。

 

后續所有與Broker通訊都是基于該信道,從(cong)代碼(ma)層面(mian)看,都是基于channel對象操作。

2.2.  創建交換機,隊列

 

生產者將消息發送到交換機(ji),交換機(ji)根(gen)據路由鍵(jian),投遞(di)到對(dui)應的(de)(de)目的(de)(de)隊(dui)列保存。這(zhe)個過程和郵(you)件(jian)信(xin)息非常(chang)類似,寄件(jian)人將信(xin)件(jian)送到郵(you)政(zheng)局,郵(you)政(zheng)局根(gen)據信(xin)件(jian)的(de)(de)地(di)址投遞(di)到目的(de)(de)地(di)的(de)(de)郵(you)箱中。

 RabbitMQ提(ti)供了通(tong)過管理平(ping)臺或(huo)者(zhe)命名行預(yu)先創建(jian)交(jiao)換機(ji)和隊列。采用(yong)(yong)預(yu)先定義還是客戶端(duan)創建(jian),就看業務的需要(yao),比如(ru)我(wo)們對交(jiao)互機(ji)以及隊列預(yu)先做了充(chong)分的規劃,那就采用(yong)(yong)預(yu)先創建(jian);如(ru)果需要(yao)創建(jian)一些臨時的交(jiao)換機(ji)或(huo)者(zhe)隊列,客戶端(duan)創建(jian)更適合。

要(yao)想創建(jian)一(yi)個(ge)可用(yong)的(de)交互機和隊列,需要(yao)完成以下步驟

(1)創建交換機

以Java客戶端為例,調用exchangeDeclare方法。

exchangeDeclare(String exchange ,String type , boolean durable ,boolean autoDelete , boolean internal ,Map<String, Object> arguments)

該方(fang)法有很多重(zhong)載(zai)的(de)方(fang)法,我(wo)們(men)不一(yi)一(yi)介紹(shao)(shao),只介紹(shao)(shao)下幾個(ge)重(zhong)要的(de)參數

exchange,定義該(gai)交換器(qi)的名稱

type,定義該交換器的(de)路由類(lei)型,分類(lei)fanout,direct,topic,headers等(后面我們重(zhong)點介(jie)紹)

autoDelete,是(shi)否自動刪除,當設置為true,所有與這個交換器綁(bang)(bang)定的隊列(lie)或者交換器都與此解(jie)綁(bang)(bang),就會自動刪除。

(2)創建隊列

以Java客戶端為例(li),調用(yong)exchangeDeclare方法。

queueDeclare (String queue , boolean durable , boolean exclusive,boolean autoDelete, Map<String,Object> arguments)

其中重要的參(can)數(shu)如下(xia):

queue,隊列名稱

durable,隊列(lie)是&middot;否(fou)持久化(注意,與消(xiao)息是否(fou)持久化不是一回(hui)事,后面再介紹)

exclusive,是否排他(后面介紹)

autoDelete,是否自(zi)動刪除(chu),當設置(zhi)為true,所有與這個隊列連接的(de)消費者都斷(duan)開時(shi),才會自(zi)動刪除(chu)。

(3)綁定

交(jiao)(jiao)(jiao)換(huan)(huan)機(ji)和(he)隊列(lie)創(chuang)建好(hao)了,接(jie)下來(lai)就(jiu)需(xu)要將隊列(lie)綁(bang)定到交(jiao)(jiao)(jiao)換(huan)(huan)機(ji)上,告(gao)訴交(jiao)(jiao)(jiao)換(huan)(huan)機(ji),什么樣的消息需(xu)要投遞給該交(jiao)(jiao)(jiao)換(huan)(huan)機(ji)。

queueBind(String queue , String exchange , String routingKey)

queue,待綁定的隊列名。

exchange,需(xu)要綁(bang)定到(dao)的交換(huan)機。

routingKey,用來(lai)綁(bang)定(ding)隊列和交換(huan)機的路由鍵(jian)(有些地方為(wei)了區(qu)別發(fa)送的時候(hou)的路由鍵(jian),將綁(bang)定(ding)時候(hou)的鍵(jian)稱之為(wei)綁(bang)定(ding)鍵(jian)Bindkey)

如果交換機和隊列已經存在,再(zai)次聲(sheng)明,Broker不(bu)會重新創建,直接返回成功。

2.3.  創建消息

RabbitMQ的消(xiao)息(xi)(xi)分為消(xiao)息(xi)(xi)頭與消(xiao)息(xi)(xi)體,類似(si)http協議的消(xiao)息(xi)(xi)結構。

消息頭(tou)定了消息的(de)相關屬性,主要的(de)有(you)以下:

content-type,傳輸消息體的MIMEl類型,比如(ru):application/json

content-encoding,消(xiao)息體的編(bian)碼(ma),如gzip

expiration,消息的過期時間

delivery-mode,消(xiao)息是(shi)否持久化(hua)到磁盤,1表示非持久化(hua),2表示持久化(hua)。

priority,消(xiao)息的優先級(ji),數值越(yue)大表示(shi)該消(xiao)息的優先級(ji)越(yue)高,優先消(xiao)費。

消息體主要是(shi)用戶自定的消息,可以是(shi)json,也可以是(shi)xml,通過序列化成二(er)進制(zhi)數據(ju)。

以Java客戶端為例,

//創建消息頭

Map<String , Object> headers = new HashMap<String , Object>() ;

headers.put( "&nbsp;localtion" , "here " );

headers . put( " time " , " today ;" );

//創建消息

byte[] messageBodyBytes = "H ello , world! ". getBytes();

2.4.  發布消息

 消息頭和消息體都創建完成,接下來就是向指定的交換機發布消息,聲明routingKey,告知交換機投遞到那個隊列上。
以(yi)Java客戶端為例,

channe1 .basi cPublish(exchangeName,routingKey,mandatory,immediate,

&nbsp;               new AMQP.BasicProperti es.Buil der ()

               ;   .headers(headers)

                &nbsp;.build()) ,

                 ;messageBodyBytes) ;

exchangeName,交換機的名稱

routingKey,路由鍵

mandatoryimmediate,這兩個(ge)參數(shu)后面(mian)再介紹。

客(ke)戶端會(hui)將消(xiao)息(xi)封裝方法幀(zhen)(zhen),消(xiao)息(xi)頭(tou)幀(zhen)(zhen),消(xiao)息(xi)體幀(zhen)(zhen)。

 

每(mei)個消(xiao)息(xi)體幀(zhen)最大(da)是(shi)131Kb,如果(guo)超(chao)過需(xu)要(yao)分割成多個。所以(yi)一(yi)個完整消(xiao)息(xi),至少需(xu)要(yao)三(san)個幀(zhen)(方法(fa)幀(zhen),消(xiao)息(xi)頭幀(zhen),消(xiao)息(xi)體幀(zhen))。

 

2.5.  消息響應

當(dang)生產(chan)者(zhe)將消(xiao)息發送到(dao)交換機后,交互機是(shi)否將消(xiao)息正確(que)路由,并存入隊列呢?對于可靠性(xing)要(yao)求高的系(xi)統,這個信息對于生產(chan)者(zhe)很(hen)重要(yao)。RabbitMQ提供了兩(liang)種方式(shi),事務(wu)和生產(chan)確(que)認(ren)機制(zhi)(pulisher comfirm)。這里我(wo)們介(jie)紹下生產(chan)者(zhe)確(que)認(ren)機制(zhi),事務(wu)放到(dao)后面(mian)介(jie)紹。

     生產者(zhe)將信(xin)(xin)道設(she)置成comfirm模式,所有通過(guo)該信(xin)(xin)道發送(song)的(de)消(xiao)息(xi)都會被(bei)分配一(yi)個唯(wei)一(yi)的(de)ID,一(yi)旦消(xiao)息(xi)被(bei)投(tou)送(song)到正(zheng)確(que)的(de)隊列后,將會返回確(que)認(ren)信(xin)(xin)息(xi)(Basic.Ack),生產者(zhe)接受(shou)信(xin)(xin)息(xi)后確(que)認(ren)消(xiao)息(xi)已發布成功;當然,如果投(tou)送(song)失敗,會返回Basic.Nack信(xin)(xin)息(xi),生產者(zhe)根據業務(wu)場景判斷是(shi)否需要(yao)重發。

   

確(que)認信(xin)息是(shi)(shi)通過異步回調返回,不會(hui)阻塞生產(chan)者發送其(qi)他消(xiao)息,這種模式(shi)性能較(jiao)高(gao),但是(shi)(shi)在重試的情況下,無(wu)法確(que)保(bao)消(xiao)息的順序寫(xie)入。     

3.   消息路由與存儲

3.1.  消息路由

消息(xi)發送(song)到節點的交換(huan)機(ji)后,交換(huan)機(ji)需要通過路由(you)鍵(jian)投遞(di)到對應的隊列上,前面介紹(shao)過交換(huan)機(ji)有四種路由(you)類型,分別為direct,fanout,topic,headers。

(1)direct

任(ren)何綁定(ding)(ding)在交換機的(de)(de)(de)隊列(lie),只要(yao)它的(de)(de)(de)路(lu)由鍵和發(fa)布消息的(de)(de)(de)路(lu)郵建一致,就能收到(dao)消息。一般(ban)用于需要(yao)將(jiang)一個消息投(tou)遞到(dao)一個或者(zhe)多個確定(ding)(ding)的(de)(de)(de)目(mu)標隊列(lie)。

 

(2)fanout

所(suo)(suo)有發往fanout交換機的(de)消息被投遞到所(suo)(suo)有綁定到該(gai)交換機的(de)隊列中,這(zhe)個應用于"廣播(bo)"模(mo)(mo)式,該(gai)模(mo)(mo)式下,路(lu)由鍵不(bu)起作用。

 

(3)topic

     采(cai)用句(ju)點分隔(ge)的(de)(de)形(xing)式,隊(dui)(dui)列可以通過(guo)使用基于通配符(*和(he)#)的(de)(de)模式匹配的(de)(de)方式來綁定(ding)到路由(you)鍵(jian) 上,發送的(de)(de)消息攜帶的(de)(de)路由(you)鍵(jian)匹配上就(jiu)會投遞到該隊(dui)(dui)列。

 

 

 4headers

在隊(dui)列(lie)綁定(ding)時,定(ding)義匹配headers的參數(shu)的值,并設置(zhi)(zhi)x-match參數(shu)。發布消息(xi)時,定(ding)義header的屬性值,當(dang)x-match設置(zhi)(zhi)為all表示所有的消息(xi)的header屬性值都匹配才能路(lu)由(you)到隊(dui)列(lie),設置(zhi)(zhi)為any表示,任何一個header的屬性值匹配就可以路(lu)由(you)到隊(dui)列(lie)。

headers類型的(de)靈活性以(yi)及效(xiao)率都要(yao)比其他的(de)類型差,所以(yi)用的(de)比較(jiao)少。

3.2.  消息存儲

隊列其實還是個(ge)邏輯概念,它(ta)包(bao)含(han)rabbit_amqqueue_process(隊列進程)和(he)backing_queue(維護(hu)5個(ge)狀態棧)。通過rabbit_amqqueue_process負責接受生(sheng)產者發布的消(xiao)息(xi),向消(xiao)費者交付消(xiao)息(xi),處理消(xiao)息(xi)的確認等(deng);backing_queue是消(xiao)息(xi)存儲的具體形(xing)式和(he)引擎,并向rabbit_amqqueue_process提(ti)供相(xiang)關的接口。

delivery-mode參數,1表示非持(chi)(chi)久(jiu)(jiu)化(hua)(hua),2表示持(chi)(chi)久(jiu)(jiu)化(hua)(hua)。非持(chi)(chi)久(jiu)(jiu)化(hua)(hua)的消(xiao)息(xi)置(zhi)于內(nei)(nei)存,當內(nei)(nei)存達(da)到設定閥值(vm memory high watermark paging ratio ),逐(zhu)步寫(xie)入磁盤(pan);持(chi)(chi)久(jiu)(jiu)化(hua)(hua)消(xiao)息(xi)則(ze)直接(jie)寫(xie)入磁盤(pan)。磁盤(pan)的寫(xie)入速度要(yao)(yao)遠小于內(nei)(nei)存(都是順(shun)序寫(xie)入的前提下),并(bing)且需要(yao)(yao)等待(dai)落(luo)盤(pan)后才返回生產者確(que)認信(xin)息(xi),所以持(chi)(chi)久(jiu)(jiu)化(hua)(hua)模式的吞吐量要(yao)(yao)低于非持(chi)(chi)久(jiu)(jiu)化(hua)(hua)模式,但(dan)是可(ke)靠(kao)性(xing)強。

對于非持久化需要注意一點(dian),如(ru)果大量的消息堆積內存(cun),來不及(ji)進行(xing)落(luo)盤釋(shi)放(fang)內存(cun),一旦達到內存(cun)的告(gao)警(jing)閥值(vffi memory high watermark),就會(hui)產生(sheng)告(gao)警(jing),為確保節點(dian)的可(ke)用性,會(hui)阻塞所有生(sheng)產者的連接,直到內存(cun)恢復到正常狀態,這種(zhong)情況下,會(hui)嚴重影響吞吐(tu)的。

持久化層分為兩部分,隊列索引(rabbit_queue_index)和消息存儲(rabbit_msg_store)。隊列索引負責維護隊列中落盤消息的信息,包 括消息的存儲地點、是否己被交付給消費者、是否己被消費者 ack 等,每個隊列都有一個對應的隊列索引,以".idx"為文件后綴。消息存儲以鍵值隊的形式存儲消息,被所有的隊列共享,每(mei)個節點(dian)有(you)且只有(you)一個,以".rdq"為文(wen)件后綴(zhui),順序寫(xie)入。

除(chu)了消(xiao)息可(ke)以(yi)設置持(chi)久(jiu)化(hua)(hua),交換機和隊列(lie)也可(ke)以(yi)設置持(chi)久(jiu)化(hua)(hua),三者需要(yao)同時設置,才(cai)能確保真正的持(chi)久(jiu)化(hua)(hua)。如(ru)果隊列(lie)沒(mei)有(you)設置持(chi)久(jiu)化(hua)(hua),消(xiao)息是(shi)持(chi)久(jiu)化(hua)(hua)的,那(nei)么就會導致(zhi)隊列(lie)刪(shan)除(chu)了,但是(shi)消(xiao)息還在,無法消(xiao)費。

&nbsp;  ;  創建消息的時(shi)(shi)候,可以通過(guo)消息頭的expiration設置消息的過(guo)期時(shi)(shi)間,如果消息到達了過(guo)期時(shi)(shi)間,沒有被消息,將(jiang)會被投(tou)遞到死信隊(dui)列(lie)(見(jian)后面(mian)章節)

3.3.  消息刪除

消(xiao)息存儲時(shi),會(hui)在ETS(Erlang Term Storage)表(biao)中記錄消(xiao)息在文件(jian)(jian)中的(de)位置映射(she)和文件(jian)(jian)的(de)相(xiang)關信息。消(xiao)息被正確的(de)消(xiao)費后,即(ji)(ji)會(hui)被刪除,首先刪除該表(biao)的(de)記錄,但是不會(hui)立即(ji)(ji)刪除文件(jian)(jian),而是僅標(biao)記待刪除數(shu)據,待一個(ge)文件(jian)(jian)都是垃圾數(shu)據時(shi)可以(yi)將這個(ge)文件(jian)(jian)刪除。

4.   消息消費

4.1.  消費模式

      與生產者一樣,消(xiao)費者需(xu)要(yao)與節點建(jian)立連接,打開信道(dao),實(shi)現通(tong)訊。消(xiao)費者需(xu)要(yao)訂閱隊列進行消(xiao)費,消(xiao)費的模式分為推(push)和拉(pull)兩(liang)種.

   (1)推模式

  這個模(mo)式(shi)RabbitMQ推(tui)(tui)薦的(de),隊列會將消(xiao)息推(tui)(tui)送給消(xiao)費者,流程如下(xia):

 

為了(le)提(ti)高消(xiao)(xiao)(xiao)費(fei)速(su)度,消(xiao)(xiao)(xiao)費(fei)者可以評估自身的(de)消(xiao)(xiao)(xiao)費(fei)能(neng)力,預設Qos,也就是設置一次可以獲取消(xiao)(xiao)(xiao)息(xi)(xi)的(de)個(ge)數(類似TCP的(de)滑(hua)塊窗口),隊(dui)列批量(liang)推送消(xiao)(xiao)(xiao)息(xi)(xi)到消(xiao)(xiao)(xiao)費(fei)端,消(xiao)(xiao)(xiao)費(fei)者獲取消(xiao)(xiao)(xiao)息(xi)(xi)后(hou)恢復確認信息(xi)(xi)。

(2)拉模式

 

消(xiao)費(fei)者每次去(qu)隊列(lie)拉消(xiao)息,每次只能拉一(yi)條。

 

如果只是想獲(huo)取單條信息,而(er)不是持續訂閱,可以采用拉模式。但是不建議使用循環來替代(dai)推模式,這(zhe)樣(yang)會嚴重(zhong)影(ying)響RabbitMQ的性能。

4.2.  消息確認和拒絕

消(xiao)費者獲取(qu)消(xiao)息后(hou),可(ke)以(yi)設(she)置不(bu)回(hui)復確認,自動回(hui)復確認,以(yi)及手動回(hui)復確認三(san)種模式。

1、不回(hui)復確(que)認(ren),隊列(lie)會(hui)(hui)默認(ren)發出去的消息都會(hui)(hui)被正(zheng)確(que)的處理,無需等(deng)待確(que)認(ren),這個方式吞(tun)吐(tu)最(zui)高(gao),但是可靠性性最(zui)差。

2、自動回復確(que)認(ren),有可能消息沒有正確(que)處(chu)理,也被回復,隊列無法識別,導致消息丟(diu)失。

3、手動(dong)回(hui)復確認,可以根(gen)據(ju)業務處(chu)理的(de)邏輯,把(ba)握(wo)回(hui)復的(de)時機,會帶來一定的(de)業務復雜度,但是可靠性最好(hao)。

對于設置(zhi)Qos,可以進行批量回復(fu),無需單次回復(fu),這個(ge)也(ye)是(shi)提高吞(tun)吐的重(zhong)要手段。

    消(xiao)息者也可以拒絕(jue)消(xiao)息,比如(ru)消(xiao)息解析錯誤(wu)等(deng)情(qing)況,通(tong)過設置(zhi)requeue,來告訴(su)隊列是否需(xu)要重新投遞還是丟棄該(gai)消(xiao)息(如(ru)果配置(zhi)了死(si)信隊列,那么(me)丟失的消(xiao)息會進入(ru)死(si)信隊列,等(deng)待處理)。

&nbsp; 如果選擇了(le)重新投(tou)遞,那么消費的(de)順序是無法(fa)得到保證的(de)。

4.3.  隊列處理

 

為了提(ti)升消費(fei)的速(su)度,生產發(fa)布的消息(xi)到(dao)達隊(dui)(dui)列后(hou),如果隊(dui)(dui)列為空,且有消息(xi)者等待(dai)消息(xi),則(ze)直接(jie)發(fa)送給消費(fei)者,異步放入內存或者磁(ci)盤(pan),提(ti)升消費(fei)速(su)度。這種(zhong)標(biao)記(ji)已投遞的方式與Kafka的標(biao)記(ji)offset比較,邏輯(ji)上處理簡單了,確保了消息(xi)不會被重復消費(fei),但是(shi)也(ye)導致消息(xi)無法(fa)回溯。隊(dui)(dui)列創建(jian)(jian)時,可以設置exclusive屬(shu)性,如果這是(shi)為true,表示(shi)排他(ta)隊(dui)(dui)列,也(ye)就(jiu)是(shi)當(dang)時創建(jian)(jian)隊(dui)(dui)列連接(jie)(包(bao)括所(suo)有的信道)的客(ke)戶端(duan)可以消費(fei),其(qi)他(ta)的無法(fa)消息(xi)。通俗些,只有你當(dang)前這個程序(或進程)進行(xing)消費(fei)處理,不希望別的客(ke)戶端(duan)讀取到(dao)這個隊(dui)(dui)列,一般(ban)用(yong)在(zai)RPC模式。一旦(dan)連接(jie)中斷(duan),排他(ta)隊(dui)(dui)列也(ye)將刪(shan)除(chu),無論是(shi)否(fou)設置為持久化隊(dui)(dui)列。

5.   集群

單點無(wu)法(fa)(fa)確保高可用(yong),同時一(yi)臺(tai)I/O能(neng)力有限(xian),無(wu)法(fa)(fa)滿足高吞吐,在企(qi)業級應(ying)用(yong)中,一(yi)般都會部(bu)署集群,提供服務。

5.1.  集群拓撲

 

集群節點間(jian)呈網(wang)狀連接(jie),節點間(jian)相互(hu)通(tong)訊,每個(ge)節點上保留所有的元數據,包括:

1、交(jiao)換機(ji),交(jiao)換機(ji)的名稱(cheng)和屬(shu)性。

2、隊(dui)列元數據(ju),隊(dui)列的的名稱和屬性。

3、綁(bang)(bang)定關系,交(jiao)換機與交(jiao)換機以(yi)及交(jiao)換機與隊列的綁(bang)(bang)定關系元(yuan)數(shu)據。

4、vhost,vhost內(nei)的隊列,交換機和綁定(ding)的命名空間以及(ji)安全屬性。

所有(you)節點全(quan)量保留元數據,會有(you)以下的影響:

1、當客戶端連接(jie)某個節點(dian)創建這些元數據的(de)時候(比如隊列,交換機),需要同(tong)步到所(suo)有節點(dian)上,并(bing)等待所(suo)有的(de)節點(dian)的(de)完(wan)成(cheng)后,才答復成(cheng)功,所(suo)以(yi)會有一定的(de)延遲(chi)。

2、客戶(hu)端連接其中某個節(jie)點(dian),都(dou)能獲(huo)取到所有(you)的元數據(這點(dian)和(he)Kafka類似(si))。

消(xiao)息內容會(hui)(hui)不會(hui)(hui)所(suo)有節(jie)(jie)點(dian)(dian)(dian)都備份呢(ni)?答(da)案是否。因為(wei)這會(hui)(hui)造成大量的空間(jian)浪費(鏡像隊列(lie)(lie)除外,后(hou)面介(jie)紹(shao))。如果一個(ge)(ge)生(sheng)產(chan)者(消(xiao)費者)連接到(dao)某(mou)個(ge)(ge)節(jie)(jie)點(dian)(dian)(dian)發布(消(xiao)費)隊列(lie)(lie)消(xiao)息,但該隊列(lie)(lie)不在該節(jie)(jie)點(dian)(dian)(dian)上,那(nei)需要通過該節(jie)(jie)點(dian)(dian)(dian)路由。如下:

 

     消息的發布和消費鏈(lian)路加長,會導(dao)致(zhi)延遲加長,吞吐(tu)量(liang)降(jiang)低,為了減少(shao)影響,集群的節(jie)點建議部(bu)署(shu)(shu)到在一個匯聚下(xia),不要(yao)跨可用(yong)區部(bu)署(shu)(shu)(后面會介紹跨可用(yong)區的方法)。我們希望客戶端對接的節(jie)點上(shang)部(bu)署(shu)(shu)所(suo)需要(yao)的隊列,這個需要(yao)規劃(hua)得當。

 

5.2.  負載均衡

在多(duo)節(jie)(jie)點(dian)情況(kuang)下(xia)(xia),客戶(hu)端的請求通過負載(zai)均衡,將流量均勻分(fen)攤到(dao)各節(jie)(jie)點(dian),RabbitMQ集群可(ke)以通過HAProxy,LVS+keepalived等LB實現,如下(xia)(xia)圖所示。

5.3.  鏡像隊列

集群多(duo)節點能確(que)保(bao)整體服務的可用性(xing),但(dan)是(shi)對于(yu)單個隊(dui)列(lie)來說(shuo),如果做不了(le)多(duo)節點部署,還是(shi)有(you)單節點故障的可能,ActiveMQ中采用主從模式保(bao)證了(le)高可用,在RabbitMQ中,也(ye)有(you)類似機制(zhi),稱之為鏡像(xiang)隊(dui)列(lie)(或者HA隊(dui)列(lie))。

(1)創建鏡像隊列

在(zai)創建隊列時,通過ha-mode,ha-params,ha-sync-mode來定義(yi)鏡像隊列個數,分布,以及同步(bu)模式(shi)。

ha-mode,有效值為,all,exactly,nodes。alls表示(shi)在所有的節(jie)點(dian)(dian)上創建鏡(jing)像隊(dui)列,exactly表示(shi)指(zhi)定個數的節(jie)點(dian)(dian)上創建,ha-params設置個數;nodes指(zhi)定在哪些節(jie)點(dian)(dian)上創建,ha-params指(zhi)定節(jie)點(dian)(dian)名。

ha-sync-mode,有效(xiao)值為automatic,manual。automatic表示新節(jie)點(dian)(dian)加入時,默認自動同(tong)步鏡(jing)像隊(dui)列消息;manual表示新節(jie)點(dian)(dian)加入時,不(bu)會自動同(tong)步鏡(jing)像隊(dui)列消息。因為同(tong)步操(cao)作會導致(zhi)隊(dui)列的阻塞,建議使(shi)用manual模(mo)式。

鏡像隊列(lie)有一(yi)個master和多個slave組成,創建完成后,直接連接的(de)節點上的(de)隊列(lie)為(wei)master,其他的(de)為(wei)slave。

(2)發布消息

 

可(ke)以(yi)(yi)認為(wei)有(you)個隱藏的(de)(de)fanout交換機,向所有(you)的(de)(de)鏡(jing)像(xiang)隊列進(jin)行(xing)廣播(這里與Kafka不大一樣(yang),它是通過follower向master請(qing)求同步(bu)內容(rong))。當所有(you)的(de)(de)鏡(jing)像(xiang)隊列確認完(wan)成后,才向發布者(zhe)回(hui)復(fu)publish-comfirm(所以(yi)(yi)鏡(jing)像(xiang)個數不能太多(duo),否則影響發布吞吐量,一般2-3個為(wei)宜),這樣(yang)能確保所有(you)的(de)(de)鏡(jing)像(xiang)隊列的(de)(de)消(xiao)息(xi)都是同步(bu)的(de)(de)。

(3)消費

除了發(fa)布需(xu)要向master和slave同(tong)時投(tou)遞消(xiao)(xiao)息,其他的(de)都是(shi)由master負(fu)責向slave同(tong)步,包括消(xiao)(xiao)費(fei)(fei)(fei)(fei)(fei),ack等,如果消(xiao)(xiao)息者(zhe)(zhe)連接(jie)master隊列(lie)(lie)(lie)所(suo)在(zai)(zai)的(de)節(jie)點(dian)(dian)(dian)(如消(xiao)(xiao)費(fei)(fei)(fei)(fei)(fei)者(zhe)(zhe)2),則消(xiao)(xiao)息隊列(lie)(lie)(lie)信息即可;如果消(xiao)(xiao)費(fei)(fei)(fei)(fei)(fei)者(zhe)(zhe)連接(jie)的(de)是(shi)slave隊列(lie)(lie)(lie)所(suo)在(zai)(zai)的(de)節(jie)點(dian)(dian)(dian)(如消(xiao)(xiao)費(fei)(fei)(fei)(fei)(fei)者(zhe)(zhe)1),slave節(jie)點(dian)(dian)(dian)需(xu)要將消(xiao)(xiao)費(fei)(fei)(fei)(fei)(fei)指令發(fa)送(song)給(gei)master節(jie)點(dian)(dian)(dian),master節(jie)點(dian)(dian)(dian)將數(shu)據(ju)準備好(hao),發(fa)送(song)給(gei)到slave節(jie)點(dian)(dian)(dian),再投(tou)遞給(gei)消(xiao)(xiao)費(fei)(fei)(fei)(fei)(fei)者(zhe)(zhe)。slave隊列(lie)(lie)(lie)為何不(bu)類(lei)似(si)于mysql提供讀服務(wu)呢,這(zhe)個(ge)和Kafka的(de)設計類(lei)似(si),RabbitMQ的(de)負(fu)載粒度在(zai)(zai)隊列(lie)(lie)(lie)上,而不(bu)是(shi)整個(ge)節(jie)點(dian)(dian)(dian),只需(xu)要將master隊列(lie)(lie)(lie)均衡(heng)分布,就是(shi)平衡(heng)整個(ge)節(jie)點(dian)(dian)(dian)的(de)壓力。

 

如(ru)圖,節點(dian)1,節點(dian)2,節點(dian)3分(fen)別分(fen)擔隊列1,隊列2,隊列3的壓力。

(4)失效轉移

當slave所(suo)在節點掛掉(diao)后(hou),除(chu)了與slave相連的客戶端全部斷開連接,其他(ta)的沒有影響。但是(shi)當master所(suo)在的節點宕機后(hou),就(jiu)會產生連鎖反應(ying)。

1)與master節點所有(you)的客戶端連接斷開。

2)選舉最老的(de)slave節點為(wei)master節點,因為(wei)最老的(de)slave與master之間(jian)的(de)同步(bu)狀態(tai)是(shi)最好的(de),但是(shi)也存在未同步(bu)的(de)信息丟失。

3)新的master節(jie)點重續入(ru)隊所有(you)unack信息。消(xiao)費(fei)(fei)者獲取信息進行消(xiao)費(fei)(fei),還沒來得及(ji)ack,或者ack信息沒有(you)同步到(dao)新的master上(shang),新master無法確(que)(que)認這部(bu)分消(xiao)息是否被正確(que)(que)消(xiao)費(fei)(fei),為了(le)安全起見,所有(you)的unack都重新入(ru)隊,此(ci)時客戶端會有(you)重復消(xiao)費(fei)(fei)的可能。

4)如果消費端直連master所在節點(如上圖中的消費者2),master節點宕機后,TCP連接斷開,重入附加并監聽新的master節點;如果是連接slave所在節點(如上圖中的消費者1),就無法感知master節點宕機了,只認為隊列沒有消息。此時,在basicComsume消費時需要指定x-cancel-on-ha-failover參數,監聽master節點斷開的通知,接受到通知后重入附加并監聽新的master。這點非常重要,否則會導致消息大量積壓,但是消息端又無消息消費的問題。這個(ge)問題在Kafka中是不存(cun)(cun)在的(de)(de),分區選(xuan)舉的(de)(de)結果(guo)保存(cun)(cun)在所(suo)有的(de)(de)節點上,客戶端通過(guo)元數(shu)據更新,獲取最新分區leader信息。

6.   跨集群

現在(zai)大型的(de)網站系統,為了實(shi)現異地容災,一般采(cai)用多機房或者混(hun)合云部署(shu),以下(xia)分析單個(ge)集(ji)群在(zai)這種場(chang)景(jing)下(xia)的(de)集(ji)中方案

1僅部署一個機房

 

單個集(ji)群僅部(bu)署在北京機房,南(nan)京機房的發布者和消費者跨機房訪問(wen)。這個方案有以下(xia)幾個問(wen)題。

(1)無法(fa)做(zuo)到(dao)容災,一旦北京機(ji)房掛(gua)了,整個集群不(bu)可用。

(2)降低(di)吞(tun)(tun)吐(tu)量(liang),南(nan)京機(ji)房的(de)客戶端發送(song)請(qing)求(qiu)后,會阻塞住(zhu),直到(dao)(dao)節點(dian)回復確認,由于兩(liang)個機(ji)房的(de)存(cun)在(zai)延遲(假(jia)設北京到(dao)(dao)南(nan)京機(ji)房的(de)RTT在(zai)30ms),導致每(mei)次(ci)請(qing)求(qiu)時間(jian)增加,降低(di)了吞(tun)(tun)吐(tu)量(liang)。

2、延展機房部署

 

單個集群跨機房(fang)部(bu)署,節點(dian)1,節點(dian)2部(bu)署到(dao)北京(jing)機房(fang),節點(dian)3部(bu)署到(dao)南京(jing)機房(fang)。這個方案通(tong)過鏡像隊列(lie)在節點(dian)1,節點(dian)2與節點(dian)3間互備(bei),能做到(dao)部(bu)分(fen)的容災。但是也有如下(xia)問題:

(1)跨(kua)機房(fang)請求(qiu),部分消(xiao)費(fei)者和發(fa)布(bu)者還是需要跨(kua)機房(fang)請求(qiu),與方案1類似,導(dao)致請求(qiu)時間增加,降低(di)了吞吐(tu)量。

(2)腦裂,異地間的(de)網(wang)絡環境(jing)復雜,網(wang)絡波動會(hui)導致分(fen)區,進而"腦裂",單(dan)個(ge)集群是無(wu)法做到跨可用區的(de)。

單個集(ji)(ji)群是無(wu)法滿(man)足(zu)跨機房(fang)場景,需要采用多集(ji)(ji)群部署解決這個問題(ti),跨集(ji)(ji)群間(jian)的"橋接"可以通(tong)過Federation,Shovel兩種(zhong)方式(shi)。

6.1.  Federation(聯邦)

Federation,可以(yi)(yi)翻譯(yi)為"聯(lian)邦(bang)(bang)"。Federation 可以(yi)(yi)通過AMQP 協議讓原本發送到(dao)某個Broker(或(huo)集群(qun))中(zhong)的(de)(de)(de)交換(huan)器(或(huo)隊列(lie))上的(de)(de)(de)消息能夠轉發到(dao)另一(yi)個Broker(或(huo)集群(qun))中(zhong)的(de)(de)(de)交換(huan)器(或(huo)隊列(lie))上,兩方的(de)(de)(de)交換(huan)器(或(huo)隊列(lie)〉看起來是以(yi)(yi)一(yi)種(zhong)"聯(lian)邦(bang)(bang)"的(de)(de)(de)形(xing)式在運作。又分為聯(lian)邦(bang)(bang)交換(huan)機和聯(lian)邦(bang)(bang)隊列(lie)兩種(zhong)模式。

1、聯邦交換機

 

假設發(fa)布者1位于北(bei)京(jing)機(ji)(ji)(ji)(ji)房,需(xu)要發(fa)布消息到(dao)南京(jing)機(ji)(ji)(ji)(ji)房的(de)(de)Broker2節點上,首先在Broker2節點的(de)(de)ExchangeA上建立到(dao)Broker1節點的(de)(de)federation link,Broker1上會(hui)建立一(yi)個同名的(de)(de)交互機(ji)(ji)(ji)(ji)ExchangeA,同時(shi)建立一(yi)個內(nei)部的(de)(de)交換機(ji)(ji)(ji)(ji)ExchangeA->Broker2,   并(bing)通過路由(you)鍵"rkA"將(jiang)這(zhe)兩(liang)個交換器綁定(ding),同時(shi),還會(hui)創建一(yi)個Exchange->Broker2隊列(lie)(lie),并(bing)與(yu)Exchange->Broker2交換機(ji)(ji)(ji)(ji)綁定(ding),Federation插件會(hui)在Broker1的(de)(de)ExchangeA->Broker2隊列(lie)(lie)與(yu)Broker2的(de)(de)ExchangeA建立AMQP連接,實時(shi)消費Exchange->Broker2隊列(lie)(lie)的(de)(de)消息。發(fa)布者1僅需(xu)要把消息發(fa)送了ExchangeA上,保(bao)存到(dao)隊列(lie)(lie)ExchangeA->Broker2上即可,剩下的(de)(de)時(shi)就交由(you)Federation插件搞定(ding)。

2、聯邦隊列

聯邦交換機是(shi)在兩個節(jie)點的交換機間(jian)建立連接,聯邦隊(dui)列(lie)就是(shi)在兩個隊(dui)列(lie)間(jian)建立連接

 

聯邦(bang)隊(dui)(dui)列(lie)可以(yi)看做互為擴展隊(dui)(dui)列(lie),如圖中的(de)Queue1與(yu)(yu)Queue3建立聯邦(bang)隊(dui)(dui)列(lie),當Queue3有消(xiao)息堆積,消(xiao)費(fei)者1優(you)先(xian)將Queue3的(de)消(xiao)費(fei)完,此時則會(hui)從Queue1拉取消(xiao)息;反(fan)之亦然,如果Queue1的(de)隊(dui)(dui)列(lie)消(xiao)費(fei)完成(cheng),將會(hui)從Queue3中拉取消(xiao)息,消(xiao)費(fei)者1繼續消(xiao)息,所(suo)以(yi)Queue1與(yu)(yu)Queue3間的(de)消(xiao)息是可以(yi)"漂移"的(de)。聯邦(bang)隊(dui)(dui)列(lie)會(hui)讓(rang)消(xiao)費(fei)能力強的(de)一(yi)側多消(xiao)費(fei)些,確保隊(dui)(dui)列(lie)間的(de)平衡。聯邦(bang)交換機(ji)的(de)消(xiao)息流向(xiang)是單向(xiang)的(de),聯邦(bang)隊(dui)(dui)列(lie)消(xiao)息流向(xiang)是雙向(xiang)。

6.2.  Shovel

 

以Broker1的Queue為源,Broker2的Exchange為目標(biao),建立shovel link,發(fa)布(bu)者發(fa)送的Msg1消息,存(cun)入queue,Shovel消費后存(cun)入Broker2的queue。當(dang)Broker1的queue產生消息堆積(ji)時,通過shovel link轉移消息到其他集群進行消息,減(jian)少broke1的壓力。

7.   可靠性

可靠性是衡量消(xiao)息組件(jian)的(de)重(zhong)要因素,但(dan)可靠性都(dou)是相對的(de),沒有(you)任何組件(jian)確保(bao)百(bai)(bai)分(fen)百(bai)(bai)可靠。我(wo)們看下RabbitMQ有(you)哪些措施保(bao)證高(gao)可靠,將整個(ge)消(xiao)息的(de)生命(ming)周期(qi)分(fen)為四(si)個(ge)階(jie)段(duan)來逐一(yi)分(fen)析

 

7.1.  第一階段:消息投遞

前面介紹了生產(chan)者(zhe)確(que)認的機制,通過對信道設置comfirm,對于每條投遞的消息(xi),都會返(fan)回確(que)認信息(xi)。除此之外,生產(chan)者(zhe)還可(ke)(ke)以通過事務機制確(que)保消息(xi)投遞的可(ke)(ke)靠性。

  • 事務機制

生產者通過(guo)二階段提交方式(shi),確報批量消息的事務一(yi)致性。

 

1、客戶(hu)端(duan)發(fa)送Tx.select,將信道置(zhi)為事(shi)務模式。

2、Broker回(hui)復(fu)Tx.selectOk,確認已將(jiang)信道置為事務模式。

3、客戶端發送(song)批量消(xiao)息(xi)。

4、根據結果(guo)(捕獲(huo)異常),客戶端確定(ding)是(shi)否提交還是(shi)回滾。

5、Broker確認提交或者(zhe)回(hui)滾指令。

事務機制能確保批量消息投遞的一致性,由于其同步操作,導致性能大大下降。事務與生產者確認機制是互斥的,一般情況下我們采用輕量級的生產者確認機制

7.2.  第二階段:消息路由

1、mandatory

當消息(xi)發(fa)送到交換機,有(you)可能由于路由配置(zhi)錯誤(wu),導(dao)致消息(xi)無法正確投(tou)遞(di)。此時(shi)(shi),有(you)兩(liang)種(zhong)處理(li)方式(shi),一(yi)種(zhong)是直(zhi)(zhi)接(jie)(jie)丟(diu)棄,一(yi)種(zhong)是反饋給生產(chan)者。第(di)一(yi)章節在介(jie)紹調(diao)用publish的時(shi)(shi)候,說到mandatory參(can)數,該參(can)數如(ru)果設(she)置(zhi)為(wei)true,會回(hui)調(diao)生產(chan)端的監聽接(jie)(jie)口,返回(hui) 消息(xi);如(ru)果設(she)置(zhi)為(wei)false,則直(zhi)(zhi)接(jie)(jie)丟(diu)棄。顯然設(she)置(zhi)為(wei)true,將提升可靠性。

 

2、備份交換機

保(bao)(bao)留(liu)路由(you)錯(cuo)誤的(de)(de)消息,可(ke)以設置備份交換機(ji),路由(you)失敗的(de)(de)消息會丟到備份交換機(ji),保(bao)(bao)存到特定(ding)的(de)(de)隊(dui)列(lie)。通過該(gai)隊(dui)列(lie)監控路由(you)錯(cuo)誤的(de)(de)消息。

 

7.3.  第三階段:消息存儲

消息(xi)存儲階段,介(jie)紹了以下幾種可靠性機制

1、持(chi)久化,隊(dui)列和消息設(she)置為持(chi)久化,寫(xie)入到磁盤(pan),確保(bao)Broker宕(dang)(dang)機重啟(qi)后,消息不丟失(shi)。但是(shi)消息先寫(xie)入頁(ye)面緩存,再批量寫(xie)入磁盤(pan),如果在(zai)這(zhe)期間(jian)宕(dang)(dang)機,還是(shi)會存在(zai)丟失(shi)風險。

2、鏡像隊列(lie),采用msater-slave多(duo)副(fu)本機(ji)制,最(zui)大程度確保可用性。

3、死信隊列

死(si)信(xin)隊列是(shi)一種(zhong)特殊(shu)隊列,存儲死(si)信(xin)消(xiao)息(xi)(xi),有以(yi)下幾種(zhong)情況下的消(xiao)息(xi)(xi)會變成死(si)信(xin)消(xiao)息(xi)(xi)。任何隊列都可以(yi)設置一個死(si)信(xin)交(jiao)換(huan)機,將符合條件的消(xiao)息(xi)(xi)路由到死(si)信(xin)隊列。死(si)信(xin)隊列也是(shi)一個普(pu)通隊列,里(li)面的消(xiao)息(xi)(xi)也可被訂閱。監控死(si)信(xin)隊列,及時(shi)處理(li)死(si)信(xin)消(xiao)息(xi)(xi),確保消(xiao)息(xi)(xi)的可靠性。

1)消(xiao)息被拒絕(Basic.reject,并設置request為false)

2)消息過(guo)期(超過(guo)expiration時間)

3)隊列達到最(zui)大(da)長度(du)。

7.4.  第四階段:消息消費

消(xiao)費者獲(huo)取消(xiao)息(xi)(xi)后,通(tong)過手動確認,能最大(da)程度消(xiao)息(xi)(xi)消(xiao)費的可靠性,也可以拒(ju)絕消(xiao)息(xi)(xi),并(bing)設(she)置request為true,該消(xiao)息(xi)(xi)會(hui)重新入隊。

8.   高吞吐

高(gao)可用,高(gao)吞(tun)吐在有些(xie)場景中(zhong)是互斥的(de),所以(yi)需要根據自身(shen)的(de)業務進(jin)行抉擇,以(yi)下分析(xi)提高(gao)吞(tun)吐率的(de)一些(xie)措施。

1、生產者不設置信道確認(ren),能大幅提高(gao)吞吐量,但(dan)是可(ke)靠性較差,可(ke)以(yi)用在(zai)允許(xu)部分消息丟失的場(chang)景(jing),比如統計用戶點擊量。

2、按照業務規(gui)則(ze)將(jiang)隊(dui)列拆分多個(ge),分布到(dao)不同(tong)的節點上。

3、一(yi)個(ge)(ge)節點(dian)上(shang)設置合理的(de)隊列(lie)個(ge)(ge)數(shu),一(yi)個(ge)(ge)隊列(lie)對應一(yi)個(ge)(ge)線程,在一(yi)個(ge)(ge)多核(he)的(de)節點(dian),使用(yong)多個(ge)(ge)隊列(lie)與(yu)消(xiao)費者可(ke)以獲得更好的(de)吞吐量(liang),將(jiang)(jiang)隊列(lie)數(shu)量(liang)設置為等于服(fu)務器cpu核(he)數(shu)將(jiang)(jiang)獲得最佳吞吐量(liang)。

4、限(xian)(xian)制隊列(lie)的大小(xiao),設置TTL或者max-length限(xian)(xian)制隊列(lie)大小(xiao)。

5、自動刪除不需要的(de)(de)隊(dui)列(lie),設置隊(dui)列(lie)的(de)(de)TTL,配置auto-delete以及排它隊(dui)列(lie),刪除無用的(de)(de)隊(dui)列(lie)。

6、設置盡可能(neng)大(da)的(de)預取個數,不過(guo)這(zhe)個是需要綜合網(wang)絡帶寬,客戶端緩存,消(xiao)息處理(li)速度等(deng)各方(fang)面(mian)的(de)因素(su)評估。

 

文章來自個人專欄
文章(zhang) | 訂(ding)閱
0條評論
作者已關閉評論
作者已關閉評論
0
0