在 Cloud Dataflow 中執行大數據文字處理管道

1. 總覽

Cloud-Dataflow.png

什麼是 Dataflow?

Dataflow 是可執行各種資料處理模式的代管服務。此網站上的文件說明如何使用 Dataflow 部署批次和串流資料處理管道,包括使用服務功能的說明。

Apache Beam SDK 是開放原始碼程式設計模型,可讓您開發批次和串流管道。您可以使用 Apache Beam 程式建立管道,然後在 Dataflow 服務上執行管道。Apache Beam 說明文件為 Apache Beam 程式設計模型、SDK 和其他執行器提供深入的概念資訊和參考資料。

快速進行串流資料分析

Dataflow 可讓您快速執行簡化的串流資料管道開發作業,同時縮短資料延遲時間。

簡化營運和管理工作

Dataflow 不需依靠伺服器,因此可免除資料工程工作負載的營運負擔,讓團隊專注於程式設計,不必費心管理伺服器叢集。

減少總持有成本

Dataflow 同時擁有自動調度資源功能和具絕佳成本效益的批次處理功能,可提供幾近無限的容量,讓您有效管理季節性與激增的工作負載,而不必擔心超支。

主要功能

自動管理資源及動態重新平衡工作

Dataflow 可自動佈建及管理資源處理作業,盡可能地縮短延遲時間及提升使用率,讓您不必以手動方式啟動或保留執行個體。此外,Dataflow 也會自動分割工作,並將這項作業最佳化,藉此動態重新平衡延遲的工作。您不必去找「快速鍵」,也不用再預先處理輸入資料。

自動水平調度資源

自動水平調度工作站的資源,以更優異的整體性價比達到最佳總處理量。

批次處理的彈性資源排程定價

針對可彈性安排工作時間的處理作業 (例如整夜處理的工作),可以選擇使用彈性資源排程 (FlexRS),藉此以較低的價格執行批次處理作業。系統會將這些彈性工作排入佇列中,並保證在六小時內擷取出來,進入執行階段。

本教學課程改編自 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

課程內容

  • 如何使用 Java SDK 建立含有 Apache Beam 的 Maven 專案
  • 使用 Google Cloud Platform Console 執行範例管道
  • 如何刪除相關聯的 Cloud Storage bucket 和當中內容

軟硬體需求

您會如何使用本教學課程?

僅閱讀 閱讀並完成練習

您對使用 Google Cloud Platform 服務的體驗有何評價?

新手 中級 熟練

2. 設定和需求

自修實驗室環境設定

  1. 登入 Cloud 控制台,建立新專案或重複使用現有專案。(如果沒有 Gmail 或 G Suite 帳戶,請先建立帳戶)。

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

請記住專案 ID,這是所有 Google Cloud 專案中不重複的名稱 (上述名稱已遭占用,因此不適用於您,抱歉!)。本程式碼研究室稍後會將其稱為 PROJECT_ID

  1. 接著,您必須在 Cloud 控制台中啟用帳單,才能使用 Google Cloud 資源。

完成本程式碼研究室的費用應該不高,甚至完全免費。請務必按照「清除」部分的指示操作,瞭解如何停用資源,避免在本教學課程結束後繼續產生帳單費用。Google Cloud 新使用者可參加價值$300 美元的免費試用計畫。

啟用 API

按一下畫面左上方的「選單」圖示。

2bfc27ef9ba2ec7d.png

從下拉式選單中選取「API 和服務」>「資訊主頁」

5b65523a6cc0afa6.png

選取「+ 啟用 API 和服務」

81ed72192c0edd96.png

在搜尋框中搜尋「Compute Engine」。在隨即顯示的結果清單中,按一下「Compute Engine API」。

3f201e991c7b4527.png

在 Google Compute Engine 頁面中,按一下「啟用」

ac121653277fa7bb.png

啟用後,按一下箭頭即可返回。

現在請搜尋下列 API 並啟用:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • Cloud Storage JSON
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • Cloud Resource Manager API

3. 建立新的 Cloud Storage bucket

Google Cloud Platform Console 中,按一下畫面左上方的「選單」圖示:

2bfc27ef9ba2ec7d.png

向下捲動,然後在「Storage」子區段中選取「Cloud Storage」>「Browser」

2b6c3a2a92b47015.png

現在應該會看到 Cloud Storage 瀏覽器,如果您使用的專案目前沒有任何 Cloud Storage bucket,系統會邀請您建立新的 bucket。按下「建立值區」按鈕即可建立值區:

a711016d5a99dc37.png

輸入 bucket 的名稱。如對話方塊所述,Cloud Storage 中所有值區的名稱皆不得重複。因此,如果您選擇「test」等顯而易見的名稱,可能會發現其他人已建立同名 bucket,並收到錯誤訊息。

此外,bucket 名稱也須遵守某些字元規則。只要值區名稱的開頭和結尾是英文字母或數字,中間只使用連字號,就沒問題。如果您嘗試使用特殊字元,或嘗試以英文字母或數字以外的字元做為值區名稱的開頭或結尾,對話方塊會提醒您相關規則。

3a5458648cfe3358.png

輸入 bucket 的專屬名稱,然後按一下「建立」。如果選擇的名稱已在使用中,系統會顯示上方的錯誤訊息。成功建立 bucket 後,瀏覽器會顯示新的空白 bucket:

3bda986ae88c4e71.png

當然,您看到的值區名稱會有所不同,因為所有專案的值區名稱都不得重複。

4. 啟動 Cloud Shell

啟用 Cloud Shell

  1. 在 Cloud 控制台,點選「啟用 Cloud Shell」 圖示 H7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfp1RzVad_4nCa0Zz5LtwBlUZFXFCWFrmrWZLqg1MkZz2LdgUDQ

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

如果您是首次啟動 Cloud Shell,系統會顯示中繼畫面 (摺疊式螢幕下方),說明這個指令列環境。點選「繼續」後,這則訊息日後就不會再出現。以下是這個初次畫面的樣子:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

佈建並連至 Cloud Shell 預計只需要幾分鐘。

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

這部虛擬機器搭載各種您需要的開發工具,並提供永久的 5GB 主目錄,而且可在 Google Cloud 運作,大幅提升網路效能並強化驗證功能。本程式碼研究室幾乎所有工作都可在瀏覽器或 Chromebook 上完成。

連線至 Cloud Shell 後,您應會發現自己通過驗證,且專案已設為您的專案 ID。

  1. 在 Cloud Shell 中執行下列指令,確認您已通過驗證:
gcloud auth list

指令輸出

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

指令輸出

[core]
project = <PROJECT_ID>

如未設定,請輸入下列指令手動設定專案:

gcloud config set project <PROJECT_ID>

指令輸出

Updated property [core/project].

5. 建立 Maven 專案

啟動 Cloud Shell 後,請使用 Apache Beam 適用的 Java SDK 建立 Maven 專案,開始進行操作。

Apache Beam 是用於資料管道的開放原始碼程式設計模型。您可以使用 Apache Beam 程式來定義這些管道,並選擇 Dataflow 等執行器來執行管道。

在殼層中執行 mvn archetype:generate 指令,如下所示:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

執行指令後,您應該會在目前的目錄下看到名為 first-dataflow 的新目錄。包含 Maven 專案,其中包含 Java 適用的 Cloud Dataflow SDK 和範例管道。first-dataflow

6. 在 Cloud Dataflow 上執行文字處理管道

首先,請將專案 ID 和 Cloud Storage 值區名稱儲存為環境變數。您可以在 Cloud Shell 中執行此操作。請務必將 <your_project_id> 替換為您自己的專案 ID。

 export PROJECT_ID=<your_project_id>

現在我們將對 Cloud Storage bucket 執行相同操作。請記得將 <your_bucket_name> 替換為您在先前步驟中建立值區時使用的專屬名稱。

 export BUCKET_NAME=<your_bucket_name>

切換至 first-dataflow/ 目錄。

 cd first-dataflow

我們將執行名為 WordCount 的管道,這個管道會讀取文字、將文字行代碼化為個別字詞,然後計算每個字詞出現的頻率。首先,我們會執行管道,並在執行期間查看每個步驟的運作情形。

在殼層或終端機視窗中執行 mvn compile exec:java 指令,啟動管道。對於 --project, --stagingLocation,--output 引數,下列指令會參照您在本步驟稍早設定的環境變數。

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

工作執行期間,請在工作清單中找出該工作。

Google Cloud Platform Console 中開啟 Cloud Dataflow 網頁版 UI。畫面會顯示 wordcount 工作,狀態為「執行中」

3623be74922e3209.png

現在來看看管道參數。首先,請按一下工作名稱:

816d8f59c72797d7.png

選取工作後,您可以查看執行圖。管道的執行圖會以包含轉換名稱和部分狀態資訊的方塊,呈現管道中的各個轉換。點選各步驟右上角的尖角,即可查看詳細資訊:

80a972dd19a6f1eb.png

現在來看看管道在每個步驟中轉換資料的方式:

  • 讀取:在這個步驟中,管道會從輸入來源讀取資料。在本例中,這是 Cloud Storage 中的文字檔,內含莎士比亞戲劇《李爾王》全文。管道會逐行讀取檔案,並輸出每個 PCollection,其中文字檔案中的每一行都是集合中的元素。
  • CountWordsCountWords 步驟包含兩個部分。首先,它會使用名為 ExtractWords 的平行 do 函式 (ParDo),將每行代碼化為個別字詞。ExtractWords 的輸出內容是新的 PCollection,其中每個元素都是一個字。下一個步驟 Count 會使用 Java SDK 提供的轉換作業,傳回鍵/值組合,其中鍵是獨一無二的字詞,值則是出現次數。以下是實作 CountWords 的方法,您可以在 GitHub 上查看完整的 WordCount.java 檔案:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements:這會叫用下方複製的 FormatAsTextFn,將每個鍵/值組合格式化為可列印的字串。
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts:在這個步驟中,我們會將可列印的字串寫入多個分片文字檔案。

我們會在幾分鐘後查看管道產生的輸出內容。

現在請查看圖表右側的「Job info」頁面,其中包含我們在 mvn compile exec:java 指令中加入的管道參數。

9723815a1f5bf08b.png

208a7f0d6973acf6.png

您也可以查看管道的自訂計數器,在本例中,這會顯示執行期間目前遇到的空白行數。您可以在管道中新增計數器,追蹤應用程式專屬指標。

a2e2800e2c6893f8.png

您可以點選控制台底部的「記錄」圖示,查看具體的錯誤訊息。

23c64138a1027f8.png

面板預設會顯示回報整體工作狀態的「Job Log」訊息。您可以使用「最低嚴重性」選取器,篩選工作進度和狀態訊息。

94ba42015fdafbe2.png

選取圖表中的管道步驟,即可查看程式碼產生的記錄,以及在該管道步驟中執行的所產生程式碼。

如要返回「Job Logs」(工作記錄),請按一下圖表以外的地方,或使用右側面板中的「Close」(關閉) 按鈕來取消選取步驟。

您可以使用記錄分頁中的「工作站記錄」按鈕,查看執行管道的 Compute Engine 執行個體工作站記錄。工作站記錄包含程式碼產生的記錄行,以及執行程式碼時產生的 Dataflow 程式碼。

如果您嘗試對管道中的失敗進行偵錯,通常「工作人員記錄」中會有額外的記錄,有助於解決問題。請注意,這些記錄是匯總所有工作站的記錄,可以篩選及搜尋。

5a53c244f28d5478.png

Dataflow 監控介面只會顯示最近的記錄訊息。如要查看所有記錄,請按一下記錄窗格右側的 Google Cloud Observability 連結。

2bc704a4d6529b31.png

以下摘要說明可在「Monitoring」(監控) →「Logs」(記錄) 頁面中,查看的各種記錄類型:

  • job-message 記錄包含 Dataflow 各種元件產生的工作層級訊息,例如自動調度資源設定、工作站或關閉時間、工作步驟進度以及工作錯誤。源於當機使用者程式碼的工作站等級錯誤以及存在於 worker 記錄裡的工作站等級錯誤,也會一併傳播至 job-message 記錄。
  • worker 記錄由 Dataflow 工作站產生。工作站負責執行大部分的管道工作,像是將 ParDo 套用到資料。worker 記錄包含由您的程式碼和 Dataflow 記錄的訊息。
  • worker-startup 記錄存在於大部分的 Dataflow 工作中,可擷取與啟動程序相關的訊息。啟動程序包括從 Cloud Storage 下載工作的 Jar,接著啟動工作站。如果啟動工作站時發生問題,可以查看這類記錄。
  • shuffler 記錄包含來自合併平行管道作業結果的工作站訊息。
  • dockerkubelet 記錄包含與 Dataflow 工作站使用的公開技術相關的訊息。

在下一個步驟中,我們會檢查工作是否成功。

7. 確認工作是否成功

Google Cloud Platform Console 中開啟 Cloud Dataflow 網頁版 UI。

畫面會先後顯示 wordcount 工作的「Status」(狀態) 為「Running」(執行中) 和「Succeeded」(成功):

4c408162416d03a2.png

這項工作約需 3 到 4 分鐘才能執行完畢。

還記得您執行管道並指定輸出值 bucket 嗎?現在來看看結果 (因為您一定很想知道《李爾王》中每個字詞的出現次數!)。返回 Google Cloud Platform Console 中的 Cloud Storage 瀏覽器。在您的值區中,您應該會看見工作所建立的輸出檔案和暫存檔案:

25a5d3d4b5d0b567.png

8. 關閉資源

您可以透過 Google Cloud Platform Console 關閉資源。

在 Google Cloud Platform 主控台中開啟 Cloud Storage 瀏覽器。

2b6c3a2a92b47015.png

勾選您建立的值區旁的核取方塊,然後按一下「DELETE」(刪除),即可永久刪除值區及其內容。

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. 恭喜!

您已瞭解如何使用 Cloud Dataflow SDK 建立 Maven 專案、使用 Google Cloud Platform 控制台執行範例管道,以及刪除相關聯的 Cloud Storage 值區及其內容。

瞭解詳情

授權

這項內容採用的授權為創用 CC 姓名標示 3.0 通用授權和 Apache 2.0 授權。