在 Cloud Dataflow 中執行大數據文字處理管道

1. 總覽

Cloud-Dataflow.png

什麼是 Dataflow?

Dataflow 是可執行各種資料處理模式的代管服務。此網站上的說明文件說明如何使用 Dataflow 部署批次和串流資料處理管道,包括使用服務功能的指示。

Apache Beam SDK 是開放原始碼的程式設計模型,可讓您開發批次和串流管道。您可以使用 Apache Beam 程式建立管道,然後在 Dataflow 服務上執行這些管道。Apache Beam 說明文件針對 Apache Beam 程式設計模型、SDK 和其他執行器提供深入的概念資訊和參考資料。

高速進行串流資料分析

Dataflow 可讓您快速執行簡化的串流資料管道開發作業,同時縮短資料延遲時間。

簡化營運和管理作業

Dataflow 不需依靠伺服器,因此可免除資料工程工作負載的營運負擔,讓團隊專注於程式設計,不必費心管理伺服器叢集。

降低總持有成本

Dataflow 同時提供自動調度資源功能和成本效益最佳化的批次處理功能,可提供近乎無限的容量,讓您管理季節性與激增的工作負載,而不必擔心超支。

主要特色

自動管理資源及動態重新平衡工作

Dataflow 可自動佈建及管理處理資源,盡量縮短延遲時間和增加使用率,讓您不必手動啟動或保留執行個體。另外,工作分區也會自動化及最佳化,以動態地重新平衡延遲的工作。不必再去找「快速鍵」或預先處理輸入資料

水平自動調度資源

自動水平調度工作站的資源,以更優異的整體性價比達到最佳處理量。

針對批次處理提供彈性的資源排程定價

針對可彈性安排工作時間的處理作業 (例如整夜處理的工作),您可以選擇使用彈性資源排程 (FlexRS),藉此以較低的價格執行批次處理作業。系統會將這些彈性工作排入佇列中,並在六小時內擷取出來,進入執行階段。

本教學課程沿用自 https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven

課程內容

  • 如何使用 Java SDK 透過 Apache Beam 建立 Maven 專案
  • 使用 Google Cloud Platform 主控台執行範例管道
  • 如何刪除相關聯的 Cloud Storage 值區及其內容

軟硬體需求

您會如何使用這個教學課程?

僅供閱讀 閱讀並完成練習

根據您使用 Google Cloud Platform 服務的經驗,您會給予什麼評價?

新手 中級 還算容易

2. 設定和需求

自修環境設定

  1. 登入 Cloud 控制台建立新專案,或是重複使用現有專案。(如果您還沒有 Gmail 或 G Suite 帳戶,請先建立帳戶)。

dMbN6g9RawQj_VXCSYpdYncY-DbaRzr2GbnwoV7jFf1u3avxJtmGPmKpMYgiaMH-qu80a_NJ9p2IIXFppYk8x3wyymZXavjglNLJJhuXieCem56H30hwXtd8PvXGpXJO9gEUDu3cZw

ci9Oe6PgnbNuSYlMyvbXF1JdQyiHoEgnhl4PlV_MFagm2ppzhueRkqX4eLjJllZco_2zCp0V0bpTupUSKji9KkQyWqj11pqit1K1faS1V6aFxLGQdkuzGp4rsQTan7F01iePL5DtqQ

8-tA_Lheyo8SscAVKrGii2coplQp2_D1Iosb2ViABY0UUO1A8cimXUu6Wf1R9zJIRExL5OB2j946aIiFtyKTzxDcNnuznmR45vZ2HMoK3o67jxuoUJCAnqvEX6NgPGFjCVNgASc-lg

提醒您,專案 ID 是所有 Google Cloud 專案的專屬名稱 (已經有人使用上述名稱,很抱歉對您不符!)。稍後在本程式碼研究室中會稱為 PROJECT_ID

  1. 接下來,您需要在 Cloud 控制台中啟用計費功能,才能使用 Google Cloud 資源。

執行這個程式碼研究室並不會產生任何費用,如果有的話。請務必依照「清除所用資源」一節指示本節將說明如何關閉資源,這樣您就不會產生本教學課程結束後產生的費用。Google Cloud 的新使用者符合 $300 美元免費試用計畫的資格。

啟用 API

按一下畫面左上方的「選單」圖示。

2bfc27ef9ba2ec7d.png

選取「API 與」服務 >下拉式選單

5b65523a6cc0afa6.png

選取「+ 啟用 API 和服務」

81ed72192c0edd96.png

搜尋「Compute Engine」。按一下「Compute Engine API」出現在畫面上的結果清單中

3f201e991c7b4527.png

在 Google Compute Engine 頁面上,按一下「啟用」

ac121653277fa7bb.png

啟用後,按一下箭頭即可返回。

現在請搜尋並啟用下列 API:

  • Cloud Dataflow
  • Stackdriver
  • Cloud Storage
  • Cloud Storage JSON
  • BigQuery
  • Cloud Pub/Sub
  • Cloud Datastore
  • Cloud Resource Manager API

3. 建立新的 Cloud Storage 值區

Google Cloud Platform Console 中,按一下畫面左上方的「選單」圖示:

2bfc27ef9ba2ec7d.png

向下捲動,然後選取「Cloud Storage」>「Cloud Storage」儲存空間瀏覽器瀏覽器:

2b6c3a2a92b47015.png

您現在應該會看到 Cloud Storage 瀏覽器,並假設您使用的專案目前沒有任何 Cloud Storage 值區,您就會收到建立新值區的邀請。按一下「Create bucket」(建立值區) 按鈕即可建立值區:

a711016d5a99dc37.png

輸入值區名稱。如對話方塊註記,則所有 Cloud Storage 中的值區名稱均不得重複。因此,如果選擇簡單明瞭的名稱 (例如「test」),可能會發現其他使用者已建立該名稱的值區,因此您會收到錯誤訊息。

另外,關於值區名稱中可使用哪些字元,也有一些相關規則。值區名稱的開頭和結尾都是英文字母或數字,中間只使用破折號。如果您嘗試使用特殊字元,或試圖將值區名稱開頭或結尾不是字母或數字,對話方塊會顯示規則提醒。

3a5458648cfe3358.png

輸入值區的專屬名稱,然後按下「建立」。如果您選取的項目已在使用中,系統會顯示上方顯示的錯誤訊息。成功建立值區後,系統會在瀏覽器中前往新的空白值區:

3bda986ae88c4e71.png

當然,您看到的值區名稱不同,因為所有專案中的值區名稱都不得重複。

4. 啟動 Cloud Shell

啟用 Cloud Shell

  1. 在 Cloud 控制台中,按一下「啟用 Cloud Shell」圖示 H7JlbhKGHITmsxhQIcLwoe5HXZMhDlYue4K-SPszMxUxDjIeWfOHBfxDHYpmLQTzUmQ7Xx8o6OJUlANnQF0iBuUyfpXXLZUzLzLZLZDZLZD

zlNW0HehB_AFW1qZ4AyebSQUdWm95n7TbnOr7UVm3j9dFcg6oWApJRlC0jnU1Mvb-IQp-trP1Px8xKNwt6o3pP6fyih947sEhOFI4IRF0W7WZk6hFqZDUGXQQXrw21GuMm2ecHrbzQ

如果您從未啟動 Cloud Shell,您會看見中繼畫面 (需捲動位置),說明螢幕內容。如果出現這種情況,請按一下「繼續」 (之後不會再顯示)。以下是單次畫面的外觀:

kEPbNAo_w5C_pi9QvhFwWwky1cX8hr_xEMGWySNIoMCdi-Djx9AQRqWn-__DmEpC7vKgUtl-feTcv-wBxJ8NwzzAp7mY65-fi2LJo4twUoewT1SUjd6Y3h81RG3rKIkqhoVlFR-G7w

佈建並連線至 Cloud Shell 只需幾分鐘的時間。

pTv5mEKzWMWp5VBrg2eGcuRPv9dLInPToS-mohlrqDASyYGWnZ_SwE-MzOWHe76ZdCSmw0kgWogSJv27lrQE8pvA5OD6P1I47nz8vrAdK7yR1NseZKJvcxAZrPb8wRxoqyTpD-gbhA

這部虛擬機器都裝載了您需要的所有開發工具。提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作,大幅提高網路效能和驗證能力。在本程式碼研究室中,您的大部分作業都可以透過瀏覽器或 Chromebook 完成。

連線至 Cloud Shell 後,您應會發現自己通過驗證,且專案已設為您的專案 ID。

  1. 在 Cloud Shell 中執行下列指令,確認您已通過驗證:
gcloud auth list

指令輸出

 Credentialed Accounts
ACTIVE  ACCOUNT
*       <my_account>@<my_domain.com>

To set the active account, run:
    $ gcloud config set account `ACCOUNT`
gcloud config list project

指令輸出

[core]
project = <PROJECT_ID>

如果尚未設定,請使用下列指令進行設定:

gcloud config set project <PROJECT_ID>

指令輸出

Updated property [core/project].

5. 建立 Maven 專案

Cloud Shell 啟動後,讓我們使用 Apache Beam 適用的 Java SDK 建立 Maven 專案。

Apache Beam 是資料管道適用的開放原始碼程式設計模型。您可以使用 Apache Beam 程式定義這些管道,並選擇 Dataflow 等執行器來執行管道。

請在殼層中執行 mvn archetype:generate 指令,如下所示:

  mvn archetype:generate \
     -DarchetypeGroupId=org.apache.beam \
     -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
     -DarchetypeVersion=2.46.0 \
     -DgroupId=org.example \
     -DartifactId=first-dataflow \
     -Dversion="0.1" \
     -Dpackage=org.apache.beam.examples \
     -DinteractiveMode=false

執行指令之後,您應該會在目前的目錄下看到名為 first-dataflow 的新目錄。first-dataflow 包含 Maven 專案,其中包含 Java 適用的 Cloud Dataflow SDK 和範例管道。

6. 在 Cloud Dataflow 上執行文字處理管道

首先,請將專案 ID 和 Cloud Storage 值區名稱儲存為環境變數。您可以在 Cloud Shell 中執行這項操作。請務必將 <your_project_id> 換成您自己的專案 ID。

 export PROJECT_ID=<your_project_id>

現在我們要對 Cloud Storage 值區執行相同操作。請記得將 <your_bucket_name> 替換為您在先前步驟中,建立值區時使用的不重複名稱。

 export BUCKET_NAME=<your_bucket_name>

切換至 first-dataflow/ 目錄。

 cd first-dataflow

我們會執行一個名為 WordCount 的管線,用於讀取文字、將文字行代碼化為個別字詞,並計算每個字詞的出現頻率。首先我們會執行管道,並在管道運作期間查看每個步驟的狀況。

在殼層或終端機視窗中執行 mvn compile exec:java 指令,啟動管道。下列指令會針對 --project, --stagingLocation,--output 引數,參照您先前在這個步驟中設定的環境變數。

 mvn compile exec:java \
      -Pdataflow-runner compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--project=${PROJECT_ID} \
      --stagingLocation=gs://${BUCKET_NAME}/staging/ \
      --output=gs://${BUCKET_NAME}/output \
      --runner=DataflowRunner \
      --region=us-central1 \
      --gcpTempLocation=gs://${BUCKET_NAME}/temp"

請在工作執行期間,在工作清單中尋找工作。

Google Cloud Platform 主控台中開啟 Cloud Dataflow 網頁版 UI。您應該會看到字詞計數工作的狀態為 Running

3623be74922e3209.png

接著來看看管道參數請先點選工作名稱:

816d8f59c72797d7.png

選取工作後,您可以查看執行圖。管道執行圖會以內含轉換名稱和部分狀態資訊的方塊,呈現管道中的各項轉換。您可以點選每個步驟右上角的衣夾圖示,查看詳細資訊:

80a972dd19a6f1eb.png

以下說明管道如何在每個步驟轉換資料:

  • 讀取:在這個步驟中,管道會從輸入來源讀取資料。在這個例子中,這是來自 Cloud Storage 的文字檔案,內含莎士比亞全行文字的《King Lear》全文。我們的管道會逐行讀取檔案,並輸出 PCollection,文字檔案中的每一行都是集合中的元素。
  • CountWordsCountWords 步驟包含兩個部分。首先,它會使用名為 ExtractWords 的平行執行函式 (ParDo),將每一行代碼化為個別字詞。ExtractWords 的輸出內容是新的 PCollection,其中每個元素都是一個字詞。下一步 Count 會使用 Java SDK 提供的轉換,其會傳回鍵/值組合,其中鍵為不重複字詞,值是出現次數。以下是實作 CountWords 的方法,您可以在 GitHub 上查看完整的 WordCount.java 檔案:
 /**
   * A PTransform that converts a PCollection containing lines of text into a PCollection of
   * formatted word counts.
   *
   * <p>Concept #3: This is a custom composite transform that bundles two transforms (ParDo and
   * Count) as a reusable PTransform subclass. Using composite transforms allows for easy reuse,
   * modular testing, and an improved monitoring experience.
   */
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }
  • MapElements:這會叫用 FormatAsTextFn (在下方複製),並將每個鍵、值組合的格式設定為可列印字串。
  /** A SimpleFunction that converts a Word and Count into a printable string. */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }
  • WriteCounts:在這個步驟中,我們會將可列印的字串寫入多個資料分割文字檔案中。

我們稍後會探討管道產生的輸出內容。

現在請查看圖表右側的「Job info」(工作資訊) 頁面,其中包含我們在 mvn compile exec:java 指令中加入的管道參數。

9723815a1f5bf08b.png

208a7f0d6973acf6.png

您也可以查看管道的自訂計數器,在這個案例中,顯示到目前為止在執行期間遇到多少空白行。您可以為管道新增計數器,以便追蹤應用程式特定指標。

a2e2800e2c6893f8.png

您可以按一下控制台底部的「記錄檔」圖示查看特定錯誤訊息。

23c64138a1027f8.png

面板預設會顯示回報整體工作狀態的工作記錄訊息。您可以使用「最低嚴重性」選取器篩選工作進度和狀態訊息。

94ba42015fdafbe2.png

選取圖表中的管道步驟後,檢視畫面會變更為程式碼產生的記錄檔,以及在管道步驟中執行的所產生程式碼。

如要返回「工作記錄」,請按一下圖表以外的地方或使用右側面板中的「關閉」按鈕,取消選取步驟。

您可以使用記錄檔分頁中的「Worker Logs」按鈕,查看執行管道的 Compute Engine 執行個體工作站記錄檔。工作站記錄包含程式碼產生的記錄檔行,以及 Dataflow 產生的執行程式碼程式碼。

如果您嘗試針對管道中失敗進行偵錯,工作站記錄檔中通常會有額外記錄,協助解決問題。提醒您,這些記錄檔是所有工作站的匯總資料,而且您可以篩選及搜尋。

5a53c244f28d5478.png

Dataflow 監控介面只會顯示最近的記錄訊息。按一下記錄檔窗格右側的 Google Cloud Observability 連結,即可查看所有記錄。

2bc704a4d6529b31.png

以下摘要說明可在「Monitoring」→「記錄」頁面中查看的各種記錄類型:

  • job-message 記錄包含 Dataflow 的各種元件產生的工作層級訊息,例如自動調度資源設定、工作站啟動或關閉的時間、工作步驟進度,以及工作錯誤。因當機使用者程式碼而出現在 worker 記錄中的工作站層級錯誤,也會傳播到 job-message 記錄。
  • worker 記錄是由 Dataflow 工作站產生。工作站會執行大部分的管道工作,例如將 ParDo 套用到資料。Worker 記錄包含由您的程式碼和 Dataflow 記錄的訊息。
  • worker-startup 記錄存在於大部分的 Dataflow 工作中,可擷取與啟動程序相關的訊息。啟動程序包括從 Cloud Storage 下載工作的 Jar,然後啟動工作站。如果啟動工作站時發生問題,可以查看這些記錄。
  • shuffler 記錄包含來自合併平行管道作業結果的工作站訊息。
  • dockerkubelet 記錄檔包含與這些公開技術相關的訊息,這些技術會用於 Dataflow 工作站。

在下一個步驟中,我們會檢查工作是否成功。

7. 確認工作是否成功

Google Cloud Platform 主控台中開啟 Cloud Dataflow 網頁版 UI。

會先看到字詞計數工作的狀態為「執行中」,然後才顯示「成功」

4c408162416d03a2.png

執行工作大約需要 3 到 4 分鐘。

還記得自己執行管道並指定輸出值區的時機嗎?讓我們看看搜尋結果 (因為您不想知道《King Lear》中每個字出現多少次?!)。返回 Google Cloud Platform 控制台的 Cloud Storage 瀏覽器。在您的值區中,您應該會看見工作所建立的輸出檔案和暫存檔案:

25a5d3d4b5d0b567.png

8. 關閉資源

您可以從 Google Cloud Platform 主控台關閉資源。

在 Google Cloud Platform 主控台中開啟 Cloud Storage 瀏覽器。

2b6c3a2a92b47015.png

選取您建立的值區旁邊的核取方塊,然後按一下「DELETE」(刪除),即可永久刪除值區及其內容。

2f7780bdf10b69ba.png

8051ef293a8e5cfe.png

9. 恭喜!

您已瞭解如何使用 Cloud Dataflow SDK 建立 Maven 專案、使用 Google Cloud Platform 控制台執行範例管道,以及刪除相關聯的 Cloud Storage 值區和內容。

瞭解詳情

授權

本作品採用創用 CC 姓名標示 3.0 通用授權,以及 Apache 2.0 授權。