通過Kafka原理介紹的介紹,大家對kafka的原理也有了一定的了解了,那么我們怎么根據這些原理來更好的使用kafka呢?我們大家都知道世界上沒有銀彈,如果什么都去做就會什么都做不好,我們想要用好一個產品,就要對這個產品進行一些定制化的調整,下面分別就服務端、生產者、消費者三個部分給大家介紹一些對我們業務使用性能有影響的關鍵配置。
關鍵配置
- 優化性能我們有三板斧:批量,壓縮,異步
- 批量用來減少io次數
- 壓縮用來減少網絡流量
- 異步用來減少等待時間
服務端配置
|
配置 |
key |
默認值 |
|
最大能夠接收的消息體大小 |
1000012 |
|
|
最小的ISR數量 |
2 |
|
|
壓縮算法 |
producer |
|
|
服務器返回數據的最大值 |
52428800 |
|
|
等待請求響應的最長時間 |
30000 |
生產者配置
|
配置 |
key |
默認值 |
|
是否等待消息提交 |
1 |
|
|
壓縮 |
none |
|
|
消息收集數量 |
16384 |
|
|
消息收集時間 |
0 |
|
|
等待請求響應的最長時間 |
30000 |
|
|
send()返回時間上限 |
120000 |
|
|
重試次數(瞬時錯誤) |
0 |
|
|
發送未確認請求最大數量 |
5 |
|
|
緩沖內存 |
33554432 |
還記得前面介紹的isr的副本吧 ,主節點會將收到的消息復制到isr的副本中,那么怎么確認這個消息提交成功了呢。
- 生產者可以在延遲和持久性中, 決定是否等待消息提交. 這個可以在生產者中的acks配置設置。
- 可以選擇是否等待0, 1 or all(-1)個ISR副本反饋。(當只有少數ISR存活時,會存在ISR失效導致數據丟失問題,可以通過配置最小的ISR數量(transaction.state.log.min.isr)來減少問題)
- 端到端的塊壓縮功能,通過compression.type配置。
- 由于數據以壓縮格式存儲在代理上,有效提取的偏移量是壓縮的消息邊界。因此,對于壓縮數據,消耗的偏移量將每次提前一個壓縮消息。如果使用者發生故障,這可能會產生重復消費的副作用。
- 批量發送
- batch.size和linger.ms滿足任意條件就會觸發發送。
- 消息大小要小于上限max.message.bytes配置,否則實際消息大于服務端值的時候會發送失敗 。
- 通過batch.size(消息收集數量)和linger.ms(消息收集時間)配置。
- 等待請求響應的最長時間:request.timeout.ms。
- 調用send()返回后報告成功或失敗時間的上限:delivery.timeout.ms
- 這限制了記錄在發送之前將被延遲的總時間,等待來自代理的確認的時間(如果期望)以及允許可重發的發送失敗的時間。如果遇到不可恢復的錯誤,重試已用盡則生產者可能會報告未能早于此配置發送記錄。此配置的值應大于或等于request.timeout.ms和linger.ms的總和 。
- 調重試次數(瞬時錯誤):retries
- 如果設置大于零的值將導致客戶端重新發送其發送失敗并帶有潛在的瞬時錯誤的任何記錄。請注意,允許重試而不將max.in.flight.requests.per.connection設置為1可能會更改記錄的順序,因為如果將兩個批次發送到單個分區,并且第一個批次失敗并被重試,但是第二個批次成功,則第二批中的數據可能會首先出現。
- 另外請注意,如果由delivery.timeout.ms配置的超時在成功確認之前首先到期,則在重試次數用完之前,生產請求將失敗。用戶通常應該不設置此配置,而是使用delivery.timeout.ms來控制重試行為。
- 客戶端在阻塞之前將通過單個連接發送的未確認請求的最大數量:max.in.flight.requests.per.connection。
- 緩沖內存:buffer.memory
- 生產者可以用來緩沖等待發送到服務器的記錄的總內存字節。如果記錄的生產的速度超過了將記錄發送到服務器的速度,則生產者將阻塞max.block.ms,此后它將引發異常。
消費者配置
|
配置 |
key |
默認值 |
|
返回數據的最大值 |
52428800 |
|
|
每個分區返回數據的最大值 |
1048576 |
|
|
poll()返回的最大記錄數 |
500 |
|
|
poll()之間的最大延遲 |
300000 |
|
|
等待請求響應的最長時間 |
305000 |
|
|
自動提交開關 |
true |
|
|
自動提交時間間隔 |
5000 |
- 批量消費
- fetch.max.bytes,服務器一次返回數據的最大值(如果第一條拉取的記錄就大于該值,也是可以拉取的),客戶端版本小于0.10.2的時候要大于max.message.bytes配置 。
- replica.fetch.max.bytes,嘗試為每個分區獲取的消息的字節數(如果第一條拉取的記錄就大于該值,也是可以拉取的)。
- 一次調用poll()返回的最大記錄數: max.poll.records。
- 調用poll()之間的最大延遲:max.poll.interval.ms,
- 控制拉取下來的記錄的最大處理時間。
- 0.10.1版本之前,如果真實處理時間大于session.timeout.ms,則會觸發服務端協調器對于該消費組中其他消費者與分區之間對應關系的重新平衡。
- 0.10.1版本之后,kafka引入了單獨的心跳線程。
- 等待請求響應的最長時間:request.timeout.ms
- 要大于max.poll.interval.ms配置,否則有可能在客戶端處理完本輪數據,開始下一輪拉取之時的間隔大于request.timeout.ms時間,觸發連接超時。
- 自動提交
- 開關:enable.auto.commit 。
- 自動提交的時間間隔:auto.commit.interval.ms。
- 為true的時候,自動提交系統將在auto.commit.interval.ms 時間間隔后自動提交處理完的偏移量。
最佳實踐
生產者
- transaction.state.log.min.isr設置的比kafka服務器失效率大。
- compression.type在單個數據量大或者設置批量上傳數據時設置,推薦lz4。
- batch.size、 linger.ms生產者對吞吐量要求高,丟失容忍度高情況下配置
- 批量數據要比max.message.bytes 小(如果配置了compression.type,就是壓縮后的數據)。
- acks :
- 對數據傳輸速度要求高,丟失容忍度高的時候設置0,
- 對數據傳輸速度要求高,丟失容忍度低的時候設置1,
- 不接受數據丟失的時候設置all或者-1,
- delivery.timeout.ms需要大于linger.ms與request.timeout.ms之和。
注意:
- 我們知道isr數量影響消息持久性和可見性的一個重要因素,太小了會因為機器失效導致集群不可用,大了就需要更多的機器資源
- 有批量就有丟失,有異步就有丟失
消費者
- 拉取間隙:
- max.poll.interval.ms要小于request.timeout.ms(2.0版本之前)
- fetch.max.bytes、 replica.fetch.max.bytes需要比max.message.bytes 大(0.10.2版本之前)
- 批量拉取:
- max.poll.records要考慮批量數據的傳輸和客戶端處理時間小于max.poll.interval.ms和request.timeout.ms。
- 批量確認:
- enable.auto.commit、auto.commit.interval.ms高吞吐、接受數據重復消費的情況下設置,auto.commit.interval.ms需要小于max.poll.interval.ms 和request.timeout.ms 。
注意:
- fetch.max.bytes、 replica.fetch.max.bytes在0.10.2版本之前需要比max.message.bytes 大,否則會引發消息讀取失敗和服務器間分區數據同步失敗。
消息順序性
消息組件有很重要的兩個功能:消息的順序性和消息的不丟失性,kafka什么配置可以做到這兩點呢?
- 發送消息的時候根據業務id來指定要發送到的分區,可以保證該id消息順序性。
- max.in.flight.requests.per.connection設置為1。
消息不丟失(最少一次)
- 生產者
- transaction.state.log.min.isr設置為n/2+1。
- batch.size設置為1。
- acks 設置為all或者-1。
- retries設置為Integer.MAX_VALUE。(重試時間超出delivery.timeout.ms時候還是會丟失)
- 消費者
- enable.auto.commit 設置為false。
- 每次處理完數據之后在提交確認偏移量。
注意:
- 有批量,必存在丟失的可能性
參考文獻
- 《kafka中文手冊》- 構架設計
- Efficient data transfer through zero copy
- Linux 內核的文件 Cache 管理機制介紹
- kafka documentation
- 消息中間件部署及比較
- Kafka Consumers: Reading Data from Kafka
- Getting Started with the New Apache Kafka 0.9 Consumer Client Design
- Kafka Kafka 設計解析(四):Kafka Consumer 解析
- kafka中的ISR、AR又代表什么?ISR伸縮又是什么?