1. 簡介

Google Cloud Dataflow
上次更新時間:2023 年 7 月 5 日
什麼是 Dataflow?
Dataflow 是可執行各種資料處理模式的代管服務。此網站上的文件說明如何使用 Dataflow 部署批次和串流資料處理管道,包括使用服務功能的說明。
Apache Beam SDK 是開放原始碼程式設計模型,可讓您開發批次和串流管道。您可以使用 Apache Beam 程式建立管道,然後在 Dataflow 服務上執行管道。Apache Beam 說明文件為 Apache Beam 程式設計模型、SDK 和其他執行器提供深入的概念資訊和參考資料。
快速進行串流資料分析
Dataflow 可讓您快速執行簡化的串流資料管道開發作業,同時縮短資料延遲時間。
簡化營運和管理工作
Dataflow 不需依靠伺服器,因此可免除資料工程工作負載的營運負擔,讓團隊專注於程式設計,不必費心管理伺服器叢集。
減少總持有成本
Dataflow 同時擁有自動調度資源功能和具絕佳成本效益的批次處理功能,可提供幾近無限的容量,讓您有效管理季節性與激增的工作負載,而不必擔心超支。
主要功能
自動管理資源及動態重新平衡工作
Dataflow 可自動佈建及管理資源處理作業,盡可能地縮短延遲時間及提升使用率,讓您不必以手動方式啟動或保留執行個體。此外,Dataflow 也會自動分割工作,並將這項作業最佳化,藉此動態重新平衡延遲的工作。您不必去找「快速鍵」,也不用再預先處理輸入資料。
自動水平調度資源
自動水平調度工作站的資源,以更優異的整體性價比達到最佳總處理量。
批次處理的彈性資源排程定價
針對可彈性安排工作時間的處理作業 (例如整夜處理的工作),可以選擇使用彈性資源排程 (FlexRS),藉此以較低的價格執行批次處理作業。系統會將這些彈性工作排入佇列中,並保證在六小時內擷取出來,進入執行階段。
您將執行的內容
搭配 JupyterLab 筆記本使用 Apache Beam 互動式執行器,即可在「讀取-求值-印出迴圈」(REPL) 工作流程中,反覆開發管道、檢查管道圖,以及剖析個別 PCollection。這些 Apache Beam 筆記本可透過 Vertex AI Workbench 取得,這項代管服務會代管筆記本虛擬機器,並預先安裝最新的資料科學和機器學習架構。
本程式碼研究室著重於 Apache Beam Notebooks 導入的功能。
課程內容
- 如何建立筆記本執行個體
- 建立基本管道
- 從不受限的來源讀取資料
- 將資料視覺化
- 從筆記本啟動 Dataflow 工作
- 儲存筆記本
軟硬體需求
- 已啟用計費功能的 Google Cloud Platform 專案。
- 已啟用 Google Cloud Dataflow 和 Google Cloud PubSub。
2. 開始設定
- 在 Cloud 控制台的專案選取器頁面中,選取或建立 Cloud 專案。
確認您已啟用下列 API:
- Dataflow API
- Cloud Pub/Sub API
- Compute Engine
- Notebooks API
您可以前往「API 與服務」頁面進行確認。
在本指南中,我們會從 Pub/Sub 訂閱項目讀取資料,因此請務必確認 Compute Engine 預設服務帳戶具備編輯者角色,或授予該帳戶 Pub/Sub 編輯者角色。
3. 開始使用 Apache Beam 筆記本
啟動 Apache Beam 筆記本執行個體
- 在控制台中啟動 Dataflow:
- 使用左側選單選取「工作台」頁面。
- 確認您位於「User-managed notebooks」(由使用者管理的筆記本) 分頁。
- 按一下工具列中的「新增筆記本」。
- 依序選取「Apache Beam」>「Without GPUs」(不含 GPU)。
- 在「New notebook」(新筆記本) 頁面中,選取筆記本 VM 的子網路,然後按一下「Create」(建立)。
- 連結啟用後,按一下「Open JupyterLab」。Vertex AI Workbench 會建立新的 Apache Beam 筆記本執行個體。
4. 建立管道
建立筆記本執行個體
依序前往「File」>「New」>「Notebook」,然後選取 Apache Beam 2.47 以上版本。
開始在筆記本中新增程式碼
- 將每個區段的程式碼複製並貼到筆記本的新儲存格中
- 執行儲存格

搭配 JupyterLab 筆記本使用 Apache Beam 互動式執行器,即可在「讀取-求值-印出迴圈」(REPL) 工作流程中,反覆開發管道、檢查管道圖,以及剖析個別 PCollection。
Apache Beam 已安裝在筆記本執行個體上,因此請在筆記本中加入 interactive_runner 和 interactive_beam 模組。
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
如果筆記本使用其他 Google 服務,請新增下列匯入陳述式:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
設定互動選項
下列範例會將資料擷取時間設為 60 秒。如要加快疊代速度,請將時間長度設為較短,例如「10 秒」。
ib.options.recording_duration = '60s'
如需其他互動式選項,請參閱 interactive_beam.options 類別。
使用 InteractiveRunner 物件初始化管道。
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
讀取及顯示資料
下例顯示 Apache Beam 管道,該管道會建立指定 Pub/Sub 主題的訂閱項目,並從該訂閱項目讀取資料。
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
管道會依來源的視窗計算字詞。這會建立固定時間區間,每個時間區間的持續時間為 10 秒。
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
資料經過視窗化處理後,系統會計算每個視窗中的字詞。
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
以視覺化方式呈現資料
show() 方法會在筆記本中將產生的 PCollection 視覺化。
ib.show(windowed_word_counts, include_window_info=True)

如要顯示資料的視覺化效果,請將 visualize_data=True 傳遞至 show() 方法。新增儲存格:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
您可以對視覺化內容套用多個篩選器。您可以透過下列視覺化圖表依標籤和軸篩選:

5. 使用 Pandas DataFrame
Apache Beam 筆記本中另一個實用的視覺化項目是 Pandas DataFrame。以下範例會先將字詞轉換為小寫,然後計算每個字詞的頻率。
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
collect() 方法會以 Pandas DataFrame 格式提供輸出內容。
ib.collect(windowed_lower_word_counts, include_window_info=True)

6. (選用) 從筆記本啟動 Dataflow 工作
- 如要在 Dataflow 上執行工作,您需要其他權限。確認 Compute Engine 預設服務帳戶具備編輯者角色,或授予下列 IAM 角色:
- Dataflow 管理員
- Dataflow 工作者
- 儲存空間管理員,以及
- 服務帳戶使用者 (roles/iam.serviceAccountUser)
如要進一步瞭解角色,請參閱說明文件。
- (選用) 使用筆記本執行 Dataflow 工作前,請重新啟動核心、重新執行所有儲存格,並驗證輸出內容。
- 移除下列匯入陳述式:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- 新增下列匯入陳述式:
from apache_beam.runners import DataflowRunner
- 移除下列錄音時間長度選項:
ib.options.recording_duration = '60s'
- 將下列項目新增至管道選項。您需要調整 Cloud Storage 位置,指向您已擁有的 bucket,也可以為此建立新的 bucket。您也可以從
us-central1變更區域值。
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- 在
beam.Pipeline()的建構函式中,將InteractiveRunner替換為DataflowRunner。p是建立管道時的管道物件。
p = beam.Pipeline(DataflowRunner(), options=options)
- 從程式碼中移除互動式呼叫。舉例來說,從程式碼中移除
show()、collect()、head()、show_graph()和watch()。 - 如要查看任何結果,請新增接收器。上一節中,我們在筆記本中將結果視覺化,但這次我們要在筆記本以外的 Dataflow 中執行工作。因此,我們需要外部位置來提供結果。在本範例中,我們會將結果寫入 GCS (Google Cloud Storage) 中的文字檔。由於這是串流管道,且有資料時間區間,因此我們想為每個時間區間建立一個文字檔。如要達成這個目標,請在管道中加入下列步驟:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- 在管道程式碼結尾新增
p.run()。 - 現在請檢查筆記本程式碼,確認您已納入所有變更。內容大致如下:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- 執行儲存格。
- 您應該會看見類似下方的輸出內容:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- 如要驗證工作是否正在執行,請前往 Dataflow 的「Jobs」(工作) 頁面。清單中應該會顯示新工作。這項工作約需 5 到 10 分鐘才會開始處理資料。
- 資料處理完成後,請前往 Cloud Storage,然後前往 Dataflow 儲存結果的目錄 (您定義的
output_gcs_location)。您應該會看到文字檔案清單,每個視窗對應一個檔案。
- 下載檔案並檢查內容。其中應包含字詞清單和字詞出現次數。或者,您也可以使用指令列介面檢查檔案。如要這麼做,請在筆記本的新儲存格中執行下列指令:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- 畫面會顯示類似以下的輸出內容:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- 大功告成!別忘了清除並停止您建立的工作 (請參閱本程式碼研究室的最後一個步驟)。
如要瞭解如何在互動式筆記本上執行這項轉換,請參閱筆記本執行個體中的 Dataflow Word Count 筆記本。
或者,您也可以將筆記本匯出為可執行指令碼,按照先前的步驟修改產生的 .py 檔案,然後將管道部署至 Dataflow 服務。
7. 正在儲存筆記本
您建立的筆記本會儲存在執行中的筆記本例項。如果在開發期間重設或關閉筆記本執行個體,只要新筆記本是在 /home/jupyter 目錄下建立,就會保留下來。不過,如果刪除筆記本執行個體,系統也會一併刪除這些筆記本。
如要保留筆記本以供日後使用,請將筆記本下載到工作站本機、儲存到 GitHub,或匯出為其他檔案格式。
8. 清除
使用完 Apache Beam 筆記本執行個體後,請關閉筆記本執行個體,並停止串流工作 (如有執行),藉此清除在 Google Cloud 上建立的資源。
或者,如果您特地為了本程式碼研究室建立專案,也可以完全關閉專案。