將 Notebooks 與 Google Cloud Dataflow 搭配使用

1. 簡介

Cloud-Dataflow.png

Google Cloud Dataflow

上次更新時間:2023 年 7 月 5 日

什麼是 Dataflow?

Dataflow 是可執行各種資料處理模式的代管服務。此網站上的說明文件說明如何使用 Dataflow 部署批次和串流資料處理管道,包括使用服務功能的指示。

Apache Beam SDK 是開放原始碼的程式設計模型,可讓您開發批次和串流管道。您可以使用 Apache Beam 程式建立管道,然後在 Dataflow 服務上執行這些管道。Apache Beam 說明文件針對 Apache Beam 程式設計模型、SDK 和其他執行器提供深入的概念資訊和參考資料。

高速進行串流資料分析

Dataflow 可讓您快速執行簡化的串流資料管道開發作業,同時縮短資料延遲時間。

簡化營運和管理作業

Dataflow 不需依靠伺服器,因此可免除資料工程工作負載的營運負擔,讓團隊專注於程式設計,不必費心管理伺服器叢集。

降低總持有成本

Dataflow 同時提供自動調度資源功能和成本效益最佳化的批次處理功能,可提供近乎無限的容量,讓您管理季節性與激增的工作負載,而不必擔心超支。

主要特色

自動管理資源及動態重新平衡工作

Dataflow 可自動佈建及管理處理資源,盡量縮短延遲時間和增加使用率,讓您不必手動啟動或保留執行個體。另外,工作分區也會自動化及最佳化,以動態地重新平衡延遲的工作。不必再去找「快速鍵」或預先處理輸入資料

水平自動調度資源

自動水平調度工作站的資源,以更優異的整體性價比達到最佳處理量。

批次處理的彈性資源排程定價

針對可彈性安排工作時間的處理作業 (例如整夜處理的工作),您可以選擇使用彈性資源排程 (FlexRS),藉此以較低的價格執行批次處理作業。系統會將這些彈性工作排入佇列中,並在六小時內擷取出來,進入執行階段。

活動詳情

將 Apache Beam 互動式執行器與 JupyterLab 筆記本搭配使用,即可反覆開發管道、檢查管道圖,以及在讀取 eval-print-loop (REPL) 工作流程中剖析個別 PCollection。這些 Apache Beam 筆記本可透過 Vertex AI Workbench 存取。Vertex AI Workbench 是代管服務,託管了預先安裝最新數據資料學和機器學習架構的筆記本虛擬機器。

本程式碼研究室著重於 Apache Beam 筆記本推出的功能。

課程內容

  • 如何建立筆記本執行個體
  • 建立基本管道
  • 從無限制來源讀取資料
  • 將資料視覺化
  • 從筆記本啟動 Dataflow 工作
  • 正在儲存筆記本

軟硬體需求

  • 已啟用帳單功能的 Google Cloud Platform 專案。
  • 已啟用 Google Cloud Dataflow 和 Google Cloud PubSub。

2. 開始設定

  1. 在 Cloud 控制台的專案選取器頁面中,選取或建立 Cloud 專案。

請確認您已啟用下列 API:

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

您可以查看 API 的「服務」頁面。

本指南將讀取 Pub/Sub 訂閱項目中的資料,因此請確保 Compute Engine 預設服務帳戶具備「編輯者」角色,或為該帳戶授予「Pub/Sub 編輯者」角色。

3. 開始使用 Apache Beam 筆記本

啟動 Apache Beam 筆記本執行個體

  1. 透過控制台啟動 Dataflow:

  1. 使用左側選單中的「Workbench」Workbench頁面。
  2. 確認您目前位於「User-managed notebooks」(由使用者管理的筆記本) 分頁。
  3. 按一下工具列中的「新增筆記本」
  4. 選取「Apache Beam」>沒有 GPU
  5. 在「New notebook」(新增筆記本) 頁面中,選取筆記本 VM 的子網路,然後按一下「Create」(建立)
  6. 連結生效後,按一下「Open JupyterLab」。Vertex AI Workbench 會建立新的 Apache Beam 筆記本執行個體。

4. 建立管道

建立筆記本執行個體

瀏覽至 [檔案] > [新增 >「筆記本」並選取 Apache Beam 2.47 以上版本的核心。

開始在筆記本中新增程式碼

  • 複製程式碼,並貼到筆記本的新儲存格中。
  • 執行儲存格

6bd3dd86cc7cf802.png

將 Apache Beam 互動式執行器與 JupyterLab 筆記本搭配使用,即可反覆開發管道、檢查管道圖,以及在讀取 eval-print-loop (REPL) 工作流程中剖析個別 PCollection。

Apache Beam 已安裝在您的筆記本執行個體,因此請在筆記本中加入 interactive_runnerinteractive_beam 模組。

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

如果您的筆記本使用其他 Google 服務,請新增下列匯入陳述式:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

設定互動選項

下列指令會將資料擷取時間長度設為 60 秒。如果您想加快疊代速度,請將時長設為較短的持續時間,例如「10 秒」。

ib.options.recording_duration = '60s'

如需其他互動式選項,請參閱 active_beam.options 類別

使用 InteractiveRunner 物件將管道初始化。

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

解讀資料並以視覺化方式呈現

以下範例顯示的 Apache Beam 管道會建立對指定 Pub/Sub 主題的訂閱項目並從該訂閱項目讀取資料。

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

管道會從來源計算每個時段的字詞。這會建立固定的時間區間,每個時間區間的長度為 10 秒。

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

將資料劃分後,系統就會按回溯期計算字詞。

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

以視覺化方式呈現資料

show() 方法會以視覺化方式呈現筆記本中產生的 PCollection。

ib.show(windowed_word_counts, include_window_info=True)

以表格形式視覺化 PCollection 的顯示方法。

如要以視覺化方式呈現資料,請將 visualize_data=True 傳遞至 show() 方法。新增儲存格:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

您可以對圖表套用多個篩選器。下列圖表可讓您依標籤和軸進行篩選:

以視覺化方式將 PCollection 視覺化為豐富的可篩選 UI 元素組合的顯示方法。

5. 使用 Pandas DataFrame

Apache Beam 筆記本中還有另一種實用的視覺化功能,是 Pandas DataFrame。下列範例會先將字詞轉換為小寫,然後計算每個字詞出現的頻率。

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

collect() 方法會在 Pandas DataFrame 中提供輸出內容。

ib.collect(windowed_lower_word_counts, include_window_info=True)

代表 Pandas DataFrame 中 PCollection 的收集方法。

6. (選用) 從筆記本啟動 Dataflow 工作

  1. 如要在 Dataflow 中執行工作,您必須具備其他權限。請確認 Compute Engine 預設服務帳戶具備「編輯者」角色,或為該帳戶授予下列 IAM 角色:
  • Dataflow 管理員
  • Dataflow 工作者
  • Storage 管理員
  • 服務帳戶使用者 (roles/iam.serviceAccountUser)

如要進一步瞭解角色,請參閱說明文件

  1. (選用) 使用筆記本執行 Dataflow 工作之前,請先重新啟動核心、重新執行所有儲存格並驗證輸出內容。
  2. 移除下列匯入陳述式:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. 新增下列匯入陳述式:
from apache_beam.runners import DataflowRunner
  1. 移除下列錄製時間長度選項:
ib.options.recording_duration = '60s'
  1. 將以下內容新增至管道選項。您將需要調整 Cloud Storage 位置,使其指向您已擁有的值區,或者您可以為了上述目的建立新值區。您也可以將區域值從 us-central1 變更為其他值。
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. beam.Pipeline() 的建構函式中,將 InteractiveRunner 替換為 DataflowRunnerp 是建立管道時使用的管道物件。
p = beam.Pipeline(DataflowRunner(), options=options)
  1. 從程式碼中移除互動式呼叫。舉例來說,從程式碼中移除 show()collect()head()show_graph()watch()
  2. 您必須新增接收器,才能查看任何結果。在上一節中,我們要在筆記本中以視覺化方式呈現結果,但這次工作將在 Dataflow 中,從這個筆記本以外的位置執行。因此,我們的結果需要有外部位置。在這個範例中,我們會將結果寫入 GCS (Google Cloud Storage) 中的文字檔案。由於這是具有資料時間區間設定的串流管道,因此我們需要為每個視窗建立一個文字檔案。為達成此目標,請在管道中新增下列步驟:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. 在管道程式碼結尾處新增 p.run()
  2. 現在請檢查筆記本程式碼,確認您已納入所有變更。看起來應該會像這樣:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. 執行儲存格。
  2. 您應該會看到類似以下的輸出內容:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. 如要確認工作是否正在執行,請前往 Dataflow 的「Jobs」(工作) 頁面。清單中應會顯示新工作。工作大約會在 5 到 10 分鐘內開始處理資料,
  2. 系統處理完畢後,請前往 Cloud Storage,然後前往 Dataflow 儲存結果的目錄 (您定義的 output_gcs_location)。畫面上應會顯示文字檔清單,每個視窗包含一個檔案。bfcc5ce9e46a8b14.png
  3. 下載檔案並檢查內容。這個值應包含和計數的配對字詞清單。或者,您也可以使用指令列介面檢查檔案。方法是在筆記本的新儲存格中執行下列指令:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. 畫面會顯示類似以下的輸出內容:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. 大功告成!別忘了清除並停止您建立的工作 (請參閱本程式碼研究室的最後一個步驟)。

如需瞭解如何在互動式筆記本上執行這項轉換作業的範例,請參閱筆記本執行個體中的 Dataflow WordCount 筆記本。

或者,您也可以將筆記本匯出為可執行的指令碼,利用上述步驟修改產生的 .py 檔案,然後將管道部署至 Dataflow 服務。

7. 正在儲存筆記本

您建立的筆記本會儲存在本機的執行筆記本執行個體中。如果您在開發期間重設或關閉筆記本執行個體,只要這些新筆記本在 /home/jupyter 目錄下建立,系統就會保留下來。不過,如果筆記本執行個體遭到刪除,這些筆記本也會遭到刪除。

如要保留筆記本供日後使用,請將筆記本下載到本機工作站、儲存至 GitHub,或匯出為其他檔案格式。

8. 清除所用資源

使用完 Apache Beam 筆記本執行個體後,請關閉筆記本執行個體停止串流工作 (如果您已執行的話),清除您在 Google Cloud 建立的資源。

或者,如果您只為了本程式碼研究室而建立專案,也可以完全關閉專案