1. 體系結構概述
Kafka主題用于對記錄進行組織。記錄由生產者生成,由消費者使用。生產者將記錄發送到 Kafka 代理,后者存儲數據。
主題跨代理對記錄進行分區。在使用記錄時,每個分區最多可使用一個消費者來實現數據并行處理。
復制用于在節點之間復制分區。這可以防止節點(代理)發生服務中斷。將副本組之間的單個分區指定為分區領導者。生產者流量將根據由ZooKeeper管理的狀態路由到每個節點的領導者。
2. 調優方案選擇
Kafka性能體現在兩個主要方面:吞吐量和延遲。吞吐量是指數據的最大處理速率,通常吞吐量越高越好。延遲是指存儲或檢索數據所花費的時間。通常,延遲越低越好。在吞吐量、延遲和應用基礎結構的開銷方面找到適當的平衡可能會有難度。
根據追求的是高吞吐量、低延遲還是此兩者,性能要求可能符合以下三種常見情況中的一種:
- 高吞吐量,低延遲。此方案要求同時滿足高吞吐量和低延遲(大約100毫秒)。服務可用性監視就是這種應用場景的一個例子。
- 高吞吐量,高延遲。此方案要求滿足高吞吐量(大約1.5 GBps),但可以容許較高的延遲(< 250 毫秒)。這種應用場景的一個例子是引入遙測數據進行近實時的處理,例如安全與入侵檢測應用程序。
- 低吞吐量,低延遲。此方案要求提供低延遲(< 10毫秒)以完成實時處理,但可以容許較低的吞吐量。在線拼寫和語法檢查就是這種應用場景的一個例子。
3. 生產者配置優化
以下部分重點介紹一些用于優化 Kafka 生產者性能的最重要通用配置屬性。
批大小
Kafka生產者將作為一個單元發送的消息組(稱為批)匯編到一起,以將其存儲在單個存儲分區中。批大小表示在傳輸該組之前必須達到的字節數。增大 batch.size 參數可以提高吞吐量,因為這可以降低網絡和 IO 請求的處理開銷。負載較輕時,增大批大小可能會增大 Kafka 發送延遲,因為生產者需要等待某個批準備就緒。負載較重時,建議增大批大小以改善吞吐量和延遲。
生產者確認參數
生產者所需的acks 配置確定在將某個寫入請求視為已完成之前,分區領先者所需的確認數目。此設置會影響數據可靠性,其值為 0、1 或 -1。值 -1 表示必須收到所有副本的確認,才能將寫入請求視為已完成。設置 acks = -1 能夠更可靠地保證數據不會丟失,但同時也會導致延遲增大,吞吐量降低。如果應用場景要求提供較高的吞吐量,請嘗試設置 acks = 0 或 acks = 1。請記住,不確認所有副本可能會降低數據可靠性。
壓縮
可將Kafka生產者配置為先壓縮消息,然后再將消息發送到代理。compression.type 設置指定要使用的壓縮編解碼器。支持的壓縮編解碼器為“gzip”、“snappy”和“lz4”。如果磁盤容量存在限制,則壓縮是有利的做法,應予以考慮。
在 gzip 和 snappy 這兩個常用的壓縮編解碼器中,gzip 的壓縮率更高,它可以降低磁盤用量,但代價是使 CPU 負載升高。snappy 編解碼器的壓縮率更低,但造成的 CPU 開銷更低。可以根據代理磁盤或生產者的 CPU 限制來決定使用哪個編解碼器。gzip 數據壓縮率比 snappy 高 5 倍。
使用數據壓縮會增加磁盤中可存儲的記錄數。如果生產者與代理使用的壓縮格式不匹配,則數據壓縮也會增大CPU開銷。因為數據在發送之前必須經過壓縮,并在處理之前經過解壓縮。
4. 代理設置
以下部分重點介紹一些用于優化 Kafka 代理性能的最重要設置。
磁盤數目
存儲磁盤的 IOPS(每秒輸入/輸出操作次數)和每秒讀/寫字節數有限制。創建新分區時,Kafka 會將每個新分區存儲在現有分區最少的磁盤上,以便在可用磁盤之間平衡分區的數目。盡管有存儲策略進行調節,但在處理每個磁盤上的數百個分區副本時,Kafka 很容易就會使可用磁盤吞吐量達到飽和。此時,需要在吞吐量與成本之間進行取舍。如果應用場景需要更大的吞吐量,請創建一個可為每個代理提供更多托管磁盤的群集。
主題和分區的數目
Kafka生產者將寫入主題。Kafka 消費者讀取主題。主題與日志相關聯,該日志是磁盤上的數據結構。Kafka 將生產者中的記錄追加到主題日志的末尾。主題日志包括分散在多個文件之間的多個分區。而這些文件又分散在多個 Kafka 群集節點之間。消費者可以按照自己的節奏從 Kafka 主題中讀取內容,并可以在主題日志中選擇自己的位置(偏移)。
每個 Kafka 分區是在系統上的一個日志文件,生產者線程可以同時寫入到多個日志。同樣,由于每個消費者線程從一個分區讀取消息,因此也能并行處理從多個分區使用消息的操作。
提高分區密度(每個代理的分區數)會增大與元數據操作以及每個分區領先者及其后繼者之間的分區請求/響應相關的開銷。即使不存在流動的數據,分區副本也仍會從領先者提取數據,導致需要通過網絡額外處理發送和接收請求。
對于 HDInsight 中的 Apache Kafka 群集 2.1 和 2.4 以及更高版本,我們建議最多為每個代理提供 2000 個分區(包括副本)。增加每個代理的分區數會降低吞吐量,并可能導致主題不可用。
副本數
較高的復制因子會導致分區領先者與后繼者之間的請求數增加。因而,較高的復制因子會消耗更多的磁盤和CPU來處理額外的請求,并增大寫入延遲,降低吞吐量。
我們建議對Azure HDInsight 中的 Kafka 至少使用 3 倍的復制因子。大部分 Azure 區域有三個容錯域。在只有兩個容錯域的區域中,用戶應使用 4 倍復制因子。
5. 消費者配置
以下部分重點介紹一些用于優化 Kafka 消費者性能的最重要通用配置屬性。
消費者數量
良好的做法是讓分區數量等于消費者數量。如果消費者數量小于分區數量,則少數消費者將從多個分區讀取數據,從而增大消費者延遲。
如果消費者數量大于分區數量,則會浪費消費者資源,因為這些消費者將處于空閑狀態。
避免頻繁進行消費者重新平衡
發生分區所有權更改(即消費者橫向擴展或縱向縮減)、中介崩潰(因為中介是消費者組的組協調者)、消費者崩潰、添加新主題或添加新分區時,會觸發消費者重新平衡。在重新平衡期間,消費者無法使用,因此會增大延遲。
如果消費者可以在session.timeout.ms 內向中介發送檢測信號,則認為消費者是存活的。否則,認為消費者已消亡或有故障。這會導致消費者重新平衡。消費者的 session.timeout.ms 越低,我們檢測到這些故障的速度就越快。
如果session.timeout.ms 太低,消費者可能會由于某些情況(例如,處理一批消息花費了較長時間,或 JVM GC 暫停時間太長)而遇到重復的不必要重新平衡。如果你的某個消費者花費了過多時間處理消息,你可以通過以下方法解決此問題:使用 max.poll.interval.ms 提高消費者在獲取更多記錄之前可以保持空閑狀態的時長上限,或使用配置參數 max.poll.records 減小返回的最大批大小。
批處理
與生產者一樣,可為消費者添加批處理。可以通過更改配置fetch.min.bytes,來配置消費者在每個提取請求中可獲取的數據量。該參數定義了消費者提取響應中預期的最小字節數。增大此值會減少對中介發出的提取請求數量,從而降低額外的開銷。默認情況下,此值為 1。類似地,還有另一個配置 fetch.max.wait.ms。如果提取請求沒有足夠的消息(根據 fetch.min.bytes 的大小),它將等待基于配置 fetch.max.wait.ms 的等待時間過期。
注意在少數情況下,當消費者無法處理消息時,它看起來就很緩慢。如果在發生異常后你不提交偏移量,消費者將停滯在無限循環中的特定偏移量而不會前進,從而增大消費者端的滯后時間。
6. Linux OS 優化
以下部分重點介紹一些用于優化linux OS 的最重要通用配置屬性。
內存映射
vm.max_map_count 定義了進程可以擁有的最大mmap 數量。默認情況下,在 HDInsight Apache Kafka 群集 Linux VM 上,該值為 65535。
在Kafka 中,每個日志段需要一對 index/timeindex 文件,其中每個文件消耗 1 個 mmap。也就是說,每個日志段使用 2 個 mmap。因此,如果每個分區托管一個日志段,則該分區至少需要 2 個 mmap。每個分區的日志段數取決于段大小、負載強度、保留策略和滾動期,該數量通常大于 1。
Mmap value = 2 ((partition size)/(segment size))*(partitions)
如果所需的mmap 值超過 vm.max_map_count,中介將引發“映射失敗”異常。
為避免此異常,請使用以下命令檢查VM中 mmap 的大小,并根據需要在每個工作器節點上增大該大小。
# command to find number of index files:
find . -name '*index' | wc -l
# command to view vm.max_map_count for a process:
cat /proc/[kafka-pid]/maps | wc -l
# command to set the limit of vm.max_map_count:
sysctl -w vm.max_map_count=<new_mmap_value>
# This will make sure value remains, even after vm is rebooted:
echo 'vm.max_map_count=<new_mmap_value>' >> /etc/sysctl.conf
sysctl -p
注意請注意不要將此值設置得過高,因為這會占用 VM 上的內存。通過設置 MaxDirectMemory 來確定 JVM 可以在內存映射中使用的內存量。默認值為 64MB。有可能會達到此限制。可以通過 Ambari 將 -XX:MaxDirectMemorySize=amount of memory used 添加到 JVM 設置來增大此值。需要知道節點上正在使用的內存量,以及是否有足夠的可用 RAM 來支持增大此值。