適用場景
當您進行作業開發時,如果某些任務的參數有差異、但處理邏輯全部一致,在這種情況下您可以通過For Each算子避免重復開發作業。
For Each算子可指定一個子作業循環執行,并通過數據集對子作業中的參數進行循環替換。關鍵參數如下:
-
子作業:選擇需要循環執行的作業。
-
數據集:即不同子任務的參數值的集合。可以是給定的數據集,如“[['1'],['3'],['2']] ”;也可以是EL表達式如
#{Job.getNodeOutput('preNodeName')}即前一個節點的輸出值。
-
作業運行參數:參數名即子作業中定義的變量;參數值一般配置為數據集中的某組數據,每次運行中會將參數值傳遞到子作業以供使用。例如參數值填寫為:#{Loop.current[0]},即將數據集中每組數據的第一個數值遍歷傳遞給子作業。
For Each算子舉例如下圖所示。從圖中可以看出,子作業“foreach”中的參數名為“result”,參數值為一維數組數據集“[['1'],['3'],['2']] ”的遍歷(即第一次循環為1,第二次循環為3,第三次循環為2)。
for each算子


For Each算子與EL表達式
要想使用好For Each算子,您必須對EL表達式有所了解。EL表達式用法請參考表達式概述。
下面為您展示For Each算子常用的一些EL表達式。
- #{Loop.dataArray} :For循環算子輸入的數據集,是一個二維數組。
- #{Loop.current}:由于For循環算子在處理數據集的時候,是一行一行進行處理的,那Loop.current就表示當前處理到的某行數據,Loop.current是一個一維數組,一般定義格式為#{Loop.current[0]}、#{Loop.current[1]}或其它,0表示遍歷到當前行的第一個值。
- #{Loop.offset}:For循環算子在處理數據集時當前的偏移量,從0開始。
- #{Job.getNodeOutput('preNodeName')}:獲取前面節點的輸出。
使用案例
案例場景
因數據規整要求,需要周期性地將多組DLI源數據表數據導入到對應的DLI目的表,如下表所示。
需要導入的列表情況
| 源數據表名 | 目的表名 |
|---|---|
| a_new | a |
| b_2 | b |
| c_3 | c |
| d_1 | d |
| c_5 | e |
| b_1 | f |
如果通過SQL節點分別執行導入腳本,需要開發大量腳本和節點,導致重復性工作。在這種情況下,我們可以使用For Each算子進行循環作業,節省開發工作量。
配置方法
- 準備源表和目的表。為了便于后續作業運行驗證,需要先創建DLI源數據表和目的表,并給源數據表插入數據。
- 創建DLI表。您可以在DataArts Studio數據開發中,新建DLI SQL腳本執行以下SQL命令,也可以在數據湖探索(DLI)服務控制臺中的SQL編輯器中執行以下SQL命令:
/* 創建數據表 */
CREATE TABLE a_new (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE b_2 (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE c_3 (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE d_1 (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE c_5 (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE b_1 (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE a (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE b (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE c (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE d (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE e (name STRING, score INT) STORED AS PARQUET;
CREATE TABLE f (name STRING, score INT) STORED AS PARQUET;
- 給源數據表插入數據。您可以在DataArts Studio數據開發模塊中,新建DLI SQL腳本執行以下SQL命令,也可以在數據湖探索(DLI)服務控制臺中的SQL編輯器中執行以下SQL命令:
/* 源數據表插入數據 */
INSERT INTO a_new VALUES ('ZHAO','90'),('QIAN','88'),('SUN','93');
INSERT INTO b_2 VALUES ('LI','94'),('ZHOU','85');
INSERT INTO c_3 VALUES ('WU','79');
INSERT INTO d_1 VALUES ('ZHENG','87'),('WANG','97');
INSERT INTO c_5 VALUES ('FENG','83');
INSERT INTO b_1 VALUES ('CEHN','99');
- 準備數據集數據。您可以通過以下方式之一獲取數據集:
- 您可以將上表數據導入到DLI表中,然后將SQL腳本讀取的結果作為數據集。
- 您可以將上表數據保存在OBS的CSV文件中,然后通過DLI SQL或DWS SQL創建OBS外表關聯這個CSV文件,然后將OBS外表查詢的結果作為數據集。DLI創建外表請參見,DWS創建外表請參見。
- 您可以將上表數據保存在HDFS的CSV文件中,然后通過HIVE SQL創建Hive外表關聯這個CSV文件,然后將HIVE外表查詢的結果作為數據集。DLI創建外表請參見。
本例以方式1進行說明,將上表中的數據導入到DLI表(Table_List)中。您可以在DataArts Studio數據開發模塊中,新建DLI SQL腳本執行以下SQL命令導入數據,也可以在數據湖探索(DLI)服務控制臺中的SQL編輯器中執行以下SQL命令:
/* 創建數據表TABLE_LIST,然后插入表1數據,最后查看生成的表數據 */
CREATE TABLE Table_List (Source STRING, Destination STRING) STORED AS PARQUET;
INSERT INTO Table_List VALUES ('a_new','a'),('b_2','b'),('c_3','c'),('d_1','d'),('c_5','e'),('b_1','f');
SELECT * FROM Table_List;
生成的Table_List表數據如下:
詳見下圖:Table_List表數據


- 創建要循環運行的子作業ForeachDemo。在本次操作中,定義循環執行的是一個包含了DLI SQL節點的任務。
- 進入DataArts Studio數據開發模塊選擇“作業開發”頁面,新建作業ForeachDemo,然后選擇DLI SQL節點,編排下圖所示的作業。
DLI SQL的語句中把要替換的變量配成{}這種參數的形式。在下面的SQL語句中,所做的操作是把{Source}表中的數據全部導入{Destination}中,{fromTable}、${toTable} 就是要替換的變量參數。SQL語句為:
INSERT INTO **{Destination} select * from **{Source};
說明此處不能使用EL表達式#{Job.getParam("job_param_name")} ,因為此表達式只能直接獲取當前作業里配置的參數的value,并不能獲取到父作業傳遞過來的參數值,也不能獲取到工作空間里面配置的全局變量,作用域僅為本作業。
而表達式${job_param_name},既可以獲取到父作業傳遞過來的參數值,也可以獲取到全局配置的變量。
詳見下圖:循環執行子作業


- 配置完成SQL語句后,在子作業中配置作業參數。此處僅需要配置參數名,用于主作業ForeachDemo_master中的For Each節點識別子作業參數;參數值無需填寫。
詳見下圖:配置子作業參數


- 配置完成后保存作業。
- 創建For Each算子所在的主作業ForeachDemo_master。
- 進入DataArts Studio數據開發模塊選擇“作業開發”頁面,新建數據開發主作業ForeachDemo_master。選擇DLI SQL節點和For Each節點,選中連線圖標

并拖動,編排下圖所示的作業。
詳見下圖:編排作業


- 配置DLI SQL節點屬性,此處配置為SQL語句,語句內容如下所示。DLI SQL節點負責讀取DLI表Table_List中的內容作為數據集。
SELECT * FROM Table_List;
詳見下圖:DLI SQL節點配置


- 配置For Each節點屬性。
? 子作業:子作業選擇步驟2已經開發完成的子作業“ForeachDemo”。
? 數據集:數據集就是DLI SQL節點的Select語句的執行結果。使用EL表達式 #{Job.getNodeOutput('preDLI')} ,其中preDLI為前一個節點的名稱。
? 作業運行參數:用于將數據集中的數據傳遞到子作業以供使用。Source對應的是數據集Table_List表的第一列,Destination是第二列,所以配置的EL表達式分別為 #{Loop.current[0]} 、 #{Loop.current[1]} 。
詳見下圖:配置For Each算子


- 配置完成后保存作業。
- 測試運行主作業。
- 點擊主作業畫布上方的“測試運行”按鈕,測試作業運行情況。主作業運行后,會通過For Each節點自動調用運行子作業。
- 點擊左側導航欄中的“實例監控”,進入實例監控中查看作業運行情況。等待作業運行成功后,就能查看For Each節點生成的子作業實例,由于數據集中有6行數據,所以這里就對應產生了6個子作業實例。
詳見下圖:查看作業實例


- 查看對應的6個DLI目的表中是否已被插入預期的數據。您可以在DataArts Studio數據開發模塊中,新建DLI SQL腳本執行以下SQL命令導入數據,也可以在數據湖探索(DLI)服務控制臺中的SQL編輯器中執行以下SQL命令:
/* 查看表a數據,其他表數據請修改命令后運行 */
SELECT * FROM a;
將查詢到的表數據與給源數據表插入數據步驟中的數據進行對比,可以發現數據插入符合預期。
詳見下圖:目的表數據

