1. 總覽

什麼是 Dataflow?
Dataflow 是可執行各種資料處理模式的代管服務。此網站上的文件說明如何使用 Dataflow 部署批次和串流資料處理管道,包括使用服務功能的說明。
Apache Beam SDK 是開放原始碼程式設計模型,可讓您開發批次和串流管道。您可以使用 Apache Beam 程式建立管道,然後在 Dataflow 服務上執行管道。Apache Beam 說明文件為 Apache Beam 程式設計模型、SDK 和其他執行器提供深入的概念資訊和參考資料。
快速進行串流資料分析
Dataflow 可讓您快速執行簡化的串流資料管道開發作業,同時縮短資料延遲時間。
簡化營運和管理工作
Dataflow 不需依靠伺服器,因此可免除資料工程工作負載的營運負擔,讓團隊專注於程式設計,不必費心管理伺服器叢集。
減少總持有成本
Dataflow 同時擁有自動調度資源功能和具絕佳成本效益的批次處理功能,可提供幾近無限的容量,讓您有效管理季節性與激增的工作負載,而不必擔心超支。
主要功能
自動管理資源及動態重新平衡工作
Dataflow 可自動佈建及管理資源處理作業,盡可能地縮短延遲時間及提升使用率,讓您不必以手動方式啟動或保留執行個體。此外,Dataflow 也會自動分割工作,並將這項作業最佳化,藉此動態重新平衡延遲的工作。您不必去找「快速鍵」,也不用再預先處理輸入資料。
自動水平調度資源
自動水平調度工作站的資源,以更優異的整體性價比達到最佳總處理量。
批次處理的彈性資源排程定價
針對可彈性安排工作時間的處理作業 (例如整夜處理的工作),可以選擇使用彈性資源排程 (FlexRS),藉此以較低的價格執行批次處理作業。系統會將這些彈性工作排入佇列中,並保證在六小時內擷取出來,進入執行階段。
本教學課程改編自 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
課程內容
- 如何使用 Java SDK 建立含有 Apache Beam 的 Maven 專案
- 使用 Google Cloud Platform Console 執行範例管道
- 如何刪除相關聯的 Cloud Storage bucket 和當中內容
軟硬體需求
您會如何使用本教學課程?
您對使用 Google Cloud Platform 服務的體驗有何評價?
2. 設定和需求
自修實驗室環境設定
請記住專案 ID,這是所有 Google Cloud 專案中不重複的名稱 (上述名稱已遭占用,因此不適用於您,抱歉!)。本程式碼研究室稍後會將其稱為 PROJECT_ID。
- 接著,您必須在 Cloud 控制台中啟用帳單,才能使用 Google Cloud 資源。
完成本程式碼研究室的費用應該不高,甚至完全免費。請務必按照「清除」部分的指示操作,瞭解如何停用資源,避免在本教學課程結束後繼續產生帳單費用。Google Cloud 新使用者可參加價值$300 美元的免費試用計畫。
啟用 API
按一下畫面左上方的「選單」圖示。

從下拉式選單中選取「API 和服務」>「資訊主頁」。

選取「+ 啟用 API 和服務」。

在搜尋框中搜尋「Compute Engine」。在隨即顯示的結果清單中,按一下「Compute Engine API」。

在 Google Compute Engine 頁面中,按一下「啟用」

啟用後,按一下箭頭即可返回。
現在請搜尋下列 API 並啟用:
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- Cloud Storage JSON
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- Cloud Resource Manager API
3. 建立新的 Cloud Storage bucket
在 Google Cloud Platform Console 中,按一下畫面左上方的「選單」圖示:

向下捲動,然後在「Storage」子區段中選取「Cloud Storage」>「Browser」:

現在應該會看到 Cloud Storage 瀏覽器,如果您使用的專案目前沒有任何 Cloud Storage bucket,系統會邀請您建立新的 bucket。按下「建立值區」按鈕即可建立值區:

輸入 bucket 的名稱。如對話方塊所述,Cloud Storage 中所有值區的名稱皆不得重複。因此,如果您選擇「test」等顯而易見的名稱,可能會發現其他人已建立同名 bucket,並收到錯誤訊息。
此外,bucket 名稱也須遵守某些字元規則。只要值區名稱的開頭和結尾是英文字母或數字,中間只使用連字號,就沒問題。如果您嘗試使用特殊字元,或嘗試以英文字母或數字以外的字元做為值區名稱的開頭或結尾,對話方塊會提醒您相關規則。

輸入 bucket 的專屬名稱,然後按一下「建立」。如果選擇的名稱已在使用中,系統會顯示上方的錯誤訊息。成功建立 bucket 後,瀏覽器會顯示新的空白 bucket:

當然,您看到的值區名稱會有所不同,因為所有專案的值區名稱都不得重複。
4. 啟動 Cloud Shell
啟用 Cloud Shell
- 在 Cloud 控制台,點選「啟用 Cloud Shell」 圖示
。
如果您是首次啟動 Cloud Shell,系統會顯示中繼畫面 (摺疊式螢幕下方),說明這個指令列環境。點選「繼續」後,這則訊息日後就不會再出現。以下是這個初次畫面的樣子:
佈建並連至 Cloud Shell 預計只需要幾分鐘。
這部虛擬機器搭載各種您需要的開發工具,並提供永久的 5GB 主目錄,而且可在 Google Cloud 運作,大幅提升網路效能並強化驗證功能。本程式碼研究室幾乎所有工作都可在瀏覽器或 Chromebook 上完成。
連線至 Cloud Shell 後,您應會發現自己通過驗證,且專案已設為您的專案 ID。
- 在 Cloud Shell 中執行下列指令,確認您已通過驗證:
gcloud auth list
指令輸出
Credentialed Accounts
ACTIVE ACCOUNT
* <my_account>@<my_domain.com>
To set the active account, run:
$ gcloud config set account `ACCOUNT`
gcloud config list project
指令輸出
[core] project = <PROJECT_ID>
如未設定,請輸入下列指令手動設定專案:
gcloud config set project <PROJECT_ID>
指令輸出
Updated property [core/project].
5. 建立 Maven 專案
啟動 Cloud Shell 後,請使用 Apache Beam 適用的 Java SDK 建立 Maven 專案,開始進行操作。
Apache Beam 是用於資料管道的開放原始碼程式設計模型。您可以使用 Apache Beam 程式來定義這些管道,並選擇 Dataflow 等執行器來執行管道。
在殼層中執行 mvn archetype:generate 指令,如下所示:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
執行指令後,您應該會在目前的目錄下看到名為 first-dataflow 的新目錄。包含 Maven 專案,其中包含 Java 適用的 Cloud Dataflow SDK 和範例管道。first-dataflow
6. 在 Cloud Dataflow 上執行文字處理管道
首先,請將專案 ID 和 Cloud Storage 值區名稱儲存為環境變數。您可以在 Cloud Shell 中執行此操作。請務必將 <your_project_id> 替換為您自己的專案 ID。
export PROJECT_ID=<your_project_id>
現在我們將對 Cloud Storage bucket 執行相同操作。請記得將 <your_bucket_name> 替換為您在先前步驟中建立值區時使用的專屬名稱。
export BUCKET_NAME=<your_bucket_name>
切換至 first-dataflow/ 目錄。
cd first-dataflow
我們將執行名為 WordCount 的管道,這個管道會讀取文字、將文字行代碼化為個別字詞,然後計算每個字詞出現的頻率。首先,我們會執行管道,並在執行期間查看每個步驟的運作情形。
在殼層或終端機視窗中執行 mvn compile exec:java 指令,啟動管道。對於 --project, --stagingLocation, 和 --output 引數,下列指令會參照您在本步驟稍早設定的環境變數。
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
工作執行期間,請在工作清單中找出該工作。
在 Google Cloud Platform Console 中開啟 Cloud Dataflow 網頁版 UI。畫面會顯示 wordcount 工作,狀態為「執行中」:

現在來看看管道參數。首先,請按一下工作名稱:

選取工作後,您可以查看執行圖。管道的執行圖會以包含轉換名稱和部分狀態資訊的方塊,呈現管道中的各個轉換。點選各步驟右上角的尖角,即可查看詳細資訊:

現在來看看管道在每個步驟中轉換資料的方式:
- 讀取:在這個步驟中,管道會從輸入來源讀取資料。在本例中,這是 Cloud Storage 中的文字檔,內含莎士比亞戲劇《李爾王》全文。管道會逐行讀取檔案,並輸出每個
PCollection,其中文字檔案中的每一行都是集合中的元素。 - CountWords:
CountWords步驟包含兩個部分。首先,它會使用名為ExtractWords的平行 do 函式 (ParDo),將每行代碼化為個別字詞。ExtractWords 的輸出內容是新的 PCollection,其中每個元素都是一個字。下一個步驟Count會使用 Java SDK 提供的轉換作業,傳回鍵/值組合,其中鍵是獨一無二的字詞,值則是出現次數。以下是實作CountWords的方法,您可以在 GitHub 上查看完整的 WordCount.java 檔案:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements:這會叫用下方複製的
FormatAsTextFn,將每個鍵/值組合格式化為可列印的字串。
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts:在這個步驟中,我們會將可列印的字串寫入多個分片文字檔案。
我們會在幾分鐘後查看管道產生的輸出內容。
現在請查看圖表右側的「Job info」頁面,其中包含我們在 mvn compile exec:java 指令中加入的管道參數。


您也可以查看管道的自訂計數器,在本例中,這會顯示執行期間目前遇到的空白行數。您可以在管道中新增計數器,追蹤應用程式專屬指標。

您可以點選控制台底部的「記錄」圖示,查看具體的錯誤訊息。

面板預設會顯示回報整體工作狀態的「Job Log」訊息。您可以使用「最低嚴重性」選取器,篩選工作進度和狀態訊息。

選取圖表中的管道步驟,即可查看程式碼產生的記錄,以及在該管道步驟中執行的所產生程式碼。
如要返回「Job Logs」(工作記錄),請按一下圖表以外的地方,或使用右側面板中的「Close」(關閉) 按鈕來取消選取步驟。
您可以使用記錄分頁中的「工作站記錄」按鈕,查看執行管道的 Compute Engine 執行個體工作站記錄。工作站記錄包含程式碼產生的記錄行,以及執行程式碼時產生的 Dataflow 程式碼。
如果您嘗試對管道中的失敗進行偵錯,通常「工作人員記錄」中會有額外的記錄,有助於解決問題。請注意,這些記錄是匯總所有工作站的記錄,可以篩選及搜尋。

Dataflow 監控介面只會顯示最近的記錄訊息。如要查看所有記錄,請按一下記錄窗格右側的 Google Cloud Observability 連結。

以下摘要說明可在「Monitoring」(監控) →「Logs」(記錄) 頁面中,查看的各種記錄類型:
- job-message 記錄包含 Dataflow 各種元件產生的工作層級訊息,例如自動調度資源設定、工作站或關閉時間、工作步驟進度以及工作錯誤。源於當機使用者程式碼的工作站等級錯誤以及存在於 worker 記錄裡的工作站等級錯誤,也會一併傳播至 job-message 記錄。
- worker 記錄由 Dataflow 工作站產生。工作站負責執行大部分的管道工作,像是將 ParDo 套用到資料。worker 記錄包含由您的程式碼和 Dataflow 記錄的訊息。
- worker-startup 記錄存在於大部分的 Dataflow 工作中,可擷取與啟動程序相關的訊息。啟動程序包括從 Cloud Storage 下載工作的 Jar,接著啟動工作站。如果啟動工作站時發生問題,可以查看這類記錄。
- shuffler 記錄包含來自合併平行管道作業結果的工作站訊息。
- docker 和 kubelet 記錄包含與 Dataflow 工作站使用的公開技術相關的訊息。
在下一個步驟中,我們會檢查工作是否成功。
7. 確認工作是否成功
在 Google Cloud Platform Console 中開啟 Cloud Dataflow 網頁版 UI。
畫面會先後顯示 wordcount 工作的「Status」(狀態) 為「Running」(執行中) 和「Succeeded」(成功):

這項工作約需 3 到 4 分鐘才能執行完畢。
還記得您執行管道並指定輸出值 bucket 嗎?現在來看看結果 (因為您一定很想知道《李爾王》中每個字詞的出現次數!)。返回 Google Cloud Platform Console 中的 Cloud Storage 瀏覽器。在您的值區中,您應該會看見工作所建立的輸出檔案和暫存檔案:

8. 關閉資源
您可以透過 Google Cloud Platform Console 關閉資源。
在 Google Cloud Platform 主控台中開啟 Cloud Storage 瀏覽器。

勾選您建立的值區旁的核取方塊,然後按一下「DELETE」(刪除),即可永久刪除值區及其內容。


9. 恭喜!
您已瞭解如何使用 Cloud Dataflow SDK 建立 Maven 專案、使用 Google Cloud Platform 控制台執行範例管道,以及刪除相關聯的 Cloud Storage 值區及其內容。
瞭解詳情
- Dataflow 說明文件:https://cloud.google.com/dataflow/docs/
授權
這項內容採用的授權為創用 CC 姓名標示 3.0 通用授權和 Apache 2.0 授權。