การใช้สมุดบันทึกกับ Google Cloud Dataflow

1. บทนำ

Cloud-Dataflow.png

Google Cloud Dataflow

อัปเดตล่าสุด: 5-ก.ค.-2023

Dataflow คืออะไร

Dataflow เป็นบริการที่มีการจัดการสำหรับการดำเนินการรูปแบบการประมวลผลข้อมูลที่หลากหลาย เอกสารในเว็บไซต์นี้จะแสดงวิธีทำให้ไปป์ไลน์การประมวลผลข้อมูลแบบกลุ่มและแบบสตรีมใช้งานได้โดยใช้ Dataflow รวมถึงวิธีการใช้ฟีเจอร์ของบริการ

Apache Beam SDK เป็นโมเดลการเขียนโปรแกรมโอเพนซอร์สที่ช่วยให้คุณพัฒนาทั้งไปป์ไลน์การประมวลผลแบบกลุ่มและแบบสตรีมได้ คุณสร้างไปป์ไลน์ด้วยโปรแกรม Apache Beam แล้วเรียกใช้ในบริการ Dataflow เอกสารประกอบของ Apache Beam มีข้อมูลเชิงแนวคิดและเอกสารอ้างอิงเชิงลึกสำหรับรูปแบบการเขียนโปรแกรม, SDK และตัวเรียกใช้อื่นๆ ของ Apache Beam

การวิเคราะห์ข้อมูลการสตรีมอย่างรวดเร็ว

Dataflow ช่วยให้พัฒนา Data Pipeline สำหรับสตรีมมิงได้อย่างรวดเร็วและง่ายขึ้นโดยมีเวลาในการตอบสนองของข้อมูลที่ต่ำลง

ลดความซับซ้อนของการดำเนินงานและการจัดการ

ช่วยให้ทีมมุ่งเน้นที่การเขียนโปรแกรมแทนการจัดการคลัสเตอร์เซิร์ฟเวอร์ เนื่องจากแนวทางแบบ Serverless ของ Dataflow จะช่วยลดค่าใช้จ่ายในการดำเนินการจากภาระงานด้านวิศวกรรมข้อมูล

ลดต้นทุนรวมในการเป็นเจ้าของ

การปรับขนาดทรัพยากรโดยอัตโนมัติควบคู่กับความสามารถในการประมวลผลแบบกลุ่มที่เพิ่มประสิทธิภาพด้านต้นทุนหมายความว่า Dataflow มีความสามารถในการจัดการภาระงานตามฤดูกาลและภาระงานที่เพิ่มขึ้นอย่างรวดเร็วโดยไม่เสียค่าใช้จ่ายมากเกินไป

ฟีเจอร์หลัก

การจัดการทรัพยากรแบบอัตโนมัติและการปรับสมดุลงานแบบไดนามิก

Dataflow จะจัดสรรและจัดการทรัพยากรการประมวลผลโดยอัตโนมัติเพื่อลดเวลาในการตอบสนองและเพิ่มการใช้งานสูงสุด คุณจึงไม่จำเป็นต้องเปิดใช้อินสแตนซ์หรือจองด้วยตนเอง นอกจากนี้ การแบ่งพาร์ติชันงานยังเป็นแบบอัตโนมัติและเพิ่มประสิทธิภาพเพื่อจัดสมดุลงานที่ล่าช้าแบบไดนามิก โดยไม่ต้องค้นหา "คีย์ลัด" หรือประมวลผลข้อมูลนำเข้าล่วงหน้า

การปรับขนาดอัตโนมัติแนวนอน

การปรับขนาดทรัพยากรผู้ปฏิบัติงานแบบอัตโนมัติในแนวนอนเพื่อเพิ่มประสิทธิภาพอัตราการส่งข้อมูลให้สูงสุด ซึ่งจะทำให้ราคาต่อประสิทธิภาพโดยรวมดีขึ้น

ราคาการจัดตารางทรัพยากรแบบยืดหยุ่นสำหรับการประมวลผลแบบเป็นชุด

สำหรับการประมวลผลที่มีความยืดหยุ่นในเวลาการจัดกำหนดการงาน เช่น งานที่ทำข้ามคืน การจัดกำหนดการทรัพยากรแบบยืดหยุ่น (FlexRS) จะมีราคาที่ต่ำกว่าสำหรับการประมวลผลแบบกลุ่ม ระบบจะจัดคิวงานที่ยืดหยุ่นเหล่านี้พร้อมรับประกันว่าจะดึงข้อมูลมาดำเนินการภายในกรอบเวลา 6 ชั่วโมง

สิ่งที่คุณจะดำเนินการในส่วนนี้

การใช้ Apache Beam Interactive Runner กับสมุดบันทึก JupyterLab ช่วยให้คุณพัฒนาไปป์ไลน์ได้แบบวนซ้ำ ตรวจสอบกราฟไปป์ไลน์ และแยกวิเคราะห์ PCollection แต่ละรายการในเวิร์กโฟลว์ Read-Eval-Print-Loop (REPL) สมุดบันทึก Apache Beam เหล่านี้พร้อมใช้งานผ่าน Vertex AI Workbench ซึ่งเป็นบริการที่มีการจัดการที่โฮสต์เครื่องเสมือนของสมุดบันทึกที่ติดตั้งเฟรมเวิร์กวิทยาศาสตร์ข้อมูลและแมชชีนเลิร์นนิงล่าสุดไว้ล่วงหน้า

Codelab นี้มุ่งเน้นที่ฟังก์ชันการทำงานที่เปิดตัวโดยสมุดบันทึก Apache Beam

สิ่งที่คุณจะได้เรียนรู้

  • วิธีสร้างอินสแตนซ์ Notebook
  • การสร้างไปป์ไลน์พื้นฐาน
  • การอ่านข้อมูลจากแหล่งที่มาแบบไม่จำกัด
  • การแสดงข้อมูลเป็นภาพ
  • เปิดใช้งานงาน Dataflow จาก Notebook
  • การบันทึก Notebook

สิ่งที่คุณต้องมี

  • โปรเจ็กต์ Google Cloud Platform ที่เปิดใช้การเรียกเก็บเงิน
  • เปิดใช้ Google Cloud Dataflow และ Google Cloud Pub/Sub

2. การเริ่มตั้งค่า

  1. ใน Cloud Console ให้เลือกหรือสร้างโปรเจ็กต์ Cloud ในหน้าตัวเลือกโปรเจ็กต์

ตรวจสอบว่าคุณได้เปิดใช้ API ต่อไปนี้แล้ว

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

คุณตรวจสอบได้โดยไปที่หน้า API และบริการ

ในคู่มือนี้ เราจะอ่านข้อมูลจากการสมัครใช้บริการ Pub/Sub ดังนั้นโปรดตรวจสอบว่าบัญชีบริการเริ่มต้นของ Compute Engine มีบทบาทเอดิเตอร์ หรือให้บทบาทเอดิเตอร์ Pub/Sub แก่บัญชี

3. การเริ่มต้นใช้งาน Notebook ของ Apache Beam

การเปิดใช้อินสแตนซ์สมุดบันทึก Apache Beam

  1. เปิดใช้ Dataflow ในคอนโซลโดยทำดังนี้

  1. เลือกหน้าเวิร์กเบนช์โดยใช้เมนูด้านซ้าย
  2. ตรวจสอบว่าคุณอยู่ในแท็บสมุดบันทึกที่ผู้ใช้จัดการ
  3. คลิก New Notebook ในแถบเครื่องมือ
  4. เลือก Apache Beam > ไม่มี GPU
  5. ในหน้า Notebook ใหม่ ให้เลือกเครือข่ายย่อยสำหรับ VM ของ Notebook แล้วคลิกสร้าง
  6. คลิกเปิด JupyterLab เมื่อลิงก์ใช้งานได้ Vertex AI Workbench จะสร้างอินสแตนซ์ Notebook ของ Apache Beam ใหม่

4. การสร้างไปป์ไลน์

การสร้างอินสแตนซ์ Notebook

ไปที่ไฟล์ > ใหม่ > Notebook แล้วเลือกเคอร์เนลที่เป็น Apache Beam 2.47 ขึ้นไป

เริ่มเพิ่มโค้ดลงใน Notebook

  • คัดลอกและวางโค้ดจากแต่ละส่วนภายในเซลล์ใหม่ใน Notebook
  • เรียกใช้เซลล์

6bd3dd86cc7cf802.png

การใช้ Apache Beam Interactive Runner กับสมุดบันทึก JupyterLab ช่วยให้คุณพัฒนาไปป์ไลน์ได้แบบวนซ้ำ ตรวจสอบกราฟไปป์ไลน์ และแยกวิเคราะห์ PCollection แต่ละรายการในเวิร์กโฟลว์ Read-Eval-Print-Loop (REPL)

ระบบจะติดตั้ง Apache Beam ในอินสแตนซ์ Notebook ดังนั้นให้รวมโมดูล interactive_runner และ interactive_beam ไว้ใน Notebook

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

หาก Notebook ใช้บริการอื่นๆ ของ Google ให้เพิ่มคำสั่งนำเข้าต่อไปนี้

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

การตั้งค่าตัวเลือกการโต้ตอบ

คำสั่งต่อไปนี้จะตั้งค่าระยะเวลาการจับภาพข้อมูลเป็น 60 วินาที หากต้องการทำซ้ำเร็วขึ้น ให้ตั้งค่าระยะเวลาให้สั้นลง เช่น "10 วินาที"

ib.options.recording_duration = '60s'

ดูตัวเลือกแบบอินเทอร์แอกทีฟเพิ่มเติมได้ที่คลาส interactive_beam.options

เริ่มต้นไปป์ไลน์โดยใช้ออบเจ็กต์ InteractiveRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

การอ่านและการแสดงข้อมูลเป็นภาพ

ตัวอย่างต่อไปนี้แสดงไปป์ไลน์ Apache Beam ที่สร้างการสมัครใช้บริการหัวข้อ Pub/Sub ที่ระบุและอ่านจากการสมัครใช้บริการ

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

ไปป์ไลน์จะนับคำตามหน้าต่างจากแหล่งที่มา โดยจะสร้างการแบ่งช่วงเวลาแบบคงที่ซึ่งแต่ละช่วงเวลาจะมีความยาว 10 วินาที

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

หลังจากจัดหน้าต่างข้อมูลแล้ว ระบบจะนับคำตามหน้าต่าง

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

การแสดงข้อมูลเป็นภาพ

show() method จะแสดงภาพ PCollection ที่ได้ใน Notebook

ib.show(windowed_word_counts, include_window_info=True)

เมธอด show ที่แสดงภาพ PCollection ในรูปแบบตาราง

หากต้องการแสดงภาพข้อมูล ให้ส่ง visualize_data=True ไปยังเมธอด show() เพิ่มเซลล์ใหม่

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

คุณใช้ตัวกรองหลายรายการกับภาพได้ การแสดงภาพต่อไปนี้ช่วยให้คุณกรองตามป้ายกำกับและแกนได้

เมธอด show ที่แสดงภาพ PCollection เป็นชุดองค์ประกอบ UI ที่กรองได้

5. การใช้ Pandas DataFrame

การแสดงภาพอีกอย่างที่มีประโยชน์ในสมุดบันทึก Apache Beam คือ Pandas DataFrame ตัวอย่างต่อไปนี้จะแปลงคำเป็นตัวพิมพ์เล็กก่อน แล้วจึงคำนวณความถี่ของแต่ละคำ

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

เมธอด collect() จะแสดงผลลัพธ์ใน Pandas DataFrame

ib.collect(windowed_lower_word_counts, include_window_info=True)

เมธอด collect ที่แสดง PCollection ใน Pandas DataFrame

6. (ไม่บังคับ) เรียกใช้งาน Dataflow จาก Notebook

  1. หากต้องการเรียกใช้งานใน Dataflow คุณต้องมีสิทธิ์เพิ่มเติม ตรวจสอบว่าบัญชีบริการเริ่มต้นของ Compute Engine มีบทบาทผู้แก้ไข หรือให้บทบาท IAM ต่อไปนี้
  • ผู้ดูแลระบบ Dataflow
  • ผู้ปฏิบัติงานโฟลว์ข้อมูล
  • ผู้ดูแลระบบพื้นที่เก็บข้อมูล และ
  • ผู้ใช้บัญชีบริการ (roles/iam.serviceAccountUser)

ดูข้อมูลเพิ่มเติมเกี่ยวกับบทบาทได้ในเอกสารประกอบ

  1. (ไม่บังคับ) ก่อนใช้ Notebook เพื่อเรียกใช้ Dataflow Jobs ให้รีสตาร์ทเคอร์เนล เรียกใช้เซลล์ทั้งหมดอีกครั้ง และยืนยันเอาต์พุต
  2. นำคำสั่งนำเข้าต่อไปนี้ออก
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. เพิ่มคำสั่งนำเข้าต่อไปนี้
from apache_beam.runners import DataflowRunner
  1. นำตัวเลือกระยะเวลาการบันทึกต่อไปนี้ออก
ib.options.recording_duration = '60s'
  1. เพิ่มข้อมูลต่อไปนี้ลงในตัวเลือกไปป์ไลน์ คุณจะต้องปรับตำแหน่ง Cloud Storage ให้ชี้ไปยัง Bucket ที่คุณเป็นเจ้าของอยู่แล้ว หรือจะสร้าง Bucket ใหม่เพื่อวัตถุประสงค์นี้ก็ได้ นอกจากนี้ คุณยังเปลี่ยนค่าภูมิภาคจาก us-central1 ได้ด้วย
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. ในตัวสร้างของ beam.Pipeline() ให้แทนที่ InteractiveRunner ด้วย DataflowRunner p คือออบเจ็กต์ไปป์ไลน์จากการสร้างไปป์ไลน์
p = beam.Pipeline(DataflowRunner(), options=options)
  1. นำการเรียกแบบอินเทอร์แอกทีฟออกจากโค้ด เช่น นำ show(), collect(), head(), show_graph() และ watch() ออกจากโค้ด
  2. หากต้องการดูผลลัพธ์ คุณจะต้องเพิ่ม Sink ในส่วนก่อนหน้า เราได้แสดงภาพผลลัพธ์ในสมุดบันทึก แต่คราวนี้เราจะเรียกใช้ชื่องานนอกสมุดบันทึกนี้ใน Dataflow ดังนั้นเราจึงต้องใช้ตำแหน่งภายนอกสำหรับผลลัพธ์ ในตัวอย่างนี้ เราจะเขียนผลลัพธ์ลงในไฟล์ข้อความใน GCS (Google Cloud Storage) เนื่องจากนี่คือไปป์ไลน์การสตรีมที่มีการจัดหน้าต่างข้อมูล เราจึงต้องสร้างไฟล์ข้อความ 1 ไฟล์ต่อหน้าต่าง หากต้องการดำเนินการนี้ ให้เพิ่มขั้นตอนต่อไปนี้ลงในไปป์ไลน์
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. เพิ่ม p.run() ที่ท้ายโค้ดไปป์ไลน์
  2. ตอนนี้ให้ตรวจสอบโค้ด Notebook เพื่อยืนยันว่าคุณได้รวมการเปลี่ยนแปลงทั้งหมดแล้ว โดยควรมีลักษณะคล้ายกับตัวอย่างต่อไปนี้
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. เรียกใช้เซลล์
  2. คุณควรเห็นเอาต์พุตที่คล้ายกับเอาต์พุตต่อไปนี้
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. หากต้องการตรวจสอบว่างานกำลังทำงานอยู่หรือไม่ ให้ไปที่หน้างานสำหรับ Dataflow คุณควรเห็นงานใหม่ในรายการ งานจะใช้เวลาประมาณ 5-10 นาทีในการเริ่มประมวลผลข้อมูล
  2. เมื่อประมวลผลข้อมูลแล้ว ให้ไปที่ Cloud Storage แล้วไปที่ไดเรกทอรีที่ Dataflow จัดเก็บผลลัพธ์ (output_gcs_location ที่คุณกำหนด) คุณควรเห็นรายการไฟล์ข้อความ โดยมี 1 ไฟล์ต่อหน้าต่าง bfcc5ce9e46a8b14.png
  3. ดาวน์โหลดไฟล์และตรวจสอบเนื้อหา โดยควรมีรายการคำที่จับคู่กับจำนวนคำ หรือจะใช้อินเทอร์เฟซบรรทัดคำสั่งเพื่อตรวจสอบไฟล์ก็ได้ โดยทำได้โดยเรียกใช้คำสั่งต่อไปนี้ในเซลล์ใหม่ใน Notebook
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. คุณจะเห็นเอาต์พุตที่คล้ายกับตัวอย่างต่อไปนี้

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. เท่านี้ก็เรียบร้อย อย่าลืมล้างข้อมูลและหยุดงานที่คุณสร้างขึ้น (ดูขั้นตอนสุดท้ายของ Codelab นี้)

ดูตัวอย่างวิธีทำการแปลงนี้ในสมุดบันทึกแบบอินเทอร์แอกทีฟได้ที่สมุดบันทึกการนับคำของ Dataflow ในอินสแตนซ์สมุดบันทึก

หรือคุณจะส่งออก Notebook เป็นสคริปต์ที่เรียกใช้งานได้ แก้ไขไฟล์ .py ที่สร้างขึ้นโดยใช้ขั้นตอนก่อนหน้า แล้วติดตั้งใช้งานไปป์ไลน์ไปยังบริการ Dataflow ก็ได้

7. การบันทึกสมุดบันทึก

ระบบจะบันทึกสมุดบันทึกที่คุณสร้างไว้ในอินสแตนซ์ Notebook ที่กำลังทำงาน หากคุณรีเซ็ตหรือปิดอินสแตนซ์ Notebook ระหว่างการพัฒนา ระบบจะเก็บ Notebook ใหม่เหล่านั้นไว้ตราบใดที่สร้างไว้ในไดเรกทอรี /home/jupyter อย่างไรก็ตาม หากลบอินสแตนซ์ Notebook ระบบจะลบ Notebook เหล่านั้นด้วย

หากต้องการเก็บสมุดบันทึกไว้ใช้ในอนาคต ให้ดาวน์โหลดสมุดบันทึกไปยังเวิร์กสเตชัน บันทึกไว้ใน GitHub หรือส่งออกไปยังรูปแบบไฟล์อื่น

8. การล้างข้อมูล

หลังจากใช้ Notebook อินสแตนซ์ Apache Beam เสร็จแล้ว ให้ล้างข้อมูลทรัพยากรที่คุณสร้างใน Google Cloud โดยปิด Notebook อินสแตนซ์และหยุดงานสตรีมมิง หากคุณเรียกใช้งาน

หรือหากสร้างโปรเจ็กต์เพื่อวัตถุประสงค์เดียวในการทำ Codelab นี้ คุณก็ปิดโปรเจ็กต์ทั้งหมดได้เช่นกัน