1. 簡介
Google Cloud Dataflow
上次更新時間:2023 年 7 月 5 日
什麼是 Dataflow?
Dataflow 是可執行各種資料處理模式的代管服務。此網站上的說明文件說明如何使用 Dataflow 部署批次和串流資料處理管道,包括使用服務功能的指示。
Apache Beam SDK 是開放原始碼的程式設計模型,可讓您開發批次和串流管道。您可以使用 Apache Beam 程式建立管道,然後在 Dataflow 服務上執行這些管道。Apache Beam 說明文件針對 Apache Beam 程式設計模型、SDK 和其他執行器提供深入的概念資訊和參考資料。
高速進行串流資料分析
Dataflow 可讓您快速執行簡化的串流資料管道開發作業,同時縮短資料延遲時間。
簡化營運和管理作業
Dataflow 不需依靠伺服器,因此可免除資料工程工作負載的營運負擔,讓團隊專注於程式設計,不必費心管理伺服器叢集。
降低總持有成本
Dataflow 同時提供自動調度資源功能和成本效益最佳化的批次處理功能,可提供近乎無限的容量,讓您管理季節性與激增的工作負載,而不必擔心超支。
主要特色
自動管理資源及動態重新平衡工作
Dataflow 可自動佈建及管理處理資源,盡量縮短延遲時間和增加使用率,讓您不必手動啟動或保留執行個體。另外,工作分區也會自動化及最佳化,以動態地重新平衡延遲的工作。不必再去找「快速鍵」或預先處理輸入資料
水平自動調度資源
自動水平調度工作站的資源,以更優異的整體性價比達到最佳處理量。
批次處理的彈性資源排程定價
針對可彈性安排工作時間的處理作業 (例如整夜處理的工作),您可以選擇使用彈性資源排程 (FlexRS),藉此以較低的價格執行批次處理作業。系統會將這些彈性工作排入佇列中,並在六小時內擷取出來,進入執行階段。
活動詳情
將 Apache Beam 互動式執行器與 JupyterLab 筆記本搭配使用,即可反覆開發管道、檢查管道圖,以及在讀取 eval-print-loop (REPL) 工作流程中剖析個別 PCollection。這些 Apache Beam 筆記本可透過 Vertex AI Workbench 存取。Vertex AI Workbench 是代管服務,託管了預先安裝最新數據資料學和機器學習架構的筆記本虛擬機器。
本程式碼研究室著重於 Apache Beam 筆記本推出的功能。
課程內容
- 如何建立筆記本執行個體
- 建立基本管道
- 從無限制來源讀取資料
- 將資料視覺化
- 從筆記本啟動 Dataflow 工作
- 正在儲存筆記本
軟硬體需求
- 已啟用帳單功能的 Google Cloud Platform 專案。
- 已啟用 Google Cloud Dataflow 和 Google Cloud PubSub。
2. 開始設定
- 在 Cloud 控制台的專案選取器頁面中,選取或建立 Cloud 專案。
請確認您已啟用下列 API:
- Dataflow API
- Cloud Pub/Sub API
- Compute Engine
- Notebooks API
您可以查看 API 的「服務」頁面。
本指南將讀取 Pub/Sub 訂閱項目中的資料,因此請確保 Compute Engine 預設服務帳戶具備「編輯者」角色,或為該帳戶授予「Pub/Sub 編輯者」角色。
3. 開始使用 Apache Beam 筆記本
啟動 Apache Beam 筆記本執行個體
- 透過控制台啟動 Dataflow:
- 使用左側選單中的「Workbench」Workbench頁面。
- 確認您目前位於「User-managed notebooks」(由使用者管理的筆記本) 分頁。
- 按一下工具列中的「新增筆記本」。
- 選取「Apache Beam」>沒有 GPU。
- 在「New notebook」(新增筆記本) 頁面中,選取筆記本 VM 的子網路,然後按一下「Create」(建立)。
- 連結生效後,按一下「Open JupyterLab」。Vertex AI Workbench 會建立新的 Apache Beam 筆記本執行個體。
4. 建立管道
建立筆記本執行個體
瀏覽至 [檔案] > [新增 >「筆記本」並選取 Apache Beam 2.47 以上版本的核心。
開始在筆記本中新增程式碼
- 複製程式碼,並貼到筆記本的新儲存格中。
- 執行儲存格
將 Apache Beam 互動式執行器與 JupyterLab 筆記本搭配使用,即可反覆開發管道、檢查管道圖,以及在讀取 eval-print-loop (REPL) 工作流程中剖析個別 PCollection。
Apache Beam 已安裝在您的筆記本執行個體,因此請在筆記本中加入 interactive_runner
和 interactive_beam
模組。
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
如果您的筆記本使用其他 Google 服務,請新增下列匯入陳述式:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
設定互動選項
下列指令會將資料擷取時間長度設為 60 秒。如果您想加快疊代速度,請將時長設為較短的持續時間,例如「10 秒」。
ib.options.recording_duration = '60s'
如需其他互動式選項,請參閱 active_beam.options 類別。
使用 InteractiveRunner
物件將管道初始化。
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
解讀資料並以視覺化方式呈現
以下範例顯示的 Apache Beam 管道會建立對指定 Pub/Sub 主題的訂閱項目並從該訂閱項目讀取資料。
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
管道會從來源計算每個時段的字詞。這會建立固定的時間區間,每個時間區間的長度為 10 秒。
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
將資料劃分後,系統就會按回溯期計算字詞。
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
以視覺化方式呈現資料
show()
方法會以視覺化方式呈現筆記本中產生的 PCollection。
ib.show(windowed_word_counts, include_window_info=True)
如要以視覺化方式呈現資料,請將 visualize_data=True
傳遞至 show()
方法。新增儲存格:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
您可以對圖表套用多個篩選器。下列圖表可讓您依標籤和軸進行篩選:
5. 使用 Pandas DataFrame
Apache Beam 筆記本中還有另一種實用的視覺化功能,是 Pandas DataFrame。下列範例會先將字詞轉換為小寫,然後計算每個字詞出現的頻率。
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
collect()
方法會在 Pandas DataFrame 中提供輸出內容。
ib.collect(windowed_lower_word_counts, include_window_info=True)
6. (選用) 從筆記本啟動 Dataflow 工作
- 如要在 Dataflow 中執行工作,您必須具備其他權限。請確認 Compute Engine 預設服務帳戶具備「編輯者」角色,或為該帳戶授予下列 IAM 角色:
- Dataflow 管理員
- Dataflow 工作者
- Storage 管理員
- 服務帳戶使用者 (roles/iam.serviceAccountUser)
如要進一步瞭解角色,請參閱說明文件。
- (選用) 使用筆記本執行 Dataflow 工作之前,請先重新啟動核心、重新執行所有儲存格並驗證輸出內容。
- 移除下列匯入陳述式:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- 新增下列匯入陳述式:
from apache_beam.runners import DataflowRunner
- 移除下列錄製時間長度選項:
ib.options.recording_duration = '60s'
- 將以下內容新增至管道選項。您將需要調整 Cloud Storage 位置,使其指向您已擁有的值區,或者您可以為了上述目的建立新值區。您也可以將區域值從
us-central1
變更為其他值。
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- 在
beam.Pipeline()
的建構函式中,將InteractiveRunner
替換為DataflowRunner
。p
是建立管道時使用的管道物件。
p = beam.Pipeline(DataflowRunner(), options=options)
- 從程式碼中移除互動式呼叫。舉例來說,從程式碼中移除
show()
、collect()
、head()
、show_graph()
和watch()
。 - 您必須新增接收器,才能查看任何結果。在上一節中,我們要在筆記本中以視覺化方式呈現結果,但這次工作將在 Dataflow 中,從這個筆記本以外的位置執行。因此,我們的結果需要有外部位置。在這個範例中,我們會將結果寫入 GCS (Google Cloud Storage) 中的文字檔案。由於這是具有資料時間區間設定的串流管道,因此我們需要為每個視窗建立一個文字檔案。為達成此目標,請在管道中新增下列步驟:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- 在管道程式碼結尾處新增
p.run()
。 - 現在請檢查筆記本程式碼,確認您已納入所有變更。看起來應該會像這樣:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- 執行儲存格。
- 您應該會看到類似以下的輸出內容:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- 如要確認工作是否正在執行,請前往 Dataflow 的「Jobs」(工作) 頁面。清單中應會顯示新工作。工作大約會在 5 到 10 分鐘內開始處理資料,
- 系統處理完畢後,請前往 Cloud Storage,然後前往 Dataflow 儲存結果的目錄 (您定義的
output_gcs_location
)。畫面上應會顯示文字檔清單,每個視窗包含一個檔案。 - 下載檔案並檢查內容。這個值應包含和計數的配對字詞清單。或者,您也可以使用指令列介面檢查檔案。方法是在筆記本的新儲存格中執行下列指令:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- 畫面會顯示類似以下的輸出內容:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- 大功告成!別忘了清除並停止您建立的工作 (請參閱本程式碼研究室的最後一個步驟)。
如需瞭解如何在互動式筆記本上執行這項轉換作業的範例,請參閱筆記本執行個體中的 Dataflow WordCount 筆記本。
或者,您也可以將筆記本匯出為可執行的指令碼,利用上述步驟修改產生的 .py 檔案,然後將管道部署至 Dataflow 服務。
7. 正在儲存筆記本
您建立的筆記本會儲存在本機的執行筆記本執行個體中。如果您在開發期間重設或關閉筆記本執行個體,只要這些新筆記本在 /home/jupyter
目錄下建立,系統就會保留下來。不過,如果筆記本執行個體遭到刪除,這些筆記本也會遭到刪除。
如要保留筆記本供日後使用,請將筆記本下載到本機工作站、儲存至 GitHub,或匯出為其他檔案格式。
8. 清除所用資源
使用完 Apache Beam 筆記本執行個體後,請關閉筆記本執行個體並停止串流工作 (如果您已執行的話),清除您在 Google Cloud 建立的資源。
或者,如果您只為了本程式碼研究室而建立專案,也可以完全關閉專案。