1. 總覽
什麼是 Dataflow?
Dataflow 是可執行各種資料處理模式的代管服務。此網站上的說明文件說明如何使用 Dataflow 部署批次和串流資料處理管道,包括使用服務功能的指示。
Apache Beam SDK 是開放原始碼的程式設計模型,可讓您開發批次和串流管道。您可以使用 Apache Beam 程式建立管道,然後在 Dataflow 服務上執行這些管道。Apache Beam 說明文件針對 Apache Beam 程式設計模型、SDK 和其他執行器提供深入的概念資訊和參考資料。
高速進行串流資料分析
Dataflow 可讓您快速執行簡化的串流資料管道開發作業,同時縮短資料延遲時間。
簡化營運和管理作業
Dataflow 不需依靠伺服器,因此可免除資料工程工作負載的營運負擔,讓團隊專注於程式設計,不必費心管理伺服器叢集。
降低總持有成本
Dataflow 同時提供自動調度資源功能和成本效益最佳化的批次處理功能,可提供近乎無限的容量,讓您管理季節性與激增的工作負載,而不必擔心超支。
主要特色
自動管理資源及動態重新平衡工作
Dataflow 可自動佈建及管理處理資源,盡量縮短延遲時間和增加使用率,讓您不必手動啟動或保留執行個體。另外,工作分區也會自動化及最佳化,以動態地重新平衡延遲的工作。不必再去找「快速鍵」或預先處理輸入資料
水平自動調度資源
自動水平調度工作站的資源,以更優異的整體性價比達到最佳處理量。
針對批次處理提供彈性的資源排程定價
針對可彈性安排工作時間的處理作業 (例如整夜處理的工作),您可以選擇使用彈性資源排程 (FlexRS),藉此以較低的價格執行批次處理作業。系統會將這些彈性工作排入佇列中,並在六小時內擷取出來,進入執行階段。
本教學課程沿用自 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven
課程內容
- 如何使用 Java SDK 透過 Apache Beam 建立 Maven 專案
- 使用 Google Cloud Platform 主控台執行範例管道
- 如何刪除相關聯的 Cloud Storage 值區及其內容
軟硬體需求
您會如何使用這個教學課程?
根據您使用 Google Cloud Platform 服務的經驗,您會給予什麼評價?
2. 設定和需求
自修環境設定
提醒您,專案 ID 是所有 Google Cloud 專案的專屬名稱 (已經有人使用上述名稱,很抱歉對您不符!)。稍後在本程式碼研究室中會稱為 PROJECT_ID
。
- 接下來,您需要在 Cloud 控制台中啟用計費功能,才能使用 Google Cloud 資源。
執行這個程式碼研究室並不會產生任何費用,如果有的話。請務必依照「清除所用資源」一節指示本節將說明如何關閉資源,這樣您就不會產生本教學課程結束後產生的費用。Google Cloud 的新使用者符合 $300 美元免費試用計畫的資格。
啟用 API
按一下畫面左上方的「選單」圖示。
選取「API 與」服務 >下拉式選單。
選取「+ 啟用 API 和服務」。
搜尋「Compute Engine」。按一下「Compute Engine API」出現在畫面上的結果清單中
在 Google Compute Engine 頁面上,按一下「啟用」
啟用後,按一下箭頭即可返回。
現在請搜尋並啟用下列 API:
- Cloud Dataflow
- Stackdriver
- Cloud Storage
- Cloud Storage JSON
- BigQuery
- Cloud Pub/Sub
- Cloud Datastore
- Cloud Resource Manager API
3. 建立新的 Cloud Storage 值區
在 Google Cloud Platform Console 中,按一下畫面左上方的「選單」圖示:
向下捲動,然後選取「Cloud Storage」>「Cloud Storage」儲存空間瀏覽器瀏覽器:
您現在應該會看到 Cloud Storage 瀏覽器,並假設您使用的專案目前沒有任何 Cloud Storage 值區,您就會收到建立新值區的邀請。按一下「Create bucket」(建立值區) 按鈕即可建立值區:
輸入值區名稱。如對話方塊註記,則所有 Cloud Storage 中的值區名稱均不得重複。因此,如果選擇簡單明瞭的名稱 (例如「test」),可能會發現其他使用者已建立該名稱的值區,因此您會收到錯誤訊息。
另外,關於值區名稱中可使用哪些字元,也有一些相關規則。值區名稱的開頭和結尾都是英文字母或數字,中間只使用破折號。如果您嘗試使用特殊字元,或試圖將值區名稱開頭或結尾不是字母或數字,對話方塊會顯示規則提醒。
輸入值區的專屬名稱,然後按下「建立」。如果您選取的項目已在使用中,系統會顯示上方顯示的錯誤訊息。成功建立值區後,系統會在瀏覽器中前往新的空白值區:
當然,您看到的值區名稱不同,因為所有專案中的值區名稱都不得重複。
4. 啟動 Cloud Shell
啟用 Cloud Shell
- 在 Cloud 控制台中,按一下「啟用 Cloud Shell」圖示 。
如果您從未啟動 Cloud Shell,您會看見中繼畫面 (需捲動位置),說明螢幕內容。如果出現這種情況,請按一下「繼續」 (之後不會再顯示)。以下是單次畫面的外觀:
佈建並連線至 Cloud Shell 只需幾分鐘的時間。
這部虛擬機器都裝載了您需要的所有開發工具。提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作,大幅提高網路效能和驗證能力。在本程式碼研究室中,您的大部分作業都可以透過瀏覽器或 Chromebook 完成。
連線至 Cloud Shell 後,您應會發現自己通過驗證,且專案已設為您的專案 ID。
- 在 Cloud Shell 中執行下列指令,確認您已通過驗證:
gcloud auth list
指令輸出
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
gcloud config list project
指令輸出
[core] project = <PROJECT_ID>
如果尚未設定,請使用下列指令進行設定:
gcloud config set project <PROJECT_ID>
指令輸出
Updated property [core/project].
5. 建立 Maven 專案
Cloud Shell 啟動後,讓我們使用 Apache Beam 適用的 Java SDK 建立 Maven 專案。
Apache Beam 是資料管道適用的開放原始碼程式設計模型。您可以使用 Apache Beam 程式定義這些管道,並選擇 Dataflow 等執行器來執行管道。
請在殼層中執行 mvn archetype:generate
指令,如下所示:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.46.0 \
-DgroupId=org.example \
-DartifactId=first-dataflow \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
執行指令之後,您應該會在目前的目錄下看到名為 first-dataflow
的新目錄。first-dataflow
包含 Maven 專案,其中包含 Java 適用的 Cloud Dataflow SDK 和範例管道。
6. 在 Cloud Dataflow 上執行文字處理管道
首先,請將專案 ID 和 Cloud Storage 值區名稱儲存為環境變數。您可以在 Cloud Shell 中執行這項操作。請務必將 <your_project_id>
換成您自己的專案 ID。
export PROJECT_ID=<your_project_id>
現在我們要對 Cloud Storage 值區執行相同操作。請記得將 <your_bucket_name>
替換為您在先前步驟中,建立值區時使用的不重複名稱。
export BUCKET_NAME=<your_bucket_name>
切換至 first-dataflow/
目錄。
cd first-dataflow
我們會執行一個名為 WordCount 的管線,用於讀取文字、將文字行代碼化為個別字詞,並計算每個字詞的出現頻率。首先我們會執行管道,並在管道運作期間查看每個步驟的狀況。
在殼層或終端機視窗中執行 mvn compile exec:java
指令,啟動管道。下列指令會針對 --project, --stagingLocation,
和 --output
引數,參照您先前在這個步驟中設定的環境變數。
mvn compile exec:java \
-Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=${PROJECT_ID} \
--stagingLocation=gs://${BUCKET_NAME}/staging/ \
--output=gs://${BUCKET_NAME}/output \
--runner=DataflowRunner \
--region=us-central1 \
--gcpTempLocation=gs://${BUCKET_NAME}/temp"
請在工作執行期間,在工作清單中尋找工作。
在 Google Cloud Platform 主控台中開啟 Cloud Dataflow 網頁版 UI。您應該會看到字詞計數工作的狀態為 Running:
接著來看看管道參數請先點選工作名稱:
選取工作後,您可以查看執行圖。管道執行圖會以內含轉換名稱和部分狀態資訊的方塊,呈現管道中的各項轉換。您可以點選每個步驟右上角的衣夾圖示,查看詳細資訊:
以下說明管道如何在每個步驟轉換資料:
- 讀取:在這個步驟中,管道會從輸入來源讀取資料。在這個例子中,這是來自 Cloud Storage 的文字檔案,內含莎士比亞全行文字的《King Lear》全文。我們的管道會逐行讀取檔案,並輸出
PCollection
,文字檔案中的每一行都是集合中的元素。 - CountWords:
CountWords
步驟包含兩個部分。首先,它會使用名為ExtractWords
的平行執行函式 (ParDo),將每一行代碼化為個別字詞。ExtractWords 的輸出內容是新的 PCollection,其中每個元素都是一個字詞。下一步Count
會使用 Java SDK 提供的轉換,其會傳回鍵/值組合,其中鍵為不重複字詞,值是出現次數。以下是實作CountWords
的方法,您可以在 GitHub 上查看完整的 WordCount.java 檔案:
/**
* A PTransform that converts a PCollection containing lines of text into a PCollection of
* formatted word counts.
*
* <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
* Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
* modular testing, and an improved monitoring experience.
*/
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
- MapElements:這會叫用
FormatAsTextFn
(在下方複製),並將每個鍵、值組合的格式設定為可列印字串。
/** A SimpleFunction that converts a Word and Count into a printable string. */
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
@Override
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}
- WriteCounts:在這個步驟中,我們會將可列印的字串寫入多個資料分割文字檔案中。
我們稍後會探討管道產生的輸出內容。
現在請查看圖表右側的「Job info」(工作資訊) 頁面,其中包含我們在 mvn compile exec:java
指令中加入的管道參數。
您也可以查看管道的自訂計數器,在這個案例中,顯示到目前為止在執行期間遇到多少空白行。您可以為管道新增計數器,以便追蹤應用程式特定指標。
您可以按一下控制台底部的「記錄檔」圖示查看特定錯誤訊息。
面板預設會顯示回報整體工作狀態的工作記錄訊息。您可以使用「最低嚴重性」選取器篩選工作進度和狀態訊息。
選取圖表中的管道步驟後,檢視畫面會變更為程式碼產生的記錄檔,以及在管道步驟中執行的所產生程式碼。
如要返回「工作記錄」,請按一下圖表以外的地方或使用右側面板中的「關閉」按鈕,取消選取步驟。
您可以使用記錄檔分頁中的「Worker Logs」按鈕,查看執行管道的 Compute Engine 執行個體工作站記錄檔。工作站記錄包含程式碼產生的記錄檔行,以及 Dataflow 產生的執行程式碼程式碼。
如果您嘗試針對管道中失敗進行偵錯,工作站記錄檔中通常會有額外記錄,協助解決問題。提醒您,這些記錄檔是所有工作站的匯總資料,而且您可以篩選及搜尋。
Dataflow 監控介面只會顯示最近的記錄訊息。按一下記錄檔窗格右側的 Google Cloud Observability 連結,即可查看所有記錄。
以下摘要說明可在「Monitoring」→「記錄」頁面中查看的各種記錄類型:
- job-message 記錄包含 Dataflow 的各種元件產生的工作層級訊息,例如自動調度資源設定、工作站啟動或關閉的時間、工作步驟進度,以及工作錯誤。因當機使用者程式碼而出現在 worker 記錄中的工作站層級錯誤,也會傳播到 job-message 記錄。
- worker 記錄是由 Dataflow 工作站產生。工作站會執行大部分的管道工作,例如將 ParDo 套用到資料。Worker 記錄包含由您的程式碼和 Dataflow 記錄的訊息。
- worker-startup 記錄存在於大部分的 Dataflow 工作中,可擷取與啟動程序相關的訊息。啟動程序包括從 Cloud Storage 下載工作的 Jar,然後啟動工作站。如果啟動工作站時發生問題,可以查看這些記錄。
- shuffler 記錄包含來自合併平行管道作業結果的工作站訊息。
- docker 和 kubelet 記錄檔包含與這些公開技術相關的訊息,這些技術會用於 Dataflow 工作站。
在下一個步驟中,我們會檢查工作是否成功。
7. 確認工作是否成功
在 Google Cloud Platform 主控台中開啟 Cloud Dataflow 網頁版 UI。
會先看到字詞計數工作的狀態為「執行中」,然後才顯示「成功」:
執行工作大約需要 3 到 4 分鐘。
還記得自己執行管道並指定輸出值區的時機嗎?讓我們看看搜尋結果 (因為您不想知道《King Lear》中每個字出現多少次?!)。返回 Google Cloud Platform 控制台的 Cloud Storage 瀏覽器。在您的值區中,您應該會看見工作所建立的輸出檔案和暫存檔案:
8. 關閉資源
您可以從 Google Cloud Platform 主控台關閉資源。
在 Google Cloud Platform 主控台中開啟 Cloud Storage 瀏覽器。
選取您建立的值區旁邊的核取方塊,然後按一下「DELETE」(刪除),即可永久刪除值區及其內容。
9. 恭喜!
您已瞭解如何使用 Cloud Dataflow SDK 建立 Maven 專案、使用 Google Cloud Platform 控制台執行範例管道,以及刪除相關聯的 Cloud Storage 值區和內容。
瞭解詳情
- Dataflow 說明文件:https://cloud.google.com/dataflow/docs/
授權
本作品採用創用 CC 姓名標示 3.0 通用授權,以及 Apache 2.0 授權。