1. Kafka架構
Producer,生產者,負責將消息發布到特定的Topic上,并將消息均衡分布到所有的partition。
Broker Cluster,Broker集群,由多個Broker節點組成。
Broker,Kafka的服務器節點,消息的物理存儲,負責生產者和消費者的讀寫請求。
Broker controller,Broker控制節點,實現Broker基本功能,同時負責管理Broker,包括partition的分配和選舉,Broker的監控(0.9版本之后新增)。
Zookeeper,集群的協調器,保存Broker,主題和分區的元數據信息。
Consumer group,消費組,由一組消費者組成,負責均衡的消費Partition的消息。
Consumer,消費者,訂閱一個或者多個主題,并按照順序拉取消息并處理。
Topic,主題,消息的邏輯歸類,表示同一類消息,包含多個partition。
Partition,分區,消息存儲的物理單位,是一個可追加的log文件,一個分區只歸屬某個主題。分區采用多副本機制(Replica),leader副本負責讀寫請求,follower副本同步leader副本數據,當leader副本出現故障,從follower中選舉中選舉一個,繼續對外提供服務,實現失效轉移。
2. 生產消息
消息的生產和發送基本流程:
2.1. 消息創建
生產者創建一條消息,消息需要包括Topic,Key(可以為null),Value信息,Partition可選。
首先將消息按照指定的序列化器對key,value進行分別序列化,將對象轉化成字節數組(建議不要自定義的,一般使用StringSerializer)。
2.2. 分區選擇
一個Topic有多個分區,該消息到底要寫入到哪個分區呢,如果分配不好,就會讓某些partition很忙,某些partition很閑,這就涉及到partition負載均衡,有以下幾種策略:
1、指定分區,也就是在消息創建的時候指定分區,此時分區器就不會處理,直接發往指定的分區即可。
2、根據key值指定,如果key為null,采用輪詢的策略選擇分區;如果key不為空,則采用默認的分區策略,對key進行散列化,映射到對應的分區上。因為相同的key值總會分配到同一分區上,默認策略可能會導致某個分區的的負載特點大,比如70%的消息都來自于某個客戶,我們又使用了客戶的id作為key,那么這些消息都會寫入同一個partition,導致該partition的負載大。
3、自定義分區策略,可以自定義特定的算法進行處理,可以避免上述默認策略的問題。
2.3. 消息發送
經過分區器的消息并沒有立即發送給Broker,而是先放到內存緩存區保存起來,當達緩存區大小到某個閥值(通過buffer.memory設定緩存區的大小),批量的發送給Broker(leader分區所在的Broker),這樣設計的目的就是為了提高吞吐量,但是也不可避免的帶來一定的延時。批量消息可以通過同步/異步的方式發送給Broker,同步模式會阻塞當前的線程,等待Broker的響應;異步模式是通過回調的方式接受Broker的結果。
2.4. 消息確認
我們知道Broker的分區是多副本的,寫入多少個分區副本才算消息發送成功了呢,生產者通過設置acks參數決定。
1、acks=0,生產者在成功寫入悄息之前不會等待任何來自服務器的響應,也就是發送出去后不關注Broker有沒有接受到。這種方式可以提高吞吐量,但是可靠性差,容易丟消息。
2、acks=1,生產者發送消息后,只要Leader副本接受到消息后,就算成功。這種吞吐量不如前一種,但是可靠性有提升,但也存在丟數據的風險,如果Leader副本接受到數據后,Follower還沒來得及拉取,Leader副本就崩潰了,那么這些消息就會丟失。
3、acks=all,生產者發送消息后,需要所有可用的副本(ISR列表)都接受到信息,才算成功。很明顯,這種模式的可靠性最高,但是會影響吞吐量。
2.5. 元數據更新
其實還有一個問題沒有回答,就是生產者如何知道主題,分區的信息,又是如何知道Leader副本的分布在哪些節點。這些都是通過生產者的元數據更新功能來完成。 元數據是指 Kafka 集群的元數據,這些元數據具體記錄了集群中有哪些主題,這些主題有 哪些分區,每個分區的 leader 副本分配在哪個節點上, follower 副本分配在哪些節點上,控制器節點又是哪一個等信息。這些數據在每個Broker上都存在,生產者會挑選一個leastLoadedNode,向該節點發送元素請求進行更新。
3. 存儲消息
生產者將消息發送到Broker(準確的說,是Leader分區所在的Broker),Broker持久化數據并同步到其他的副本。
3.1. 寫入
partition分區實質是日志存儲文件(log),當接受到寫入請求,將數據追加到日志文件的尾部。在寫入的時候,確保順序寫入,比如上圖中,A消息在B消息之前發送到Broker,那么A消息會保存在B消息前。同一個partition中,使用偏移量(offset)記錄消息的順序,偏移量對于消息的消費和回溯非常重要。
3.2. 復制
消息首先要寫入到分區的Leader副本,Follower副本發起同步請求,復制數據。前面講了,生產者對于acks有三種配置,acks=1,僅需要Leader寫入成功即可,acks=all是需要所有可用的副本寫入成功才能返回成功(有點類似mysql的半同步,同步概念)。那什么是可用的副本呢?
ISR(in-sync-replicas),同步副本。Follower副本同步Leader副本的數據是有一定的滯后的,當滯后在一定“容忍”范圍,就認為是ISR;如果超出了“容忍”范圍,就會從ISR清單中剔除出去,這樣就能確保不會因為這些"壞"副本,導致遲遲無法返回響應。這個"容忍"度可以通過以下兩個參數配置。
1、rerplica.lag.time.max.ms表示超過多少ms沒有請求,那么leader就有理由認為這個follower是不是掛了。
2、rerplica.lag.max.messages表示落后多少條消息就會移除。(0.9之后版本將不用該參數了)
ISR列表是動態的,當某個Follower達到“容忍”的范圍內,又會重新加入ISR列表。同步副本個數是一個很重要的監控指標,同步副本數越少,表示該分區的高可用就越低,如果為0,表示所有Follower副本都不"可用",可靠性將無法得到保證。
3.3. 存儲與索引
在一個大文件中查找和刪除消息是很費時的,Kafka會把分區分成若干個片段保存,每個片段可以設置大小,當達到上限,就關閉當前文件,打開一個新的文件。
我們來測算下,比如每個片段大小為1GB,每天每個分區產生10GB的數據,并設置數據保留1天,單個分區保留10個分片,如果單臺Broker分配的分區個數為100,那就是1000個片段,Broker會為分區的每個片段打開一個文件句柄(無論這些分片是不是在讀寫),就是1000個文件句柄,很可能就突破了linux的打開最大句柄數的限制(默認是1024,對于Kafka一般要設置50W+)。所以,分區數不是越多也好,是有限制的。建議單臺Broker不要超過200個分區,否則會影響性能。
這么多片段文件,如何快速定位呢,比如需要找到分區0的偏移量為100的片段位置,為了解決這個問題,Kafka為每個分區維護了一個索引,會把偏移量映射的片段文件保存在索引文件中,通過二分法能快速定位到片段文件。需要注意的是,索引文件也可以切片,每個分片也是一個打開的文件句柄,有會增加打開文件句柄的量。
3.4. 清理
這些片段是不能永久保存的,達到一定的條件,就需要清理。目前主要包括基于時間,基于日志大小,基于日志偏移量三種模式。
1、基于時間模式,檢查日志文件中是否有保留時間超過設定的的閥值(log.retention.hours, log.retention.minutes ,log.retention ms三個值配合),符合條件則刪除。比如log.retention.hours設置為24,則保留僅1天的片段數據,其他的都刪除。注意,活躍的片段(正在寫入的片段)是無法刪的,以下幾種模式也一樣。
2、基于日志大小模式,檢查片段文件的大小是否超過設定的閥值(通過log.segment.bytes設置單個片段的大小),符合條件則刪除。
3、基于日志起始偏移量模式,檢查片段文件的偏移量(結束位置)是否小于設置的偏移量閥值,符合條件則刪除。
4. 消費消息
消費者通過訂閱主題,輪詢拉取Broker的消息進行消費。
4.1. 消費群組
業務的應用服務器為了實現高并發,一般都是集群化部署,每臺應用服務器都可看做一個消費者,有些消息我們希望僅被消費一次(類似activemq的隊列),比如說訂單處理;有些消息則可以被重復消費(類似activemq的主題),比如說商品的價格,消費后放到本地緩存。那Kafka是如何滿足不同的消費模式呢?那就是消費組。
消費者從屬于消費群組,主題的某個分區消息,只能被消費組下的某個消費者消費,某個消費者可已同時消費多個分區消息。由于在寫入的時候,就保證了區分消息的唯一性,所以在從屬群組的兩個消費者不會重復消費到同一消息。
從屬不同群組的消費者是沒有這個限制,可以對同一主題下的分區消息進行消息。
上圖所示,消費者如果大于分區數,那么多余的消費者是無法參與消費的,比如消費者4。
4.2. 輪詢拉取
消費者采用的輪詢發送poll請求,批量拉取Leader分區消息。消費端和生產端一樣,也是通過元數據更新,獲取Broker,主題,以及分區信息的,就不再詳述。
消費者每次拉取的消息數據是可以根據實際情況設置的。
- min.bytes表示一次拉取的最小字節數;
- max.wait.ms表示當消息數據小于最小字節數時,等待的最大時長;max.partition.fetch.bytes,每個分區返回給消費者的最大字節數。
消費者需要上報當前消費消息的分區偏移量,來記錄消費的位置,以便下次從該位置繼續消費。上報偏移量實際就是發送_consumer_offset特殊的主題消息,該消息包含每個分區的偏移量,在老的版本保存在zookeeper中,新版本保存在控制器Broker中,目前有兩種提交偏移量的方式。
1、自動提交,通過設置定時時間(參數auto.commit.interval.ms),自動上報上一次poll的消息的offset,無論該消息是否被處理。這種模式較簡單,但是無法保證客戶端最后處理消息偏移量和提交的偏移量的一致性。以下圖為例:
上次提交poll后的提交的偏移量是5,消費者已獲取6-9的消息,但是在處理8的時候出現了故障。消費群組通過再均衡,分配其他的消費者繼續消費,該消費者就會從6開始獲取消息,這樣就會導致6-8的消息有重復消費。
2、手動提交,通過調用接口,由客戶端程序控制上報的時機,每次消息處理完成后,可以上報一次,最大程度上保證兩者偏移量的一致性。
4.3. 負載均衡
一個群組中有多個消費者,那么如何讓每個消費者均衡的消費每個分區呢。主要有以下兩種策略:
1、Range,該策略將每個主題下的分區平均分給消費者,比如主題A下有三個分區(分區0,分區1,分區2),消費群組包含兩個消費者(消費者1,消費者2),那么消費者1分配到分區0,分區2,消費者2分配到分區1。這種策略比較簡單,但是缺點也很明顯,僅考慮到一個主題下分區均衡,如果群組消費多個主題,就無法保障均衡。
2、RangeRobin,該策略把群組內所有訂閱的主題分區,以及所有消費者進行字典排序,通過輪詢的方式依次分配給每個消費者,該策略綜合考慮所有主題和消費者,最大程度上實現相對均衡。但是也不是最優的,如果消費者訂閱的主題不同,那么也存在負載不均的情況。
其他的還有sticky策略,以及自定義策略。
4.4. 再均衡(rebalance)
在消費者變化(正常或者異常退出,或者加入),主題或者分區發送變化時,協調器都會觸發再均衡。最常見的情況,是消費者發送變化,比如消費者出現異常,服務在心跳間隔(參數session.timeout.ms)沒有收到心跳請求;消費者接受消息后,處理的時間過長,下一次fetch的時間超時了設定閥值(max.poll.interval.ms)等。
rebalance會導致很多問題,包括重復消費,以及消息丟失,重復消費的場景我們在上面已經描述過了,再看下消息丟失的情況。
提交的消息偏移量大于客戶端實際的處理消息,如圖所示,6-8的消息會丟失。
對于重復消費,客戶端需要進行冪等性保護;對于消息丟失,通過消息回溯的方式補償。
一旦發送再均衡,同一消費組內的消費者需要重新協調分配,以便盡量保證公平性,這個過程會有一定的耗時,阻塞Kafka的消費,影響吞吐量,所以需要根據場景分析,避免頻繁的再均衡。
5. 可靠性
在企業級應用中,各業務交互錯綜復雜,不能因為組件某個節點的故障,導致全網的不可用,可靠性是對組件的基本要求,Kafka的架構設計,也同樣充分考慮到了可靠性要求。
5.1. 生產消息重發
前面在講生產消息時,生產者可以通過acks配置實現不同的確認策略,我們說acks=all時,Leader和Follower分區副本都同步完成后, 才能返回確認,可靠性最高。但是無論哪一種策略,都會存在消息發送失敗的情況,如果消息發送失敗,生產者會根據錯誤碼以及錯誤類型進行不同的處理,可以分為可重試異常和不可重試異常兩種,比如對于一些臨時性的錯誤(如網絡抖動,leader選舉等),生產者會嘗試重發,重發的次數和間隔的時間可通過 retries 和 retry.backoff.ms設置,這對于消息的可靠性非常重要,這類稱之為可重試異常;有些機制性錯誤,比如消息太大,無論重試多少次都無可能解決的,這類稱之為不可重試異常。對于這類異常需要進行調整,無法依靠Kafka自身的機制解決。
5.2. 分區均衡
分區有多個副本,那么這些副本在Broker上如何分布就顯得尤為重要,一方面解決讀寫負載均衡的問題,如果Leader分區不均衡,可能某些Broker負載過大,某些Broker又過于清閑;另一方面解決可靠性問題,Leader和Follower副本需要分布在不同的節點上,確保Broker節點不可用時,轉移到其他節點繼續服務。
Kafka首先對Leader分區副本以及Broker進行排序,通過輪詢的方式將leader分區均衡的分配到Broker上,比如有4個分區(分區0,分區1,分區2,分區3),3個Broker節點(BrokerA,BrokerB,BrokerC),那么BrokerA將分配分區0,分區3,BrokerB將分配分區1,BrokerC將分配分區2。
其次對Follower分區副本進行分配,采用Leader分區所在Broker+1的方式。比如分區0有兩個Follower副本(follower0,follower1),follower0副本將分配到到BrokerB,follower1副本將分配到BrokerC上,以此類推。
在實際的實施過程中,Broker節點會部署不同的機架,分區均衡時,也需要考慮這些因素(設置Broker.rack參數)。
5.3. 控制器節點
分區變化時,誰來負責這些分區的均衡呢。在老版本中,是在協調器zookeeper實現,鑒于zookeeper的性能問題,在新版本中增加了控制器節點來實現該功能。
控制器節點其實也是普通的Broker節點,只不過增加控制線程,也就是說任何一個Broker節點都有資格成為控制器節點。當第一個Broker啟動后,會嘗試到zookeeper注冊,如果zookeeper發現還沒有控制器,就會創建/controller目錄,寫入Broker相關信息。其他節點啟動后,也會嘗試注冊,但是發現已經被注冊過了,就認可其結果。
控制節點負責分區的均衡以及分區的選舉,是集群的"大腦",自身的高可用尤其重要。控制節點出現故障,或者網絡故障與zookeeper失去聯系,zookeeper會發出變更通知,其他的Broker通過watch,得到通知后,嘗試讓自己成為新的控制節點,第一個在zookeeper創建成功的節點將成為新的控制節點,其他的節點將接受到"節點已存在"異常。
新的控制節點將擁有一個數值更大的controller epoch。當故障的控制節點恢復后,不知道已經改朝換代了,可能還認為自己是控制節點,但是其他節點發現其controller epoch較小,會拒絕承認,這樣就避免了"腦裂"的情況。控制節點除了負責分區的均衡,還需要完成其他重要的工作,其中一個就是分區的選舉。
5.4. 分區選舉
為了確保分區的高可用,Kafka對分區采用多副本的模式(默認是3,復制系數可以配置),控制器通過遍歷獲取每個Broker,獲取其副本情況。
當某個Broker退出集群,該Broker節點上的Leader分區副本也將不可用,就需要選舉一個新的leader副本接替。前面講過每個分區維護一個ISR(同步副本)列表,控制節點就會從該列表中挑選一個作為新Leader副本(一般是列表位置的第一個),并通知到其所在的Broker以及其他Follower副本,新Leader副本接管讀寫請求,其他的Follower副本從新的Leader副本上同步。
如果同步副本為0怎么處理, 這是就面臨了一個兩難的選擇,如果從非同步副本中選取一個作為新Leader,該副本延遲同步的消息必將丟失;如果不選擇新的leader,那么就無法讀寫,需要等Leader副本恢復,可用性差。 可用性和數據一致性(可靠性),沒有一個統一最優解決方案,需要結合應用的場景進行選擇(設置unclean.leader.election.enable參數)。 就算同步副本不為0,如果生產者沒有用acks=all的策略,在分區選舉中,也可能會造成消息的丟失。對于可靠性要求高的系統,acks需要設置為all。
5.5. 數據一致性
Kafka的數據讀寫都是在Leader的副本上,這個和其他一些組件(比如zookeeper,mysql,redis)不一致,一般我們認為,讀寫分離能減輕主節點的負載,那Kafka為何不這么做呢?
首先,Kafka的基于partition的設計,每個Broker均衡分布分區的Leader副本,沒有所謂主從節點的概念,使得Broker的讀寫的負載是相對均衡的。
其次,Leader副本讀寫能確保數據的一致性,follower副本同步有一定的延時性,如果多個消費者從不同的follower讀取數據,無法保證數據的一致性。從leader副本消費時,有個高水位偏移量的限制,不能超過這個偏移量,如圖所示。
Leader副本以延時最大的Follower的消息偏移量為基準(高水位),作為消費端可消費的最大偏移量。
5.6. 消息回溯
消息寫入Broker后,會保存一段時間。在這段時間內,如果消費者導致消息丟失(比如消費者過早的提交了偏移量,發送故障,進行rebalance),可以通過發送seek指令到消費者,告訴其從具體的偏移量開始重新消費。
5.7. 小結
可靠性都是相對的,不能簡單的說"是"或者"否",在使用場景或者方式的提前下,最大程度提高可靠性。另外,如果一味強調可靠性,那么必定會損失其他方面的性能。
比如acks設置為all,由于Follower同步的延遲性,會影響吞吐量;設置Follower副本數過多,寫入時同步延遲增加,同時也會導致打開的文件句柄超限;分區選舉時,unclean.leader.election.enable設置為false,可能會導致無法讀寫;
6. 高吞吐
Kafka是用來做日志收集等大數據流式傳遞的工具,每秒處理幾十萬條數據,所以高吞吐是其天然的基因,以下從其設計架構上分析如何實現高吞吐。
6.1. 多分區
將主題的消息分成多個分區,并均衡負載到各個Broker節點上,通過橫向擴展Broker,增加分區的個數,就能大大提升了讀寫的吞吐。前面我們講過由于文件句柄的問題,單個Broker節點的分區數不能過多;另外分區過多,同步和選舉的時間也會拉長。那一個主題下分配多少分區是合適的呢,我們需要綜合考慮一下幾個因素:
1、主題的吞吐量
2、單個Broker包含分區數
3、單個分區讀寫的吞吐量
比如有10個生產者,每個每秒生產100MB的數據,也就是每秒總生產1GB的數據。單個Broker寫入數據200MB/s,平均分布4個Leader分區,也就是每個分區的寫入速度為50MB/s,不考慮帶寬的影響,如果不讓消息在產生端有積壓,需要20分區。再假設每個消費者讀取的速率是20MB/s,那就需要有50個消費者,也就是對應50個分區,才能讓50個消費者同時讀取。綜合評估下來,實現1GB/s的讀寫吞吐量,需要50個分區。
分區數建議提前規劃好,無法縮容,只能擴容,且擴容時會帶來分區以及消費者的rebalance,對于負載不均的情況,還需要手工調整,成本較高。
6.2. 順序寫入
Kafka采用的是追加的方式順序寫入數據,它并沒有像其他組件先寫入應用緩存,然后刷盤到磁盤,而是直接寫入到文件系統(僅依靠操作系統的頁面緩存)。
平時我們會產生一些定式思維,比如緩存的讀寫一定比磁盤的快,多線程一定比單線程性能高,實際上,這些都是有前提條件,或者場景約束的,順序磁盤寫入就比隨機內存寫入快,而Kafka正是順序寫入,另外操作系統的頁面緩存也提升了寫入速度,所以這個比不是Kafka高吞吐的瓶頸。Kafka這么設計目的是減少成本,內存的成本要遠遠大于磁盤的成本。
6.3. 零拷貝
所謂的零拷貝就是將數據從磁盤文件直接復制到網卡設備中,而不需要經由應用程序轉發。
從上圖可知,零拷貝可以節省兩次拷貝,通過DMA ( Direct Memory Access )技術將文件內容復制到內核模式下的ReadBuffer 中,沒有數據被復制到Socket Buffer,僅有包含數據的位置和長度的信息的文件描述符被加到Socket Buffer 中。零拷貝在內核模式下完成數據的傳遞,效率非常高。 Kafka為何能采用零拷貝,這是因為Kafka不對消息做任何處理,包括解析,拆包,序列化等,生產出的消息時什么樣子,讀取的時候也是這個樣子,僅做消息的搬運工。