1. מבוא
Google Cloud Dataflow
עדכון אחרון: 5 ביולי 2023
מה זה Dataflow?
Dataflow הוא שירות מנוהל לביצוע מגוון רחב של תבניות עיבוד נתונים. בתיעוד באתר הזה מוסבר איך לפרוס צינורות עיבוד נתונים באצווה ובסטרימינג באמצעות Dataflow, כולל הוראות לשימוש בתכונות שירות.
Apache Beam SDK הוא מודל תכנות בקוד פתוח שמאפשר לפתח צינורות עיבוד נתונים באצווה וגם בסטרימינג. את צינורות עיבוד הנתונים שלכם יוצרים באמצעות תוכנית Apache Beam, ולאחר מכן מריצים אותם בשירות Dataflow. מסמכי התיעוד של Apache Beam כוללים מידע מושגים וחומר עזר מפורטים למודל התכנות Apache Beam, לערכות SDK ולעובדים אחרים.
סטרימינג של ניתוח נתונים במהירות
Dataflow להעלאת צינור נתונים בסטרימינג מהיר ופשוט, עם זמן אחזור קצר יותר.
לפשט את הפעולות והניהול
מאפשרים לצוותים להתמקד בתכנות במקום לנהל אשכולות שרתים, כי הגישה של Dataflow ללא שרת (serverless) מסירה מעומסי העבודה של הנדסת מערכות מידע.
הפחתת עלות הבעלות הכוללת
התאמה לעומס (autoscaling) של משאבים עם יכולות עיבוד באצווה שעברו אופטימיזציה לעלות, מאפשרת ל-Dataflow קיבולת כמעט בלתי מוגבלת לנהל עומסי עבודה עונתיים וקוצניים בלי להוציא יותר מדי כסף.
תכונות עיקריות
ניהול משאבים אוטומטי ואיזון עבודה דינמי
Dataflow מאפשר הקצאה וניהול של משאבי עיבוד באופן אוטומטי כדי למזער את זמן האחזור ולמקסם את השימוש, כך שלא תצטרכו להפעיל מכונות או לשמור אותן ידנית. חלוקת העבודה למחיצות היא גם אוטומטית ועברה אופטימיזציה לאיזון מחדש באופן דינמי. אין צורך לרדוף אחרי 'מפתחות חמים'. או לעבד מראש את נתוני הקלט.
התאמה לעומס (autoscaling) לרוחב
התאמה לעומס (autoscaling) של משאבי עובדים באופן אופקי לתפוקה אופטימלית, משפרת את המחיר הכולל לביצועים.
תמחור גמיש לתזמון משאבים לעיבוד ברצף
לצורך עיבוד עם גמישות בזמן תזמון משימות, כמו משימות במהלך לילה, התכונה 'תזמון משאבים גמיש' (FlexRS) מציעה מחיר נמוך יותר לעיבוד ברצף. המשימות הגמישות האלה מוצבות בתור ומבטיחה שהן יאוחזרו לביצוע בתוך חלון זמן של שש שעות.
מה תפעילי במסגרת התהליך הזה
שימוש במכשיר ההרצה האינטראקטיבי Apache Beam עם קובצי notebook של JupyterLab מאפשר לפתח צינורות עיבוד נתונים באופן חזרתי, לבדוק את תרשים צינור עיבוד הנתונים שלכם ולנתח נתוני PCollections ספציפיים בתהליך עבודה של Read-eval-print-loop (REPL). קובצי ה-notebook האלה ב-Apache Beam זמינים באמצעות Vertex AI Workbench, שירות מנוהל שמארח מכונות וירטואליות של notebook שהותקנו מראש עם ה-frameworks של מדעי נתונים ולמידת מכונה.
ה-Codelab הזה מתמקד בפונקציונליות של קובצי notebook של Apache Beam.
מה תלמדו
- איך יוצרים מכונת notebook
- יצירת צינור עיבוד נתונים בסיסי
- קריאת נתונים ממקור בלתי מוגבל
- המחשת הנתונים
- הפעלת משימת Dataflow מה-notebook
- שמירת מסמך notebook
מה צריך להכין
- פרויקט ב-Google Cloud Platform שהחיוב מופעל בו.
- Google Cloud Dataflow ו-Google Cloud PubSub הופעלו.
2. בתהליך ההגדרה
- במסוף Cloud, בדף בחירת הפרויקטים, בוחרים או יוצרים פרויקט ב-Cloud.
אתם צריכים לוודא שממשקי ה-API הבאים מופעלים אצלכם:
- ממשק API של Dataflow
- API של Cloud Pub/Sub
- Compute Engine
- API של Notebooks
אפשר לבדוק את זה דרך ה-API הדף 'שירותים'.
במדריך הזה נקרא נתונים ממינוי Pub/Sub, ולכן מוודאים שלחשבון השירות שמשמש כברירת המחדל של Compute Engine יש את התפקיד 'עריכה', או מקצים לו את התפקיד 'עורך Pub/Sub'.
3. תחילת העבודה עם קובצי notebook של Apache Beam
הפעלת מכונה של Apache Beam notebooks
- מפעילים את Dataflow במסוף:
- בתפריט הימני בוחרים בדף Workbench.
- מוודאים שאתם נמצאים בכרטיסייה notebooks בניהול משתמשים.
- בסרגל הכלים, לוחצים על New Notebook (מחברת חדשה).
- בוחרים באפשרות Apache Beam > ללא מעבדי GPU.
- בדף New notebook, בוחרים רשת משנה ל-VM של ה-notebook ולוחצים על Create.
- כשהקישור הופך לפעיל, לוחצים על Open JupyterLab. Vertex AI Workbench יוצר מכונה notebook חדשה ב-Apache Beam.
4. יצירת צינור עיבוד הנתונים
יצירת מכונה של notebook
מנווטים אל קובץ > חדש > Notebook ובוחרים ליבה (kernel) ב-Apache Beam 2.47 ואילך.
מתחילים להוסיף קוד ל-notebook
- מעתיקים ומדביקים את הקוד מכל קטע בתא חדש ב-notebook.
- הרצת התא
שימוש במכשיר ההרצה האינטראקטיבי Apache Beam עם קובצי notebook של JupyterLab מאפשר לפתח צינורות עיבוד נתונים באופן חזרתי, לבדוק את תרשים צינור עיבוד הנתונים שלכם ולנתח נתוני PCollections ספציפיים בתהליך עבודה של Read-eval-print-loop (REPL).
Apache Beam מותקנת במכונה ב-notebook, לכן צריך לכלול את המודולים interactive_runner
ו-interactive_beam
ב-notebook.
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
אם ה-notebook משתמש בשירותים אחרים של Google, צריך להוסיף את הצהרות הייבוא הבאות:
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
הגדרת אפשרויות האינטראקטיביות
בהמשך מוגדר משך הקלטת הנתונים ל-60 שניות. אם רוצים לבצע איטרציה מהר יותר, אפשר להגדיר משך זמן נמוך יותר, לדוגמה '10 שניות'.
ib.options.recording_duration = '60s'
לאפשרויות אינטראקטיביות נוספות, ראו interactive_beam.options class.
אתחול צינור עיבוד הנתונים באמצעות אובייקט InteractiveRunner
.
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(InteractiveRunner(), options=options)
קריאת הנתונים והצגה חזותית שלהם
בדוגמה הבאה מוצג צינור עיבוד נתונים של Apache Beam שיוצר מינוי לנושא Pub/Sub הנתון וקורא מהמינוי.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
צינור עיבוד הנתונים סופר את המילים לפי חלונות מהמקור. הוא יוצר עיבוד חלון קבוע שבו כל חלון הוא באורך 10 שניות.
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
לאחר עיבוד הנתונים בחלון נפרד, המילים נספרות לפי חלון.
windowed_word_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
המחשה חזותית של הנתונים
השיטה show()
מציגה באופן חזותי את ה-PCollection שמתקבל ב-notebook.
ib.show(windowed_word_counts, include_window_info=True)
כדי להציג תצוגות חזותיות של הנתונים, צריך להעביר את visualize_data=True
לשיטה show()
. הוספת תא חדש:
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)
אפשר להחיל כמה מסננים על הרכיבים החזותיים שמוצגים. בתצוגה החזותית הבאה אפשר לסנן לפי תווית וציר:
5. שימוש ב-Pandas Dataframe
עוד המחשה שימושית בקובצי notebooks של Apache Beam היא Pandas DataFrame. הדוגמה הבאה ממירה קודם את המילים לאותיות קטנות ואז מחשבת את התדירות של כל מילה.
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
ה-method collect()
מספקת את הפלט ב-Pandas DataFrame.
ib.collect(windowed_lower_word_counts, include_window_info=True)
6. (אופציונלי) הפעלת משימות Dataflow מה-notebook
- כדי להריץ משימות ב-Dataflow, נדרשות הרשאות נוספות. צריך לוודא שלחשבון השירות שמשמש כברירת המחדל של Compute Engine יש את התפקיד 'עריכה', או להקצות לו את תפקידי ה-IAM הבאים:
- אדמין של Dataflow
- עובד Dataflow
- אדמין אחסון, וגם
- משתמש בחשבון שירות (roles/iam.serviceAccountUser)
מידע נוסף על התפקידים זמין במסמכי העזרה.
- (אופציונלי) לפני שמשתמשים ב-notebook כדי להריץ משימות Dataflow, צריך להפעיל מחדש את הליבה, להריץ מחדש את כל התאים ולאמת את הפלט.
- מסירים את הצהרות הייבוא הבאות:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
- מוסיפים את הצהרת הייבוא הבאה:
from apache_beam.runners import DataflowRunner
- מסירים את האפשרות הבאה של משך הקלטה:
ib.options.recording_duration = '60s'
- מוסיפים את הרכיבים הבאים לאפשרויות של צינור עיבוד הנתונים. תצטרכו לשנות את המיקום ב-Cloud Storage כך שיפנה לקטגוריה שכבר נמצאת בבעלותכם, או שאפשר ליצור קטגוריה חדשה למטרה הזו. אפשר גם לשנות את ערך האזור החל מ-
us-central1
.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
- ב-constructor של
beam.Pipeline()
, מחליפים אתInteractiveRunner
ב-DataflowRunner
.p
הוא האובייקט של צינור עיבוד הנתונים מיצירת צינור עיבוד הנתונים שלכם.
p = beam.Pipeline(DataflowRunner(), options=options)
- צריך להסיר את השיחות האינטראקטיביות מהקוד. לדוגמה, צריך להסיר את
show()
,collect()
,head()
,show_graph()
ו-watch()
מהקוד. - כדי לראות תוצאות, עליך להוסיף sink. בקטע הקודם הצגנו את התוצאות ב-notebook, אבל הפעם אנחנו מפעילים את המשימה מחוץ ל-notebook הזה ב-Dataflow. לכן אנחנו צריכים מיקום חיצוני כדי לקבל את התוצאות שלנו. בדוגמה הזו, נכתוב את התוצאות בקובצי טקסט ב-GCS (Google Cloud Storage). מכיוון שמדובר בצינור עיבוד נתונים בסטרימינג, עם סינון נתונים, נרצה ליצור קובץ טקסט אחד בכל חלון. כדי לעשות זאת, מוסיפים את השלבים הבאים לצינור עיבוד הנתונים:
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
- מוסיפים
p.run()
בסוף הקוד של צינור עיבוד הנתונים. - עכשיו בודקים את הקוד של ה-notebook כדי לוודא שהטמעתם את כל השינויים. היא אמורה להיראות כך:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()
# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True
# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'
# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location
# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"
p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
| "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
windowed_words_counts = (windowed_words
| "count" >> beam.combiners.Count.PerElement())
windowed_lower_word_counts = (windowed_words
| "to lower case" >> beam.Map(lambda word: word.lower())
| "count lowered" >> beam.combiners.Count.PerElement())
result = (windowed_lower_word_counts
| "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
| "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
p.run()
- מריצים את התאים.
- הפלט אמור להיראות כך:
<DataflowPipelineResult <Job
clientRequestId: '20230623100011457336-8998'
createTime: '2023-06-23T10:00:33.447347Z'
currentStateTime: '1970-01-01T00:00:00Z'
id: '2023-06-23_03_00_33-11346237320103246437'
location: 'us-central1'
name: 'beamapp-root-0623075553-503897-boh4u4wb'
projectId: 'your-project-id'
stageStates: []
startTime: '2023-06-23T10:00:33.447347Z'
steps: []
tempFiles: []
type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
- כדי לבדוק אם המשימה פועלת, נכנסים לדף המשימות של Dataflow. משרה חדשה אמורה להופיע ברשימה. עיבוד הנתונים יתחיל בתוך 5 עד 10 דקות.
- אחרי עיבוד הנתונים, עוברים אל Cloud Storage ועוברים לספרייה שבה התוצאות נשמרות ב-Dataflow (
output_gcs_location
שהוגדר). תוצג רשימה של קובצי טקסט, עם קובץ אחד בכל חלון. - מורידים את הקובץ ובודקים את התוכן. היא צריכה להכיל את רשימת המילים שהותאמו לספירה שלהן. לחלופין, משתמשים בממשק שורת הפקודה כדי לבדוק את הקבצים. כדי לעשות את זה, מריצים את הפקודה הבאה בתא חדש ב-notebook:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
- הפלט אמור להיראות כך:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
- זהו! לא לשכוח למחוק את המשימה שיצרתם ולהפסיק אותה (ראו את השלב האחרון ב-Codelab הזה).
דוגמה לאופן שבו מבצעים את ההמרה הזו ב-notebook אינטראקטיבי זמינה ב-notebook של Dataflow לספירת מילים במכונה של ה-notebook.
לחלופין, ניתן לייצא את ה-notebook כסקריפט להפעלה, לשנות את קובץ ה- .py שנוצר באמצעות השלבים הקודמים ואז לפרוס את צינור עיבוד הנתונים לשירות Dataflow.
7. מתבצעת שמירה של ה-notebook
מחברות שאתם יוצרים נשמרות באופן מקומי במכונת ה-notebook שפועלת. אם מאפסים או משביתים את המכונה של ה-notebook במהלך הפיתוח, קובצי ה-notebook החדשים יישמרו כל עוד הם נוצרים בספרייה /home/jupyter
. אבל אם מוחקים מכונת notebook, גם ה-notebooks האלה נמחקים.
כדי לשמור את ה-notebooks לשימוש עתידי, אתם יכולים להוריד אותם באופן מקומי לתחנת העבודה, לשמור אותם ב-GitHub או לייצא אותם לפורמט קובץ אחר.
8. מנקה
בסיום השימוש במכונת ה-notebook של Apache Beam, מנקים את המשאבים שיצרתם ב-Google Cloud על ידי כיבוי מכונת ה-notebook והפסקת משימת הסטרימינג, אם הרצתם אותה.
לחלופין, אם יצרתם פרויקט למטרה הבלעדית של ה-Codelab הזה, אפשר גם להשבית את הפרויקט לגמרי.