用戶可以基于Flink的API進行二次開發,構建自己的應用Jar包并提交到DLI的隊列運行,DLI完全兼容開源社區接口。此功能需要用戶自己編寫并構建應用Jar包,適合對Flink二次開發有一定了解,并對流計算處理復雜度要求較高的用戶。
前提條件
- 確保已創建獨享隊列。創建DLI獨享隊列,在購買隊列時,勾選“專屬資源模式”即可。
- 創建Flink Jar作業,訪問其他外部數據源時,如訪問OpenTSDB、HBase、Kafka、DWS、RDS、CSS、CloudTable、DCS Redis、DDS Mongo等,需要先創建跨源連接,打通作業運行隊列到外部數據源之間的網絡。
當前Flink作業支持訪問的外部數據源詳情請參考跨源連接和跨源分析概述。
創建跨源連接操作請參見增強型跨源連接。
創建完跨源連接后,可以通過“隊列管理”頁面,單擊“操作”列“更多”中的“測試地址連通性”,驗證隊列到外部數據源之間的網絡連通是否正常。詳細操作可以參考 測試地址連通性。
- 用戶運行Flink Jar作業時,需要將二次開發的應用代碼構建為Jar包,上傳到已經創建的OBS桶中。并在DLI“數據管理”>“程序包管理”頁面創建程序包,具體請參考 創建程序包。
說明DLI不支持下載功能,如果需要更新已上傳的數據文件,可以將本地文件更新后重新上傳。
- 由于DLI服務端已經內置了Flink的依賴包,并且基于開源社區版本做了安全加固。為了避免依賴包兼容性問題或日志輸出及轉儲問題,打包時請注意排除以下文件:
a.系統內置的依賴包,或者在Maven或者Sbt構建工具中將scope設為provided
b.日志配置文件(例如:“log4j.properties”或者“logback.xml”等)
c.日志輸出實現類JAR包(例如:log4j等)
創建Flink Jar作業
1.在DLI管理控制臺的左側導航欄中,單擊“作業管理”>“Flink作業”,進入“Flink作業”頁面。
2.在“Flink作業”頁面右上角單擊“新建作業”,彈出“新建作業”對話框。
3.配置作業信息。
詳見下表:作業配置信息
| 參數 | 參數說明 |
|---|---|
| 類型 | 選擇Flink Jar。 |
| 名稱 | 作業名稱,只能由英文、中文、數字、中劃線和下劃線組成,并且長度為1~57字節。 說明 作業名稱必須是唯一的。 |
| 描述 | 作業的相關描述,且長度為0~512字節。 |
4.單擊“確定”,進入“編輯”頁面。
5.選擇隊列。Flink Jar作業只能運行在獨享隊列上。
說明
Flink Jar作業只能運行在預先創建的獨享隊列上。
如果“所屬隊列”下拉框中無可用的獨享隊列,請先創建一個獨享隊列并將該隊列綁定到當前用戶
6.配置Flink Jar作業參數。
詳見下表:參數說明
| 名稱 | 描述 |
|---|---|
| 所屬隊列 | 默認選擇“共享隊列”,可以按需選擇自定義的隊列。 |
| 應用程序 | 用戶自定義的程序包。在選擇程序包之前需要將對應的Jar包上傳至OBS桶中,并在“數據管理>程序包管理”中創建程序包,具體操作請參考創建程序包。內置依賴包請參考數據管理-程序包管理-內置依賴包中的Flink 1.7.2依賴包和Flink 1.10依賴包。 |
| 主類 | 指定加載的Jar包類名,如KafkaMessageStreaming。 默認:根據Jar包文件的Manifest文件指定。 指定:必須輸入“類名”并確定類參數列表(參數間用空格分隔)。 說明 當類屬于某個包時,主類路徑需要包含完整包路徑,例如:packagePath.KafkaMessageStreaming |
| 參數 | 指定類的參數列表,參數之間使用空格分隔。 Flink參數支持全局變量替換。例如,在“全局配置”>“全局變量”中新增全局變量windowsize,Flink Jar作業就可以添加參數-windowsSize {{windowsize}}。 |
| 依賴jar包 | 用戶自定義的依賴程序包。依賴的相關程序包將會被放置到集群classpath下。 在選擇程序包之前需要將對應的Jar包上傳至OBS桶中,并在“數據管理>程序包管理”中創建程序包,包類型選擇“jar”。具體操作請參考創建程序包。 內置依賴包請參考數據管理-程序包管理-內置依賴包中的Flink 1.7.2依賴包和Flink 1.10依賴包。 |
| 其他依賴文件 | 用戶自定義的依賴文件。其他依賴文件需要自行在代碼中引用。 在選擇依賴文件之前需要將對應的文件上傳至OBS桶中,并在“數據管理>程序包管理”中創建程序包,包類型沒有限制。具體操作請參考創建程序包。 通過在應用程序中添加以下內容可訪問對應的依賴文件。其中,“fileName”為需要訪問的文件名,“ClassName”為需要訪問該文件的類名。 ClassName.class.getClassLoader().getResource("userData/fileName") |
| Flink版本 | 選擇Flink版本前,需要先選擇所屬的隊列。 |
| 優化參數 | 用戶自定義的優化參數。參數格式為key=value。 Flink優化參數支持全局變量替換。例如,在“全局配置”>“全局變量”中新增全局變量phase,Flink Jar作業就可以添加優化參數table.optimizer.agg-phase.strategy={{phase}}。 |
7.配置作業參數。
詳見下表:參數說明
| 名稱 | 描述 |
|---|---|
| CU數量 | 一個CU為1核4G的資源量。CU數量范圍為2~400個。 |
| 管理單元 | 設置管理單元的CU數,支持設置1~4個CU,默認值為1個CU。 |
| 并行數 | 作業中每個算子的最大并行數。 說明 并行數不能大于計算單元(CU數量-管理單元CU數量)的4倍。 并行數最好大于用戶作業里設置的并發數,否則有可能提交失敗。 |
| TaskManager配置 | 用于設置TaskManager資源參數。 勾選后需配置下列參數: “單TM所占CU數”:每個TaskManager占用的資源數量。 “單TM Slot”:每個TaskManager包含的Slot數量。 |
| 保存作業日志 | 設置是否將作業運行時的日志信息保存到OBS桶。 注意 該參數建議勾選,否則作業運行完成后不會生成運行日志,后續如果作業運行異常則無法獲取運行日志進行定位。 勾選后需配置下列參數: “OBS 桶” :選擇OBS桶用于保存用戶作業日志信息。如果選擇的OBS桶是未授權狀態,需要單擊“OBS授權”。 |
| 作業異常告警 | 設置是否將作業異常告警信息,如作業出現運行異常或者欠費情況,以SMN的方式通知用戶。 勾選后需配置下列參數: “SMN主題”: |
| 異常自動重啟 | 設置是否啟動異常自動重啟功能,當作業異常時將自動重啟并恢復作業。 勾選后需配置下列參數: “異常重試最大次數”:配置異常重試最大次數。單位為“次/小時”。 ? 無限:無限次重試。 ? 有限:自定義重試次數。 “從Checkpoint恢復”:從保存的checkpoint恢復作業。 勾選該參數后,還需要選擇“Checkpoint路徑”。 “Checkpoint路徑”:選擇checkpoint保存路徑。必須和應用程序中配置的Checkpoint地址相對應。且不同作業的路徑不可一致,否則無法獲取準確的Checkpoint。 |
8.單擊右上角“保存”,保存作業和相關參數。
9.單擊右上角“啟動”,進入“啟動Flink作業”頁面,確認作業規格,單擊“立即啟動”,啟動作業。
啟動作業后,系統將自動跳轉到Flink作業管理頁面,新創建的作業將顯示在作業列表中,在“狀態”列中可以查看作業狀態。作業提交成功后,狀態將由“提交中”變為“運行中”。運行完成后顯示“已完成”。
如果作業狀態為“提交失敗”或“運行異常”,表示作業提交或運行失敗。用戶可以在作業列表中的“狀態”列中,將鼠標移動到狀態圖標上查看錯誤信息,單擊
可以復制錯誤信息。根據錯誤信息解決故障后,重新提交。
說明其他功能按鈕說明如下:
另存為:將新建作業另存為一個新作業。