1 架構
以下官網給的架構圖,包含以下幾部分:
Producer,生產者,封裝消息,并將消息以同步或者異步的方式發送到Broker。
Broker,Broker負責消息的傳輸,topic的管理以及負載均衡,與其他消息隊列組件不同,該Broker不負責消息的存儲,是個無狀態組件。
Bookie,負責消息的的持久化,采用Apache BookKeeper組件,BookKeeper是一個分布式的WAL系統。
Consumer:消費者,以訂閱主題的方式消費消息,并確認。Pulsar中還定義了Reader角色,也是一種消費者,區別在于,它可以從指定置位獲取消息,且不需要確認。
ZK,負責集群的配置管理,包括租戶,命名空間等,并進行一致性協調。
從架構上看,與Kafka最大的不同點在于計算和存儲分離。這種設計的好處是服務層和存儲層可以獨立擴展,提升了彈性的擴容。
2 生產消息
2.1 生產者創建
創建連接和生產者,需要指定以下必要的屬性信息:
(1)Broker地址,一般是指定Broker集群的域名地址,由集群重定向到某臺具體Broker上提供服務
(2)Topic信息,明確消息發送到哪個topic,topic由以下幾個部分組成
{persistent|non-persistent}://tenant/namespace/topic
| Topic名稱組成 | Description | 
| persistent / non-persistent | 用來標識 topic 的類型。 Pulsar 支持兩種不同 topic:持久化和 非持久化型(如果你沒有明確指定,topic 將會是默認的持久化類型)。 持久化 topic 的所有消息都會存儲到硬盤上(除非是單機模式的Broker,否則都是會在多塊磁盤上)。非持久化 topic 的數據將不會存儲到硬盤上。 | 
| tenant | 租戶,Pulsa支持多租戶,該topic所屬的租戶名 | 
| namspace | 將相關聯的 topic 作為一個組來管理,是管理 Topic 的基本單元。, 每個租戶可以有多個命名空間。 | 
| topic | 主題名稱 | 
2.2 分區選擇
除了普通的topic,Pulsar也支持分區topic,一個topic可以設置多個分區,那么在消息發送時如何路由到不同的分區,Pulsar支持三種路由模式。
(1)RoundRobinPartition,如果沒有指定key,則以round-robin的方式路由到所有分區;如果指定key,則根據key做hash,散列到對應的分區上。
(2)SinglePartition,如果沒有指定key,則隨機選擇一個分區,并發送所有消息;如果指定key,則根據key做hash,散列到對應的分區上。
(3)CustomPartition,使用自定義消息路由,可以定制消息進入特定的分區的策略。
2.3 消息發送
生產者在創建的時候,配置的是集群的請求地址(比如pulsar://pulsar-cluster.acme.com:6650),最終重定向到一個具體的Broker地址上,這就涉及到尋址的過程(可以對比下Kafka的元數據更新機制)。具體的步驟如下:
(1)客戶端將嘗試通過向服務器(Broker)發送 HTTP 查找請求,來確定主題(Topic)所在的服務器(Broker)。 通過查詢Zookeeper中(緩存)的元數據,來確定這條消息的topic在哪個Broker上,如果該topic不在任何一個Broker上,則把這個topic分配在負載最少的Broker上。
(2)當客戶端獲取了Broker的地址之后,將會創建一個TCP連接(或復用連接池中的連接)并且進行鑒權。 客戶端和Broker通過該連接交換基于自定義協議的二進制命令。 同時,客戶端會向Broker發送一條命令用以在Broker上創建生產者/消費者,該命令將會在驗證授權策略后生效。
連接建立后,就可以開始發送消息。發送的模式有同步和異步兩種:
- 同步發送,生產者發送消息后,等待Broker的ack,如果沒有接受到ack,則認為發送失敗。
- 異步發送,生產者將消息發送到本地的阻塞隊列,就立即返回,客戶端將在后臺將消息發送達到Broker,隊列的大小可以配置(配置MaxPendingMessages大小)。發送完成后,將調用回調方法進行通知。
與Kafka類似,生產者支持消息的批量發送。producer將會累積一批消息,然后通過一次請求發送出去。批處理的大小取決于設置的最大的消息數量及最大的發布延遲。
3 消息存儲
Pulsar采用的是計算存儲分離架構,Broker并不持久化消息內容,可以認為是一個proxy層,實現類似Kafka的client的一部分功能,Broker是個無狀態組件,消息內容實際存儲在bookie中。Broker實現topic的負載均衡,以及對外對提供讀寫接口,實現消息傳輸。
3.1 負載均衡
生產者在發送前,需要拿到Topic歸屬的Broker,才能將消息發送到正確的Broker上。那topic和Broker的對應關系是如何維護和均衡的呢?
每個topic(分區)歸屬一個Broker,此Broker為該topic(分區)的所有者(ower),負責topic讀寫服務。topic(分區)發送變化,基于當前Broker的負載狀況,將topic(分區)動態分配到適合的Broker。實際實現中,Pulsar并不是以topic的粒度實現負載均衡的,而是bundle。幾者之間的關系如下:
bundle是每個命名空間中topic的集合,在NameSpace創建的時候可以指定bundle個數(配置defaultNumberOfNamespaceBundles),命名空間下的topic按照名稱hash到對應的bundle,以bundle為粒度單位負載到各個Broker。我們看下負載均衡的幾個場景:
- 當增加或者刪除一個topic時,僅是打開或者關閉該topic,不會引起重新分配。
- 當卸載namespace下所有的topic,則會發生再均衡,會有10ms左右的抖動。
- 當Broker超載(基于 CPU 、網絡和內存指標閾值,其中某個超過85%),會引起減負動作,將一部分bundle重新分配到其他Broker上。
- 當某個bundle的topic數量達到一定閥值,會引起bundle拆分動作(分為2個),將新的較小的 Bundle 重新分配給不同的 Broker。
均衡后的相關的數據保存到zk中。
3.2 非持久化與持久化消息存儲
在topic的定義時,可以指定該topic消息的非持久化和持久化類型。
- 非持久化消息,僅會保存到Broker的緩存中,然后轉發到消息端,不會持久化到磁盤。這就意味著,當某個 Pulsar Broker宕機,或斷開訂閱者與某個主題(非持久性)的連接,意味著所有正在傳輸的消息都會丟失,客戶端也可能會看到消息的丟失。
- 持久化消息,pulsar采用Bookkeeper作為持久化存儲。 BookKeeper是一個分布式的預寫日志(WAL)系統。我們先認識下這個組件系統,然后再來看下pulsar是如何使用它的。
3.3 BookKeeper
1) 相關的概念
- Entry,Entry是存儲到bookkeeper中的一條記錄,其中包含Entry ID,記錄實體等。
- Ledger,可以認為ledger是用來存儲Entry的,多個Entry序列組成一個ledger。
- Journal,其實就是bookkeeper的WAL(write ahead log),用于存bookkeeper的事務日志,journal文件有一個最大大小,達到這個大小后會新起一個journal文件。
- Entry log,存儲Entry的文件,ledger是一個邏輯上的概念,entry會先按ledger聚合,然后寫入entry log文件中。同樣,entry log會有一個最大值,達到最大值后會新起一個新的entry log文件
- Index file,ledger的索引文件,ledger中的entry被寫入到了entry log文件中,索引文件用于entry log文件中每一個ledger做索引,記錄每個ledger在entry log中的存儲位置以及數據在entry log文件中的長度。
- MetaData Storage,元數據存儲,是用于存儲bookie相關的元數據,比如bookie上有哪些ledger,bookkeeper目前使用的是zk存儲,所以在部署bookkeeper前,要先有zk集群。
2) 讀寫流程
結合下面的圖,我們再看下讀寫流程
- 寫入流程:
1、Ledger先寫入Journal,并同步落盤持久化。
2、寫入index以及Entry log,這一步都是先寫入page cache,再落盤。
- 讀取流程:
1、先讀取index文件,獲取entry log的索引
2、從對應的entry log中獲取數據。
3)復制機制
Bookkeeper對所有的數據拷貝多個副本,一般是三份或是五份——到不同的機器上,可以是同一數據中心,也可以是跨數據中心。如圖所示:
- Ensemble Size (E),表示將要寫入的實際的Bookies數量。通過機架感知策略,從不同的機架選擇bookie,組成bookie池。本例中bookie1-5為bookkeeper的ensemble,數量為5。
- Write Quorum Size (Qw),表示每條Entry實際需要寫入的最小bookie數量,即Entry的副本數。本例中的bookie2-4,數量為3
- Ack Quorum Size (Qa),表示完成bookie數量的寫入,即可向客戶端返回ack。本例中的bookie3-4,數量為2。
三者之間的關系如下:
QW<=E,即Entry的副本數,不能超過bookie池的大小。QW越大,副本冗余越多,可靠性越高。
QA<=QW,確認bookie數據,不能超過實際的Entry的副本數,一般QA=(QW+1)/2。不像其他使用主/從或是管道復制算法在副本之間復制數據的分布式系統(例如Apache HDFS、Ceph、Kafka),Apache BookKeeper使用一種多數投票并行復制算法在確保可預測的低延時的基礎上復制數據。
3.4 Broker與bookkeeper的交互
對于pulsar,Broker的Managed Ledger負責維護相關的bookie(即bookkeeper)。
如下圖所示,通過分層+分片,Topic(分區)的所屬的消息數據,分成多個Segment(即bookkeeper的ledeger),每個Segment有多個副本,存儲在多個 BookKeeper 上。Pulsar Broker負責Segment(ledeger)開啟和關閉。
當消息數據到達Broker后,寫入器并行寫入多個BookKeeper存儲節點(就是Bookies)。
Broker 持久化完成后,會將數據緩存在本地內存中以提供追尾讀(Tailing Reads),提高最新數據讀取速度(后面重點介紹)。
3.5 消息保留和到期
默認的情況下,消息保留有以下兩種:
- 對于沒有訂閱者的topic消息,將不會保留。
- 對于有多個訂閱者的topic消息,所有的訂閱者都已經確認完畢后,立即進行刪除;如果未被確認,被一直持久化。
在有些場景中,默認的策略是不滿足,比如即使消息都被消費并確認,但也有可能需要進行消息回溯。
Pulsar提供了保留和過期策略,覆蓋默認策略。注意:以下的策略針對namespace所有topic。
(1)消息保留,對于已經消費完成的消息,可以通過指定保留大小(defaultRetentionTimeInMinutes)和保留時間(defaultRetentionSizeInMB),當某個閥值達到后,則達到進行刪除標識,等待刪除。
(2)消息過期,未被確認的消息設置存活時長(TTL)。當時間過期后,即使消息沒有確認,也會被刪除掉。
Kafka的消息清理,主要基于時間,日志大小,日志偏移量三種模式。
4 消息消費
4.1 主題訂閱模式
在pulsar中,有4種可用的訂閱,分別是 獨占(exclusive),共享(shared),災備(failover)和 鍵共享(key_shared)。
- 獨占(exclusive),只能有一個消費者綁定到訂閱上(topic/partition)。 如果多于一個消費者嘗試以同樣方式去訂閱主題,消費者將會收到錯誤。獨占是pulsar的默認模式。
- 災備(failover),多個消費者綁定到訂閱上(topic/partition),根據消費者的 優先級,確定master(優先級高的消費者)消費者,正常情況下,master消費者生效;當master消費者出現故障,斷開連接后,那么其他的消費者按照優先級重新選定master繼續消費未確認的消息。
- 共享(shared),多個消費者可以綁定到同一個訂閱上(topic/partition),消息通過round robin輪詢機制分發給不同的消費者,并且每個消息僅會被分發給一個消費者。 當消費者斷開連接,所有被發送給他,但沒有被確認的消息將被重新安排,分發給其它存活的消費者。
- 鍵共享(key_shared),多個消費者可以綁定到同一個訂閱上(topic/partition),相同key或者orderingkey分發到同一個消費者,所以鍵共享模式的消息需要指定key或者orderingkey。
Kafka的多分區,同一個消費組中消費者的訂閱模式與pulsar的獨占模式類似。這種模式下,消費者的個數不能大于partition的個數,限制了消費能力,而pulsar的共享模式有效的解決了這個問題。
4.2 消費模式
Kafka是采用的拉模式,而pulsar是推模式。消費端訂閱主題后,監聽從Broker的推送的消息,當Broker有新消息,推送到consumer端的緩存隊列(其大小可以通過recevieQueueSize設置),通過調用receive方法進行處理。消息處理完成后,發送確認信息,Broker會記錄下當前消費的cursor(類似于Kafka的offset)。
pular支持批量發送以及累積確認,即確認最后一條消息,那么默認前面的信息都已經消費完成。pulsar也有訂閱組(類似Kafka的消費組),但是對于共享與鍵共享的訂閱類似,對于partition,沒有消費者個數的限制。
4.3 Broker的讀取流程
當某個消費者訂閱主題(分區)后,與生產者類似,會與該主題(分區)的所有者(ower)建立連接,后續消息的推送都由該Broker負責。
根據訂閱的策略模式,首先從Broker的緩存中讀取最新的數據,如果讀取到,則立即返回,不需要從磁盤讀取,這種方式(Tailing Reads)滿足大部分的場景。 如果要讀取歷史數據(Catch-up Reads),那么就從bookie中讀取。
Broker 會向所有 Bookie 發送獲取 LAC(LastAddConfirmed) 請求,得到大多數回復后即可計算出一個安全的 LAC 值,這個流程就是采用了 Quorum Read 的方式。Pulsar Broker 獲取可靠的 LAC 之后,其讀取可以從任一 Bookie 開始,如果在限定時間內沒有響應則給第二個 Bookie 發送讀取請求,然后同時等待這兩個 Bookie,誰先響應就意味著讀取成功,這個流程稱之為 Speculative Read。由于Fragment分布在不同的bookie上,所以讀取的過程會在bookie間漂移。
4.4 消費確認
當一條消息消費完成后,會向Broker發送一條確認消息,Broker將保存消費游標(類似Kafka的offset)。消費者也可以進行批量的確認,Broker記錄下最后一條消息的游標。
在異常情況下,比如消費失敗(如入庫失敗),需要重新消費,取消確認,Broker會重新加入隊列,將會重新分配。通過設置消息確認的超時時間,如果超時,也將觸發重新投遞。
1 可靠性設計
以下從生產,存儲,消費三個階段來了解下可靠性設計。
1.1 生產階段
通過同步或者異步方式將消息發送到Broker節點,并根據Broker的ack進行處理,如果發送失敗,可以進行重試。這方面與Kafka的處理類似,確保了生產階段消息的可靠性。
1.2 延遲投遞
對于有些消息,需要延遲一段時間后投遞(而不是立即投遞),生產者定義消息的延遲時間,并發送到Broker,Broker將消息存儲,并通過DelayedDeliveryTracker來管理定時的內存索引(time->messageid),當達到時間后,投遞給消費者。
1.3 存儲階段
pulsar采用計算和存儲分離的架構,每個topic都存在一個owner Broker,負責該topic的讀寫請求,采用分層存儲,將消息保存到bookie上,下面分析Broker,bookie的可靠性。
1.4 Broker故障恢復
Broker由于某種故障(比如宕機,停電),導致不可用。如下圖所示,Broker2是Topic1-Part2的owner,當Broker出現故障時(通過zk探測),Broker3要立即接管Topic1-Part2的owner角色。
由于計算和存儲分離的架構,存儲層是不需要重新復制的。如果有新數據到來,它立即附加并存儲為Topic1-Part2中的Segment x + 1。 Segment x + 1被分發并存儲在Bookie1, 2和4上,因為它不需要重新復制數據,所以所有權轉移立即發生而不會犧牲主題分區的可用性。
1.5 bookie故障恢復
如圖所示,bookie2上的Segment3由于磁盤損壞,導致不可用。
Apache BookKeeper中的副本修復是Segment(甚至是Entry)級別的多對多快速修復,這比重新復制整個主題分區要精細,只會復制必須的數據。 這意味著Apache BookKeeper可以從bookie 1和bookie 3讀取Segment 3中的消息,并在bookie 1處修復Segment 3。所有的副本修復都在后臺進行,對Broker和應用透明。
即使有Bookie節點出錯的情況發生時,通過添加新的可用的Bookie來替換失敗的Bookie,所有Broker都可以繼續接受寫入,而不會犧牲主題分區的可用性。
1.6 集群擴容
對于Broker的擴容,主題分區將立即在Brokers中做平衡遷移,一些主題分區的所有權立即轉移到新的Broker。與Kafka不同,其rebanance幾乎是零時間。
對于bookie的擴容,如下圖所示。
當Broker 2將消息寫入Topic1-Part2的Segment X時,將Bookie X和Bookie Y添加到集群中。 Broker 2立即發現新加入的Bookies X和Y。然后Broker將嘗試將Segment X + 1和X + 2的消息存儲到新添加的Bookie中。 新增加的Bookie立刻被使用起來,流量立即增加,而不會重新復制任何數據。 除了機架感知和區域感知策略之外,Apache BookKeeper還提供資源感知的放置策略,以確保流量在群集中的所有存儲節點之間保持平衡。
1.7 消費階段
在消息消費階段,接受消息后,需要進行確認。如果確認超時,或者取消確認,那么消息將重新投遞。如果消費失敗或者異常(比如消息解析失敗,或者下游系統故障等)。Pulsar提供了兩種處理模式。
1.8 死信主題
與Kafka類似,如果消費不成功,并確認不可恢復(如消息體異常),那么就投遞到死信主題。監控該主題的消息,就可以知道消息的異常情況。當然死信主題其性質也是主題,其消息也可以被消費。
1.9 重試主題
1.10 生產者可將消息發送到正常的主題和重試主題,并允許消費者重試。當消費失敗后,延遲一段時間從重試主題從獲取消息進行處理。
1.11 讀取模式(reader)
對于消費模式,消費者監聽所訂閱的topic的信息,一旦有新的消息,Broker將推送消息到consumer,consumer處理處理完這些消息并最終確認它們。 每當消費者者連接到某個主題時, 它就會自動開始從最早的沒被確認(unacked)的消息處讀取, 因為該主題的游標是由Pulsar自動管理的。
對于讀取模式,可以指定消費的位置,包含:
- 主題中的最早的 可用消息
- 主題中的最新 可用消息
- 除最早的和最新的之外的可用消息位點。
兩者模式的比較如下圖所示:
讀取模式應用的場景是消息的重放。
2 高吞吐設計
支持高吞吐是pulsar重要特性,根據性能測試,單臺Broker(4C8G),2KB大小消息體,10WTPS,端到端延遲控制在10ms以內。我們也從生產,存儲,消費三個階段分析下高并發的設計和措施。
2.1 生產階段
生產階段影響并發和吞吐量的因素主要有發送模式,壓縮機制,批量發送。發送模式前面分析過,在同步模式的場景下,需要等待Broker的ack,會降低并發。對于一些可靠性要求不高的消息,比如日志,用戶行為數據等,可以采用異步模式。以提升吞吐量。
pulsar支持LZ4,ZLIB,ZSTD,SNAPPY四種壓縮算法,消息壓縮能大大節省帶寬,提高吞吐率,從實測結果看,LZ4的壓縮效率要高;與Kafka類似,pulsar也支持批量發送消息,可以配置一次發送消息的大小(batchingMaxMessages),對于高并發的場景,一般選擇批量發送,并按照實際發送量設置該值。
2.2 存儲階段
Kafka中,先寫入到leader partition,然后復制到其他的partition分區。對于pulsar系統,Broker是無狀態的,數據會發送給服務該分區的 Broker,該 Broker 并行寫入數據到存儲層的多個節點中。一旦存儲層成功寫入數據并確認寫入,Broker 會將數據緩存在本地內存中以提供追尾讀(Tailing Reads)。
和傳統的架構相比,pulsar減少了相互復制引起的I/O和帶寬消耗,通過并行提升了寫入了速度。另外QW,QA的大小也影響并發量,數值越小,速度也快,吞吐量也越大,但是可靠性降低。
2.3 消費階段
從Broker讀取,消費端處理兩個方面分析下:
(1)追尾讀(Tailing Reads)
Broker將消息數據持久化到存儲層后,會將最新的數據放入到本地cache中。消費者讀取最近的數據,直接從緩存中獲取即可,無需訪問存儲層。
與傳統的從文件系統讀取相比,減少了磁盤的I/O以及拷貝。pulsar對于最近的數據,緩存的命中率很高,大大加快了讀取的速度。
(2)追趕讀(Catch-up Reads)
對于歷史數據,會保存到存儲層,Catch-up讀可以通過存儲層來并行讀取數據,加快讀取的速度。我們把I/O讀寫放在一起比較:
傳統的模式下,讀寫以及復制都是通過lead Broker,Broker的承擔所有I/O負載,且無法分擔。對于pulsar分層式架構,I/O隔離,不會出現讀寫單個節點資源爭搶的情況,且負載到多個bookie并行讀寫,大大緩解了磁盤的負載,提高了吞吐能力。
(3)消費端處理
對于Kafka,每個消費組中的消費者個數不能超過其訂閱的topic下partition的個數,限制了消費能力。在pulsar中,共享訂閱模式對于每個partition可以有多個消費者,增加消費者的數量可以提升消費能力。pulsar支持累計ack(無法在共享訂閱模式下使用),也能提升其消費能力。
1 跨地域復制
與Kafka類似,Pulsar也提供了跨地域復制的解決方案。在多個數據中心間,按命名空間級別進行配置(該命名空間下所有topic),在任意個集群間進行復制。如下圖:
Topic T1可在三個集群生產,消息先存儲到本地集群,然后立即異步復制到其他集群,復制的延遲依賴數據中心間的RTT。每個集群擁有該Topic的所有消息,所以對每個消費端來說,相當于消費本地集群的消息。由于cluster-C沒有消費端,所有的消息僅在cluster-A,cluster-B兩個集群消費。它的工作機制是在 Broker 內部,為跨地域的數據復制啟動了一組內嵌的額外生產者和消費者(Replicator)。當外部消息產生后,內嵌的消費者會讀取消息;讀取完成后,調用內嵌的生產者將消息立即發送到遠端的數據中心,遠端的數據中心消費完成后,會ack該消息,通過cursor保存復制位置,確保復制中斷后,會繼續從斷點處復制(原理和Kafka類似)。
需要注意的時,pulsar會標記數據是否是復制過來的,避免多機房間循環復制。
2 多租戶
對于Kafka,不同的業務系統之間,可能需要搭建多套集群,這會增加運維的難度,另外Kafka的集群支持的主題數有限(幾千個左右)。pulsar在設計之初就考慮到多租戶的特性,一個集群可以運行上百萬個topic,通過租戶進行隔離。
在一個 Pulsar 集群中,有三個概念:tenant(圖中的 property 是之前的一種叫法,現在更習慣將其稱為:tenant)、namespace、topic。前面介紹過,一個topic的完整的命名包含這三個要素:{persistent|non-persistent}://tenant/namespace/topic
多租戶滿足企業級特性要求,設計上主要有以下特點:
- 使用身份驗證、授權和 ACL(訪問控制列表)確保其安全性
- 為每個租戶強制執行存儲配額
- 支持在運行時更改隔離機制,從而實現操作成本低和管理簡單。