1. บทนำ
Google Cloud Dataflow
อัปเดตล่าสุด 5 ก.ค. 2023
Dataflow คืออะไร
Dataflow เป็นบริการที่มีการจัดการสำหรับการทำงานในรูปแบบการประมวลผลข้อมูลหลากหลายรูปแบบ เอกสารในเว็บไซต์นี้จะแสดงวิธีทำให้ไปป์ไลน์การประมวลผลข้อมูลแบบกลุ่มและการสตรีมมิงใช้งานได้โดยใช้ Dataflow รวมถึงวิธีการใช้ฟีเจอร์บริการ
Apache Beam SDK เป็นโมเดลการเขียนโปรแกรมโอเพนซอร์สที่ให้คุณพัฒนาได้ทั้งไปป์ไลน์แบบกลุ่มและสตรีมมิง คุณสร้างไปป์ไลน์ด้วยโปรแกรม Apache Beam แล้วเรียกใช้ไปป์ไลน์ในบริการ Dataflow เอกสารประกอบเกี่ยวกับ Apacheบีมให้ข้อมูลแนวคิดเชิงลึกและข้อมูลอ้างอิงสำหรับโมเดลการเขียนโปรแกรม Apacheบีม, SDK และโปรแกรมเรียกใช้อื่นๆ
การสตรีมการวิเคราะห์ข้อมูลอย่างรวดเร็ว
Dataflow ช่วยให้การพัฒนาไปป์ไลน์ข้อมูลสตรีมมิงเป็นไปอย่างง่ายดายและรวดเร็วโดยมีเวลาในการตอบสนองของข้อมูลต่ำลง
ลดความซับซ้อนของการดำเนินการและการจัดการ
อนุญาตให้ทีมมุ่งเน้นที่การเขียนโปรแกรมแทนการจัดการคลัสเตอร์เซิร์ฟเวอร์เนื่องจากแนวทางแบบ Serverless ของ Dataflow นำค่าใช้จ่ายในการดำเนินการออกจากภาระงานด้านวิศวกรรมข้อมูล
ลดต้นทุนรวมในการเป็นเจ้าของ
การปรับขนาดทรัพยากรอัตโนมัติควบคู่กับความสามารถในการประมวลผลแบบกลุ่มที่เพิ่มประสิทธิภาพต้นทุนทำให้ Dataflow มีขีดความสามารถที่ไม่จำกัดในระบบเสมือนจริงในการจัดการภาระงานตามฤดูกาลและพุ่งสูงโดยไม่มีการใช้จ่ายมากเกินไป
ฟีเจอร์สำคัญ
การจัดการทรัพยากรอัตโนมัติและการจัดสรรงานแบบไดนามิกใหม่
Dataflow ทำให้การจัดสรรและการจัดการทรัพยากรการประมวลผลเป็นแบบอัตโนมัติเพื่อลดเวลาในการตอบสนองและเพิ่มการใช้งานให้ได้สูงสุด คุณจึงไม่ต้องสร้างอินสแตนซ์หรือจองอินสแตนซ์ด้วยตนเอง นอกจากนี้ การแบ่งพาร์ติชันงานยังทำงานแบบอัตโนมัติและได้รับการเพิ่มประสิทธิภาพเพื่อปรับสมดุลการทำงานที่ล่าช้าแบบไดนามิก ไม่จำเป็นต้องตามหา "แป้นลัด" อีกต่อไป หรือประมวลผลข้อมูลอินพุตล่วงหน้า
การปรับขนาดอัตโนมัติในแนวนอน
การปรับขนาดทรัพยากรผู้ปฏิบัติงานอัตโนมัติในแนวนอนเพื่ออัตราการส่งข้อมูลที่ดีที่สุดช่วยให้ประสิทธิภาพโดยรวมดีขึ้น
การกำหนดราคาการกำหนดเวลาทรัพยากรที่ยืดหยุ่นสำหรับการประมวลผลแบบกลุ่ม
เพื่อการประมวลผลที่มีความยืดหยุ่นในการกำหนดตารางเวลางาน เช่น งานข้ามคืน การกำหนดเวลาทรัพยากรที่ยืดหยุ่น (FlexRS) จะเสนอราคาต่ำกว่าสำหรับการประมวลผลแบบกลุ่ม งานที่ยืดหยุ่นเหล่านี้จะอยู่ในคิวที่มีการรับประกันว่าจะมีการดึงข้อมูลเพื่อดำเนินการได้ภายใน 6 ชั่วโมง
สิ่งที่คุณจะดำเนินการในฐานะส่วนหนึ่งของ
การใช้ตัวเรียกใช้แบบอินเทอร์แอกทีฟของ Apache Beam กับสมุดบันทึก JupyterLab จะช่วยให้คุณพัฒนาไปป์ไลน์ ตรวจสอบกราฟไปป์ไลน์ และแยกวิเคราะห์ PCollections แต่ละรายการในเวิร์กโฟลว์ Read-eval-print-loop (REPL) ได้ซ้ำๆ สมุดบันทึก Apacheบีมเหล่านี้พร้อมให้ใช้งานผ่าน Vertex AI Workbench ซึ่งเป็นบริการที่มีการจัดการที่โฮสต์เครื่องเสมือนสำหรับสมุดบันทึก โดยติดตั้งเฟรมเวิร์กวิทยาศาสตร์ข้อมูลและแมชชีนเลิร์นนิงล่าสุดไว้ล่วงหน้า
Codelab นี้จะมุ่งเน้นไปที่ฟังก์ชันการทำงานของสมุดบันทึก Apacheบีม
สิ่งที่คุณจะได้เรียนรู้
- วิธีสร้างอินสแตนซ์สมุดบันทึก
- การสร้างไปป์ไลน์พื้นฐาน
- กำลังอ่านข้อมูลจากแหล่งที่มาที่ไม่มีการควบคุม
- การแสดงภาพข้อมูล
- กำลังเรียกใช้งาน Dataflow จากสมุดบันทึก
- กำลังบันทึกสมุดบันทึก
สิ่งที่คุณต้องมี
- โปรเจ็กต์ Google Cloud Platform ที่เปิดใช้การเรียกเก็บเงิน
- เปิดใช้ Google Cloud Dataflow และ Google Cloud PubSub แล้ว
2. การตั้งค่า
- ใน Cloud Console ให้เลือกหรือสร้างโปรเจ็กต์ Cloud ในหน้าตัวเลือกโปรเจ็กต์
ตรวจสอบว่าคุณได้เปิดใช้ API ต่อไปนี้
- Dataflow API
- API ของ Cloud Pub/Sub
- Compute Engine
- Notebooks API
ซึ่งตรวจสอบได้โดยการตรวจสอบ API และ หน้าบริการ
ในคู่มือนี้ เราจะอ่านข้อมูลจากการสมัครใช้บริการ Pub/Sub ดังนั้นให้ตรวจสอบว่าบัญชีบริการเริ่มต้นของ Compute Engine มีบทบาทผู้แก้ไขหรือให้สิทธิ์บทบาทผู้แก้ไข Pub/Sub
3. การเริ่มต้นใช้งานสมุดบันทึก Apacheบีม
การเปิดใช้งานอินสแตนซ์สมุดบันทึก Apacheบีม
- เปิด Dataflow ในคอนโซล:
- เลือกหน้า Workbench โดยใช้เมนูด้านซ้าย
- ตรวจสอบว่าคุณอยู่ในแท็บสมุดบันทึกที่ผู้ใช้จัดการ
- คลิกสมุดบันทึกใหม่ในแถบเครื่องมือ
- เลือก Apacheบีม > ไม่ใช้ GPU
- ในหน้าสมุดบันทึกใหม่ ให้เลือกเครือข่ายย่อยสำหรับ VM ของสมุดบันทึกแล้วคลิกสร้าง
- คลิก Open JupyterLab เมื่อลิงก์ใช้งานได้ Vertex AI Workbench จะสร้างอินสแตนซ์สมุดบันทึก Apache Beam ใหม่
4. การสร้างไปป์ไลน์
การสร้างอินสแตนซ์สมุดบันทึก
ไปที่ไฟล์ > ใหม่ > สมุดบันทึกและเลือกเคอร์เนลที่เป็น Apache Beam 2.47 ขึ้นไป
เริ่มเพิ่มโค้ดลงในสมุดบันทึก
- คัดลอกและวางโค้ดจากแต่ละส่วนภายในเซลล์ใหม่ในสมุดบันทึก
- เรียกใช้เซลล์
การใช้ตัวเรียกใช้แบบอินเทอร์แอกทีฟของ Apache Beam กับสมุดบันทึก JupyterLab จะช่วยให้คุณพัฒนาไปป์ไลน์ ตรวจสอบกราฟไปป์ไลน์ และแยกวิเคราะห์ PCollections แต่ละรายการในเวิร์กโฟลว์ Read-eval-print-loop (REPL) ได้ซ้ำๆ
Apache Beam จะติดตั้งบนอินสแตนซ์สมุดบันทึกของคุณ ดังนั้นให้รวมโมดูล interactive_runner
และ interactive_beam
ไว้ในสมุดบันทึกของคุณ
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
หากสมุดบันทึกใช้บริการอื่นๆ ของ Google ให้เพิ่มคำสั่งการนำเข้าต่อไปนี้
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
การตั้งค่าตัวเลือกการโต้ตอบ
ข้อมูลต่อไปนี้ตั้งค่าระยะเวลาการเก็บข้อมูลเป็น 60 วินาที หากต้องการปรับปรุงให้เร็วขึ้น ให้ตั้งค่าระยะเวลาให้ต่ำลง เช่น "10 วินาที"
ib.options.recording_duration = '60s'
สำหรับตัวเลือกแบบอินเทอร์แอกทีฟเพิ่มเติม โปรดดูคลาสinteractive_beam.options
เริ่มต้นไปป์ไลน์โดยใช้ออบเจ็กต์ InteractiveRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
การอ่านและการแสดงภาพข้อมูล
ตัวอย่างต่อไปนี้แสดงไปป์ไลน์ Apache Beam ที่สร้างการสมัครใช้บริการหัวข้อ Pub/Sub ที่ระบุและอ่านจากการสมัครใช้บริการ
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
ไปป์ไลน์จะนับคำตามหน้าต่างจากแหล่งที่มา สร้างกรอบเวลาคงที่โดยให้แต่ละหน้าต่างมีระยะเวลา 10 วินาที
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
หลังจากหน้าต่างข้อมูลอยู่ในหน้าต่างแล้ว ระบบจะนับคำตามกรอบเวลา
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
การแสดงภาพข้อมูล
เมธอด show()
จะแสดง PCollection ผลลัพธ์ในสมุดบันทึก
ib.show(windowed_word_counts, include_window_info=True)
หากต้องการแสดงภาพของข้อมูล ให้ส่ง visualize_data=True
ไปยังเมธอด show()
เพิ่มเซลล์ใหม่
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
คุณใช้ตัวกรองหลายรายการกับการแสดงภาพได้ การแสดงภาพต่อไปนี้ช่วยให้คุณกรองตามป้ายกำกับและแกนได้
5. การใช้ Pandas DataFrame
การแสดงข้อมูลผ่านภาพที่มีประโยชน์อีกอย่างหนึ่งในสมุดบันทึกของ Apacheบีมคือ Pandas DataFrame ตัวอย่างต่อไปนี้จะแปลงคำให้เป็นตัวพิมพ์เล็กก่อน จากนั้นจะคำนวณความถี่ของแต่ละคำ
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
เมธอด collect()
จะแสดงเอาต์พุตใน Pandas DataFrame
ib.collect(windowed_lower_word_counts, include_window_info=True)
6. (ไม่บังคับ) การเปิดตัวงาน Dataflow จากสมุดบันทึก
- หากต้องการเรียกใช้งานบน Dataflow คุณต้องมีสิทธิ์เพิ่มเติม ตรวจสอบว่าบัญชีบริการเริ่มต้นของ Compute Engine มีบทบาทผู้แก้ไขหรือให้สิทธิ์บทบาท IAM ต่อไปนี้
- ผู้ดูแลระบบ Dataflow
- ผู้ปฏิบัติงานโฟลว์ข้อมูล
- ผู้ดูแลระบบพื้นที่เก็บข้อมูล และ
- ผู้ใช้บัญชีบริการ (roles/iam.serviceAccountUser)
ดูข้อมูลเพิ่มเติมเกี่ยวกับบทบาทในเอกสารประกอบ
- (ไม่บังคับ) ก่อนใช้สมุดบันทึกเพื่อเรียกใช้งานโฟลว์ข้อมูล ให้รีสตาร์ทเคอร์เนล เรียกใช้เซลล์ทั้งหมดอีกครั้ง และตรวจสอบเอาต์พุต
- นำคำสั่งการนำเข้าต่อไปนี้ออก
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- เพิ่มคำสั่งการนำเข้าต่อไปนี้
from apache_beam.runners import DataflowRunner
- นำตัวเลือกระยะเวลาการบันทึกต่อไปนี้ออก
ib.options.recording_duration = '60s'
- เพิ่มข้อมูลต่อไปนี้ลงในตัวเลือกไปป์ไลน์ คุณจะต้องปรับตำแหน่งของ Cloud Storage ให้ชี้ไปยังที่เก็บข้อมูลที่มีอยู่แล้ว หรือสร้างที่เก็บข้อมูลใหม่เพื่อวัตถุประสงค์นี้ คุณเปลี่ยนค่าภูมิภาคจาก
us-central1
ได้ด้วย
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- ในเครื่องมือสร้าง
beam.Pipeline()
ให้แทนที่InteractiveRunner
ด้วยDataflowRunner
p
เป็นออบเจ็กต์ของไปป์ไลน์จากการสร้างไปป์ไลน์
p = beam.Pipeline(DataflowRunner(), options=options)
- นำการโทรแบบอินเทอร์แอกทีฟออกจากโค้ด เช่น นำ
show()
,collect()
,head()
,show_graph()
และwatch()
ออกจากโค้ด - คุณจะต้องเพิ่มซิงก์เพื่อให้ดูผลการค้นหาได้ ในส่วนก่อนหน้านี้ เราได้แสดงภาพผลลัพธ์ในสมุดบันทึก แต่ในขณะนี้ เรากำลังเรียกใช้งานภายนอกสมุดบันทึกนี้ ซึ่งก็คือใน Dataflow ด้วยเหตุนี้ เราจึงต้องการตําแหน่งภายนอกสําหรับผลการค้นหา ในตัวอย่างนี้ เราจะเขียนผลลัพธ์ลงในไฟล์ข้อความใน GCS (Google Cloud Storage) เนื่องจากนี่เป็นไปป์ไลน์สตรีมมิงที่มีกรอบเวลาของข้อมูล เราจึงต้องสร้างไฟล์ข้อความ 1 ไฟล์ต่อหน้าต่าง โดยให้เพิ่มขั้นตอนต่อไปนี้ลงในไปป์ไลน์ของคุณ
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- เพิ่ม
p.run()
ต่อท้ายโค้ดไปป์ไลน์ - ตอนนี้ให้ตรวจสอบโค้ดสมุดบันทึกเพื่อยืนยันว่าคุณได้รวมการเปลี่ยนแปลงทั้งหมดแล้ว ซึ่งควรมีหน้าตาคล้ายกับด้านล่างนี้
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- เรียกใช้เซลล์
- คุณควรเห็นผลลัพธ์ที่คล้ายกับข้อความต่อไปนี้
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- หากต้องการตรวจสอบว่างานกำลังทำงานอยู่หรือไม่ ให้ไปที่หน้างานสำหรับ Dataflow คุณควรเห็นงานใหม่ในรายการ โดยงานจะใช้เวลาประมาณ 5-10 นาทีเพื่อเริ่มประมวลผลข้อมูล
- เมื่อประมวลผลข้อมูลแล้ว ให้ไปที่ Cloud Storage และไปยังไดเรกทอรีที่ Dataflow จัดเก็บผลลัพธ์ (
output_gcs_location
ที่คุณกำหนด) คุณจะเห็นรายการไฟล์ข้อความ โดยมี 1 ไฟล์ต่อหน้าต่าง - ดาวน์โหลดไฟล์และตรวจสอบเนื้อหา ซึ่งควรมีรายการคำที่จับคู่กับจำนวนของคำนั้น หรือจะใช้อินเทอร์เฟซบรรทัดคำสั่งเพื่อตรวจสอบไฟล์ก็ได้ ซึ่งทำได้โดยการเรียกใช้โค้ดต่อไปนี้ในเซลล์ใหม่ในสมุดบันทึก
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- คุณจะเห็นผลลัพธ์ที่คล้ายกับข้อความต่อไปนี้
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- เท่านี้ก็เรียบร้อย อย่าลืมล้างและหยุดงานที่คุณสร้างขึ้น (ดูขั้นตอนสุดท้ายของ Codelab นี้)
สำหรับตัวอย่างวิธีทำ Conversion นี้ในสมุดบันทึกแบบอินเทอร์แอกทีฟ โปรดดูสมุดบันทึกจำนวนคำของ Dataflow ในอินสแตนซ์สมุดบันทึก
หรือคุณจะส่งออกสมุดบันทึกเป็นสคริปต์สั่งการ แก้ไขไฟล์ .py ที่สร้างขึ้นโดยใช้ขั้นตอนก่อนหน้า จากนั้นทำให้ไปป์ไลน์ใช้งานได้กับบริการ Dataflow
7. กำลังบันทึกสมุดบันทึก
สมุดบันทึกที่คุณสร้างจะบันทึกอยู่ในอินสแตนซ์สมุดบันทึกที่ทำงานอยู่ หากคุณรีเซ็ตหรือปิดอินสแตนซ์สมุดบันทึกในระหว่างการพัฒนา สมุดบันทึกใหม่เหล่านั้นจะยังคงอยู่ ตราบใดที่มีการสร้างสมุดบันทึกภายใต้ไดเรกทอรี /home/jupyter
อย่างไรก็ตาม หากอินสแตนซ์สมุดบันทึกถูกลบ ระบบจะลบสมุดบันทึกดังกล่าวด้วย
หากต้องการเก็บสมุดบันทึกไว้ใช้ในอนาคต ให้ดาวน์โหลดสมุดบันทึกลงในเวิร์กสเตชันของคุณ บันทึกไว้ใน GitHub หรือส่งออกไปยังรูปแบบไฟล์อื่น
8. กำลังล้างข้อมูล
หลังจากใช้อินสแตนซ์สมุดบันทึก Apache Beam เสร็จแล้ว ให้ล้างทรัพยากรที่คุณสร้างบน Google Cloud โดยปิดอินสแตนซ์สมุดบันทึกและหยุดงานสตรีมมิง หากคุณได้เรียกใช้งาน
หรือหากคุณสร้างโปรเจ็กต์เพื่อวัตถุประสงค์เพียงอย่างเดียวของ Codelab นี้ คุณสามารถปิดโปรเจ็กต์ไปเลยก็ได้