將 Notebooks 與 Google Cloud Dataflow 搭配使用

1. 簡介

Cloud-Dataflow.png

Google Cloud Dataflow

上次更新時間:2023 年 7 月 5 日

什麼是 Dataflow?

Dataflow 是可執行各種資料處理模式的代管服務。此網站上的文件說明如何使用 Dataflow 部署批次和串流資料處理管道,包括使用服務功能的說明。

Apache Beam SDK 是開放原始碼程式設計模型,可讓您開發批次和串流管道。您可以使用 Apache Beam 程式建立管道,然後在 Dataflow 服務上執行管道。Apache Beam 說明文件為 Apache Beam 程式設計模型、SDK 和其他執行器提供深入的概念資訊和參考資料。

快速進行串流資料分析

Dataflow 可讓您快速執行簡化的串流資料管道開發作業,同時縮短資料延遲時間。

簡化營運和管理工作

Dataflow 不需依靠伺服器,因此可免除資料工程工作負載的營運負擔,讓團隊專注於程式設計,不必費心管理伺服器叢集。

減少總持有成本

Dataflow 同時擁有自動調度資源功能和具絕佳成本效益的批次處理功能,可提供幾近無限的容量,讓您有效管理季節性與激增的工作負載,而不必擔心超支。

主要功能

自動管理資源及動態重新平衡工作

Dataflow 可自動佈建及管理資源處理作業,盡可能地縮短延遲時間及提升使用率,讓您不必以手動方式啟動或保留執行個體。此外,Dataflow 也會自動分割工作,並將這項作業最佳化,藉此動態重新平衡延遲的工作。您不必去找「快速鍵」,也不用再預先處理輸入資料。

自動水平調度資源

自動水平調度工作站的資源,以更優異的整體性價比達到最佳總處理量。

批次處理的彈性資源排程定價

針對可彈性安排工作時間的處理作業 (例如整夜處理的工作),可以選擇使用彈性資源排程 (FlexRS),藉此以較低的價格執行批次處理作業。系統會將這些彈性工作排入佇列中,並保證在六小時內擷取出來,進入執行階段。

您將執行的內容

搭配 JupyterLab 筆記本使用 Apache Beam 互動式執行器,即可在「讀取-求值-印出迴圈」(REPL) 工作流程中,反覆開發管道、檢查管道圖,以及剖析個別 PCollection。這些 Apache Beam 筆記本可透過 Vertex AI Workbench 取得,這項代管服務會代管筆記本虛擬機器,並預先安裝最新的資料科學和機器學習架構。

本程式碼研究室著重於 Apache Beam Notebooks 導入的功能。

課程內容

  • 如何建立筆記本執行個體
  • 建立基本管道
  • 從不受限的來源讀取資料
  • 將資料視覺化
  • 從筆記本啟動 Dataflow 工作
  • 儲存筆記本

軟硬體需求

  • 已啟用計費功能的 Google Cloud Platform 專案。
  • 已啟用 Google Cloud Dataflow 和 Google Cloud PubSub。

2. 開始設定

  1. 在 Cloud 控制台的專案選取器頁面中,選取或建立 Cloud 專案。

確認您已啟用下列 API:

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

您可以前往「API 與服務」頁面進行確認。

在本指南中,我們會從 Pub/Sub 訂閱項目讀取資料,因此請務必確認 Compute Engine 預設服務帳戶具備編輯者角色,或授予該帳戶 Pub/Sub 編輯者角色。

3. 開始使用 Apache Beam 筆記本

啟動 Apache Beam 筆記本執行個體

  1. 在控制台中啟動 Dataflow:

  1. 使用左側選單選取「工作台」頁面。
  2. 確認您位於「User-managed notebooks」(由使用者管理的筆記本) 分頁。
  3. 按一下工具列中的「新增筆記本」
  4. 依序選取「Apache Beam」>「Without GPUs」(不含 GPU)
  5. 在「New notebook」(新筆記本) 頁面中,選取筆記本 VM 的子網路,然後按一下「Create」(建立)
  6. 連結啟用後,按一下「Open JupyterLab」。Vertex AI Workbench 會建立新的 Apache Beam 筆記本執行個體。

4. 建立管道

建立筆記本執行個體

依序前往「File」>「New」>「Notebook」,然後選取 Apache Beam 2.47 以上版本。

開始在筆記本中新增程式碼

  • 將每個區段的程式碼複製並貼到筆記本的新儲存格中
  • 執行儲存格

6bd3dd86cc7cf802.png

搭配 JupyterLab 筆記本使用 Apache Beam 互動式執行器,即可在「讀取-求值-印出迴圈」(REPL) 工作流程中,反覆開發管道、檢查管道圖,以及剖析個別 PCollection。

Apache Beam 已安裝在筆記本執行個體上,因此請在筆記本中加入 interactive_runnerinteractive_beam 模組。

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

如果筆記本使用其他 Google 服務,請新增下列匯入陳述式:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

設定互動選項

下列範例會將資料擷取時間設為 60 秒。如要加快疊代速度,請將時間長度設為較短,例如「10 秒」。

ib.options.recording_duration = '60s'

如需其他互動式選項,請參閱 interactive_beam.options 類別

使用 InteractiveRunner 物件初始化管道。

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

讀取及顯示資料

下例顯示 Apache Beam 管道,該管道會建立指定 Pub/Sub 主題的訂閱項目,並從該訂閱項目讀取資料。

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

管道會依來源的視窗計算字詞。這會建立固定時間區間,每個時間區間的持續時間為 10 秒。

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

資料經過視窗化處理後,系統會計算每個視窗中的字詞。

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

以視覺化方式呈現資料

show() 方法會在筆記本中將產生的 PCollection 視覺化。

ib.show(windowed_word_counts, include_window_info=True)

show 方法會以表格形式顯示 PCollection。

如要顯示資料的視覺化效果,請將 visualize_data=True 傳遞至 show() 方法。新增儲存格:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

您可以對視覺化內容套用多個篩選器。您可以透過下列視覺化圖表依標籤和軸篩選:

show 方法會將 PCollection 顯示為一組可篩選的 UI 元素。

5. 使用 Pandas DataFrame

Apache Beam 筆記本中另一個實用的視覺化項目是 Pandas DataFrame。以下範例會先將字詞轉換為小寫,然後計算每個字詞的頻率。

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

collect() 方法會以 Pandas DataFrame 格式提供輸出內容。

ib.collect(windowed_lower_word_counts, include_window_info=True)

代表 Pandas DataFrame 中 PCollection 的 collect 方法。

6. (選用) 從筆記本啟動 Dataflow 工作

  1. 如要在 Dataflow 上執行工作,您需要其他權限。確認 Compute Engine 預設服務帳戶具備編輯者角色,或授予下列 IAM 角色:
  • Dataflow 管理員
  • Dataflow 工作者
  • 儲存空間管理員,以及
  • 服務帳戶使用者 (roles/iam.serviceAccountUser)

如要進一步瞭解角色,請參閱說明文件

  1. (選用) 使用筆記本執行 Dataflow 工作前,請重新啟動核心、重新執行所有儲存格,並驗證輸出內容。
  2. 移除下列匯入陳述式:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. 新增下列匯入陳述式:
from apache_beam.runners import DataflowRunner
  1. 移除下列錄音時間長度選項:
ib.options.recording_duration = '60s'
  1. 將下列項目新增至管道選項。您需要調整 Cloud Storage 位置,指向您已擁有的 bucket,也可以為此建立新的 bucket。您也可以從 us-central1 變更區域值。
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. beam.Pipeline() 的建構函式中,將 InteractiveRunner 替換為 DataflowRunnerp 是建立管道時的管道物件。
p = beam.Pipeline(DataflowRunner(), options=options)
  1. 從程式碼中移除互動式呼叫。舉例來說,從程式碼中移除 show()collect()head()show_graph()watch()
  2. 如要查看任何結果,請新增接收器。上一節中,我們在筆記本中將結果視覺化,但這次我們要在筆記本以外的 Dataflow 中執行工作。因此,我們需要外部位置來提供結果。在本範例中,我們會將結果寫入 GCS (Google Cloud Storage) 中的文字檔。由於這是串流管道,且有資料時間區間,因此我們想為每個時間區間建立一個文字檔。如要達成這個目標,請在管道中加入下列步驟:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. 在管道程式碼結尾新增 p.run()
  2. 現在請檢查筆記本程式碼,確認您已納入所有變更。內容大致如下:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. 執行儲存格。
  2. 您應該會看見類似下方的輸出內容:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. 如要驗證工作是否正在執行,請前往 Dataflow 的「Jobs」(工作) 頁面。清單中應該會顯示新工作。這項工作約需 5 到 10 分鐘才會開始處理資料。
  2. 資料處理完成後,請前往 Cloud Storage,然後前往 Dataflow 儲存結果的目錄 (您定義的 output_gcs_location)。您應該會看到文字檔案清單,每個視窗對應一個檔案。bfcc5ce9e46a8b14.png
  3. 下載檔案並檢查內容。其中應包含字詞清單和字詞出現次數。或者,您也可以使用指令列介面檢查檔案。如要這麼做,請在筆記本的新儲存格中執行下列指令:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. 畫面會顯示類似以下的輸出內容:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. 大功告成!別忘了清除並停止您建立的工作 (請參閱本程式碼研究室的最後一個步驟)。

如要瞭解如何在互動式筆記本上執行這項轉換,請參閱筆記本執行個體中的 Dataflow Word Count 筆記本。

或者,您也可以將筆記本匯出為可執行指令碼,按照先前的步驟修改產生的 .py 檔案,然後將管道部署至 Dataflow 服務。

7. 正在儲存筆記本

您建立的筆記本會儲存在執行中的筆記本例項。如果在開發期間重設或關閉筆記本執行個體,只要新筆記本是在 /home/jupyter 目錄下建立,就會保留下來。不過,如果刪除筆記本執行個體,系統也會一併刪除這些筆記本。

如要保留筆記本以供日後使用,請將筆記本下載到工作站本機、儲存到 GitHub,或匯出為其他檔案格式。

8. 清除

使用完 Apache Beam 筆記本執行個體後,請關閉筆記本執行個體,並停止串流工作 (如有執行),藉此清除在 Google Cloud 上建立的資源。

或者,如果您特地為了本程式碼研究室建立專案,也可以完全關閉專案