Flink作業如何進行性能調優
概念說明及監控查看
- 消費組積壓
消費組積壓可通過topic最新數據offset減去該消費組已提交最大offset計算得出,說明的是該消費組當前待消費的數據總量。
如果Flink作業對接的是kafka專享版,則可通過云監控服務(CES)進行查看。具體可選擇“云服務監控 > 分布式消息服務 > kafka專享版” ,單擊“kafka實例名稱 > 消費組” ,選擇具體的消費組名稱,查看消費組的指標信息。
- 反壓狀態
反壓狀態是通過周期性對taskManager線程的棧信息采樣,計算被阻塞在請求輸出Buffer的線程比率來確定,默認情況下,比率在0.1以下為OK,0.1到0.5為LOW,超過0.5則為HIGH。
- 時延
Source端會周期性地發送帶當前時間戳的LatencyMarker,下游算子接收到該標記后,通過當前時間減去標記中帶的時間戳的方式,計算時延指標。算子的反壓狀態和時延可以通過Flink UI或者作業任務列表查看,一般情況下反壓和高時延成對出現:
性能分析
由于Flink的反壓機制,流作業在存在性能問題的情況下,會導致數據源消費速率跟不上生產速率,從而引起Kafka消費組的積壓。在這種情況下,可以通過算子的反壓和時延,確定算子的性能瓶頸點。
- 作業最后一個算子(Sink)反壓正常(綠色),前面算子反壓高(紅色)

該場景說明性能瓶頸點在sink,此時需要根據具體數據源具體優化,比如對于JDBC數據源,可以通過調整寫出批次(connector.write.flush.max-rows)、JDBC參數重寫(rewriteBatchedStatements=true)等進行優化。
- 作業非倒數第二個算子反壓高(紅色)

該場景說明性能瓶頸點在Vertex2算子,可以通過查看該算子描述,確認該算子具體功能,以進行下一步優化。
- 所有算子反壓都正常(綠色),但存在數據堆積

該場景說明性能瓶頸點在Source,主要是受數據讀取速度影響,此時可以通過增加Kafka分區數并增加source并發解決。
- 作業一個算子反壓高(紅色),而其后續的多個并行算子都不存在反壓(綠色)

該場景說明性能瓶頸在Vertex2或者Vertex3,為了進一步確定具體瓶頸點算子,可以在FlinkUI頁面開啟inPoolUsage監控。如果某個算子并發對應的inPoolUsage長時間為100%,則該算子大概率為性能瓶頸點,需分析該算子以進行下一步優化。
inPoolUsage監控

性能調優
rocksdb狀態調優
topN排序、窗口聚合計算以及流流join等都涉及大量的狀態操作,因而如果發現這類算子存在性能瓶頸,可以嘗試優化狀態操作的性能。主要可以嘗試通過如下方式優化:
1.增加狀態操作內存,降低磁盤IO
- 增加單slot cu資源數
- 配置優化參數:
-taskmanager.memory.managed.fraction=xx
-state.backend.rocksdb.block.cache-size=xx
-state.backend.rocksdb.writebuffer.size=xx
2.開啟微批模式,避免狀態頻繁操作
配置參數:
-
table.exec.mini-batch.enabled=true
-
table.exec.mini-batch.allow-latency=xx
-
table.exec.mini-batch.size=xx
3.使用超高IO本地盤規格機型,加速磁盤操作
group agg單點及數據傾斜調優
按天聚合計算或者group by key不均衡場景下,group聚合計算存在單點或者數據傾斜問題,此時,可以通過將聚合計算拆分成Local-Global進行優化。配置方式為設置調優參數: table.optimizer.aggphase-strategy=TWO_PHASE
count distinct優化
- 在count distinct關聯key比較稀疏場景下,即使使用Local-Global,單點問題依然非常嚴重,此時可以通過配置以下調優參數進行分桶拆分優化:
table.optimizer.distinct-agg.split.enabled=true
table.optimizer.distinct-agg.split.bucket-num=xx
- 使用filter替換case when:
例如:
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone')THEN user_id ELSE NULL END) AS app_uv
可調整為
COUNT(DISTINCT user_id) FILTER(WHERE flag IN ('android', 'iphone')) AS app_uv
維表join優化
維表join根據左表進入的每條記錄join關聯鍵,先在緩存中匹配,如果匹配不到,則從遠程拉取。因而,可以通過如下方式優化:
- 增加JVM內存并增加緩存記錄條數
- 維表設置索引,加快查詢速度
如何在一個Flink作業中將數據寫入到不同的Elasticsearch集群中?
在對應的Flink作業中添加如下SQL語句。
create source stream ssource(xx);
create sink stream es1(xx) with (xx);
create sink stream es2(xx) with (xx);
insert into es1 select * from ssource;
insert into es2 select * from ssource;
Flink作業重啟后,如何保證不丟失數據?
DLI Flink提供了完整可靠的Checkpoint/Savepoint機制,您可以利用該機制,保證在手動重啟或者作業異常重啟場景下,不丟失數據。
- 為了避免系統故障導致作業異常自動重啟后,丟失數據:
?對于Flink SQL作業,您可以勾選“開啟Checkpoint”,并合理配置Checkpoint間隔(權衡執行Checkpoint對業務性能的影響以及異常恢復的時長),同時勾選“異常自動重啟”,并勾選“從Checkpoint恢復”。配置后,作業異常重啟,會從最新成功的Checkpoint文件恢復內部狀態和消費位點,保證數據不丟失及聚合算子等內部狀態的精確一致語義。同時,為了保證數據不重復,建議使用帶主鍵數據庫或者文件系統作為目標數據源,否則下游處理業務需要加上去重邏輯(最新成功Checkpoint記錄位點到異常時間段內的數據會重復消費)。
?對于Flink Jar作業,在代碼中開啟Checkpoint,同時如果有自定義的狀態需要保存,您還需要實現ListCheckpointed接口,并為每個算子設置唯一ID。然后在作業配置中,勾選“從Checkpoint恢復”,并準確配置Checkpoint路徑。
說明Flink Checkpoint機制可以保證Flink平臺可感知內部狀態的精確一致,但對于自定義Source/Sink或者有狀態算子,需要合理實現ListCheckpointed接口,來保證業務數據需要的可靠性。
- 為了避免因業務修改等需要,手動重啟作業后,不丟失數據:
?對于無內部狀態的作業,您可以配置kafka數據源的啟動時間或者消費位點到作業停止之前。
?對于有內部狀態的作業,您可以在停止作業時,勾選“觸發保存點”。成功后,再次啟動作業時,開啟“恢復保存點”,作業將從選擇的savepoint文件中恢復消費位點及狀態。同時,由于Flink Checkpoint和Savepoint生成機制及格式一致,因而,也可以通過Flink作業列表“操作”列中的“更多”>“導入保存點”,導入OBS中最新成功的Checkpoint,并從中恢復。