שימוש ב-notebooks עם Google Cloud Dataflow

1. מבוא

Cloud-Dataflow.png

Google Cloud Dataflow

העדכון האחרון: 5 ביולי 2023

מהו Dataflow?

‫Dataflow הוא שירות מנוהל להרצת מגוון רחב של דפוסי עיבוד נתונים. במסמכים באתר הזה מוסבר איך לפרוס את צינורות עיבוד הנתונים של אצווה וסטרימינג באמצעות Dataflow, כולל הוראות לשימוש בתכונות השירות.

‫Apache Beam SDK הוא מודל תכנות בקוד פתוח שמאפשר לפתח צינורות להעברת נתונים באצווה וצינורות להעברת נתוני סטרימינג. אתם יוצרים את צינורות הנתונים באמצעות תוכנית Apache Beam ומריצים אותם בשירות Dataflow. בתיעוד של Apache Beam אפשר למצוא מידע תיאורטי מפורט וחומר עזר למודל התכנות, ל-SDK ולרכיבי Runner אחרים של Apache Beam.

ניתוח נתונים בזמן אמת במהירות

‫Dataflow מאפשרת פיתוח מהיר ופשוט של פייפליינים בסטרימינג עם חביון נתונים נמוך יותר.

פשטו את התפעול והניהול

הגישה של Dataflow בלי שרת (serverless) מסירה את התקורה התפעולית מעומסי עבודה של הנדסת נתונים, ומאפשרת לצוותים להתמקד בתכנות במקום בניהול של אשכולות שרתים.

הפחתת עלות הבעלות הכוללת

התאמה אוטומטית לעומס של משאבים בשילוב עם יכולות עיבוד ברצף (batch processing) שעברו אופטימיזציה לעלויות, מאפשרות ל-Dataflow להציע קיבולת כמעט בלתי מוגבלת לניהול עומסי עבודה עונתיים ופיקים בעומס, בלי חריגה מהתקציב.

התכונות העיקריות

ניהול אוטומטי של משאבים ואיזון מחדש דינמי של עומסי העבודה

‫Dataflow מבצע הקצאה וניהול אוטומטיים של משאבי עיבוד כדי למזער את זמן האחזור ולמקסם את הניצול, כך שלא תצטרכו להתחיל הרצה של מופעים או להזמין אותם באופן ידני. חלוקת העבודה מתבצעת באופן אוטומטי ועוברת אופטימיזציה כדי לאזן מחדש באופן דינמי עבודה שמתעכבת. אין צורך לחפש 'מקשי קיצור' או לעבד מראש את נתוני הקלט.

התאמה אוטומטית אופקית לעומס

התאמה אוטומטית לעומס אופקית של משאבי עובדים כדי להשיג תפוקה אופטימלית מובילה ליחס טוב יותר בין מחיר לביצועים באופן כללי.

תמחור של תזמון משאבים גמיש לעיבוד באצווה

לעיבוד עם גמישות בזמן תזמון העבודות, כמו עבודות שמתבצעות במהלך הלילה, תזמון משאבים גמיש (FlexRS) מציע מחיר נמוך יותר לעיבוד באצווה. המשימות הגמישות האלה מוצבות בתור, עם הבטחה שהן יאוחזרו לביצוע תוך חלון של שש שעות.

מה יופעל כחלק מהתהליך הזה

שימוש ב-Apache Beam interactive runner עם מחברות JupyterLab מאפשר לפתח צינורות עיבוד נתונים באופן איטרטיבי, לבדוק את הגרף של צינור עיבוד הנתונים ולנתח PCollections בודדים בתהליך עבודה של קריאה-הערכה-הדפסה (REPL). מחברות Apache Beam זמינות דרך Vertex AI Workbench, שירות מנוהל שמארח מכונות וירטואליות של מחברות עם frameworks של מדעי נתונים ולמידת מכונה שהותקנו מראש.

ה-Codelab הזה מתמקד בפונקציונליות שהוצגה במחברות של Apache Beam.

מה תלמדו

  • איך יוצרים מופע של מחברת
  • יצירת פייפליין בסיסי
  • קריאת נתונים ממקור לא מוגבל
  • המחשה חזותית של הנתונים
  • הפעלת משימת Dataflow מ-notebook
  • שמירת Notebook

מה תצטרכו

  • פרויקט ב-Google Cloud Platform עם חיוב מופעל.
  • הפעלתם את Google Cloud Dataflow ואת Google Cloud PubSub.

‫2. תהליך ההגדרה

  1. במסוף Cloud, בדף לבחירת הפרויקט, בוחרים פרויקט בענן או יוצרים פרויקט חדש.

ודאו שממשקי ה-API הבאים מופעלים:

  • Dataflow API
  • Cloud Pub/Sub API
  • Compute Engine
  • Notebooks API

כדי לוודא זאת, אפשר לבדוק בדף'API ושירותים'.

במדריך הזה, נקרא נתונים ממינוי Pub/Sub, ולכן צריך לוודא שלחשבון השירות שמשמש כברירת המחדל של Compute Engine יש את התפקיד 'עורך', או להעניק לו את התפקיד 'עריכה ב-Pub/Sub'.

‫3. תחילת העבודה עם מחברות Apache Beam

הפעלת מכונת notebook של Apache Beam

  1. מפעילים את Dataflow ב-Console:

  1. בתפריט הימני, בוחרים באפשרות סביבת עבודה.
  2. מוודאים שאתם בכרטיסייה notebooks בניהול המשתמשים.
  3. בסרגל הכלים, לוחצים על New Notebook (מחברת חדשה).
  4. בוחרים באפשרות Apache Beam > Without GPUs.
  5. בדף New notebook (מחברת חדשה), בוחרים רשת משנה למכונה הווירטואלית של המחברת ולוחצים על Create (יצירה).
  6. לוחצים על Open JupyterLab כשהקישור הופך לפעיל. ‫Vertex AI Workbench יוצר מכונת notebook חדשה של Apache Beam.

4. יצירת צינור העיבוד

יצירת מופע של מחברת

עוברים אל קובץ > חדש > מחברת ובוחרים ליבה שהיא Apache Beam 2.47 ואילך.

איך מוסיפים קוד ל-Notebook

  • מעתיקים ומדביקים את הקוד מכל קטע בתא חדש ב-Notebook
  • הרצת התא

6bd3dd86cc7cf802.png

שימוש ב-Apache Beam interactive runner עם מחברות JupyterLab מאפשר לפתח צינורות עיבוד נתונים באופן איטרטיבי, לבדוק את הגרף של צינור עיבוד הנתונים ולנתח PCollections בודדים בתהליך עבודה של קריאה-הערכה-הדפסה (REPL).

‫Apache Beam מותקן במכונה ב-notebook, לכן צריך לכלול את המודולים interactive_runner ו-interactive_beam ב-notebook.

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

אם מחברת ה-Notebook משתמשת בשירותים אחרים של Google, מוסיפים את הצהרות הייבוא הבאות:

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

הגדרת אפשרויות אינטראקטיביות

הפקודה הבאה מגדירה את משך איסוף הנתונים ל-60 שניות. כדי לבצע איטרציות מהר יותר, מגדירים משך זמן קצר יותר, למשל '10s'.

ib.options.recording_duration = '60s'

אפשרויות אינטראקטיביות נוספות מפורטות במחלקה interactive_beam.options.

מאתחלים את הצינור באמצעות אובייקט InteractiveRunner.

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(InteractiveRunner(), options=options)

קריאה והצגה חזותית של הנתונים

בדוגמה הבאה מוצג צינור עיבוד נתונים של Apache Beam שיוצר מינוי לנושא Pub/Sub שצוין וקורא מהמינוי.

words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

הצינור סופר את המילים לפי חלונות מהמקור. הוא יוצר חלונות קבועים, וכל חלון נמשך 10 שניות.

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

אחרי שהנתונים מחולקים לחלונות, המילים נספרות לפי חלון.

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

המחשה חזותית של הנתונים

השיטה show() יוצרת ויזואליזציה של ה-PCollection שמתקבל במחברת.

ib.show(windowed_word_counts, include_window_info=True)

השיטה show מציגה PCollection בצורה של טבלה.

כדי להציג את הנתונים בהצגה חזותית, מעבירים את visualize_data=True לשיטה show(). הוספת תא חדש:

ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

אפשר להחיל כמה מסננים על ההדמיות. בתצוגה החזותית הבאה אפשר לסנן לפי תווית וציר:

השיטה show ממחישה PCollection כקבוצה עשירה של רכיבי ממשק משתמש שניתנים לסינון.

5. שימוש ב-Pandas Dataframe

עוד תצוגה חזותית שימושית במחברות Apache Beam היא Pandas DataFrame. בדוגמה הבאה, המילים מומרות קודם לאותיות קטנות, ואז מחושבת התדירות של כל מילה.

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

הפלט של השיטה collect() הוא Pandas DataFrame.

ib.collect(windowed_lower_word_counts, include_window_info=True)

השיטה collect שמייצגת PCollection ב-Pandas DataFrame.

6. (אופציונלי) הפעלת משימות Dataflow מ-notebook

  1. כדי להריץ עבודות ב-Dataflow, צריך הרשאות נוספות. מוודאים שלחשבון השירות שמוגדר כברירת מחדל ב-Compute Engine יש את התפקיד 'עריכה', או מקצים לו את תפקידי ה-IAM הבאים:
  • אדמין של Dataflow
  • Dataflow Worker
  • אדמין לניהול נפח האחסון, ו
  • משתמש בחשבון שירות (roles/iam.serviceAccountUser)

מידע נוסף על תפקידים מופיע במאמרי העזרה.

  1. (אופציונלי) לפני שמשתמשים במחברת כדי להריץ משימות של Dataflow, מפעילים מחדש את הליבה, מריצים מחדש את כל התאים ומאמתים את הפלט.
  2. מסירים את הצהרות הייבוא הבאות:
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
  1. מוסיפים את הצהרת הייבוא הבאה:
from apache_beam.runners import DataflowRunner
  1. להסיר את האפשרות הבאה של משך ההקלטה:
ib.options.recording_duration = '60s'
  1. מוסיפים את האפשרויות הבאות לאפשרויות של צינור העיבוד. תצטרכו לשנות את המיקום של Cloud Storage כך שיצביע על קטגוריה שכבר בבעלותכם, או ליצור קטגוריה חדשה למטרה הזו. אפשר גם לשנות את ערך האזור מ-us-central1.
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
  1. ב-constructor של beam.Pipeline(), מחליפים את InteractiveRunner ב-DataflowRunner. ‫p הוא אובייקט צינור העיבוד שנוצר כשיוצרים את צינור העיבוד.
p = beam.Pipeline(DataflowRunner(), options=options)
  1. מסירים את הקוד של הקריאות האינטראקטיביות. לדוגמה, מסירים את show(), collect(), head(), show_graph() ו-watch() מהקוד.
  2. כדי לראות תוצאות, צריך להוסיף יעד. בקטע הקודם הצגנו את התוצאות ב-notebook, אבל הפעם אנחנו מריצים את העבודה מחוץ ל-notebook הזה – ב-Dataflow. לכן, אנחנו צריכים מיקום חיצוני לתוצאות שלנו. בדוגמה הזו, נכתוב את התוצאות לקובצי טקסט ב-GCS ‏ (Google Cloud Storage). מכיוון שזהו צינור נתונים בסטרימינג עם חלונות נתונים, נרצה ליצור קובץ טקסט אחד לכל חלון. כדי לעשות זאת, מוסיפים את השלבים הבאים לצינור העברת הנתונים:
result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))
  1. מוסיפים p.run() בסוף קוד הצינור.
  2. עכשיו בודקים את הקוד במחברת כדי לוודא שכל השינויים שולבו. הוא אמור להיראות כך:
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
from apache_beam.runners import DataflowRunner

# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
# The project will be used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Choose a Cloud Storage location.
dataflow_gcs_location = 'gs://<change me>/dataflow'

# Choose a location for your output files
output_gcs_location = '%s/results/' % dataflow_gcs_location


# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location



# The Google Cloud PubSub topic for this example.
topic = "projects/pubsub-public-data/topics/shakespeare-kinglear"

p = beam.Pipeline(DataflowRunner(), options=options)
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)
windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

windowed_words_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

windowed_lower_word_counts = (windowed_words
   | "to lower case" >> beam.Map(lambda word: word.lower())
   | "count lowered" >> beam.combiners.Count.PerElement())

result = (windowed_lower_word_counts
    | "decode words and format" >> beam.Map(lambda x: f"{x[0].decode('utf-8')}: {x[1]}")
    | "write events to file" >> beam.io.fileio.WriteToFiles(path=output_gcs_location, shards=1, max_writers_per_bundle=0))

p.run()
  1. מריצים את התאים.
  2. הפלט שיוצג אמור להיות דומה לזה שמופיע כאן:
<DataflowPipelineResult <Job
 clientRequestId: '20230623100011457336-8998'
 createTime: '2023-06-23T10:00:33.447347Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-06-23_03_00_33-11346237320103246437'
 location: 'us-central1'
 name: 'beamapp-root-0623075553-503897-boh4u4wb'
 projectId: 'your-project-id'
 stageStates: []
 startTime: '2023-06-23T10:00:33.447347Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc7e4084d60>
  1. כדי לוודא שההפעלה של המשימה מתבצעת, עוברים אל הדף Jobs ב-Dataflow. עבודה חדשה אמורה להופיע ברשימה. יעברו בערך 5-10 דקות עד שהעבודה תתחיל לעבד את הנתונים.
  2. אחרי שהנתונים מתחילים לעבור עיבוד, עוברים אל Cloud Storage ומנווטים אל הספרייה שבה Dataflow מאחסן את התוצאות (הספרייה שהגדרתם, output_gcs_location). אמורה להופיע רשימה של קובצי טקסט, קובץ אחד לכל חלון. bfcc5ce9e46a8b14.png
  3. מורידים את הקובץ ובודקים את התוכן. הוא צריך להכיל את רשימת המילים עם מספר המופעים שלהן. אפשר גם להשתמש בממשק שורת הפקודה כדי לבדוק את הקבצים. כדי לעשות זאת, מריצים את הפקודה הבאה בתא חדש ב-Notebook:
! gsutils cat gs://<your-bucket-here>/results/<file-name>
  1. יוצג פלט דומה לזה:

Safer: 1

trust: 1

mercy: 1

harms: 1

far: 2

fear: 1

than: 1

take: 1

me: 1

goneril: 1

still: 1

away: 1

let: 1

too: 2

the: 1

  1. זהו! אל תשכחו לנקות ולהפסיק את העבודה שיצרתם (ראו את השלב האחרון של ה-codelab הזה).

דוגמה לאופן ביצוע ההמרה הזו ב-notebook אינטראקטיבי מופיעה ב-notebook של ספירת מילים ב-Dataflow במופע של ה-notebook.

לחלופין, אפשר לייצא את המחברת כסקריפט שניתן להפעלה, לשנות את קובץ ה-‎ .py שנוצר באמצעות השלבים הקודמים, ואז לפרוס את צינור העיבוד בשירות Dataflow.

7. שמירת ה-notebook

מחברות שיוצרים נשמרות באופן מקומי במופע הפעיל של המחברת. אם מאפסים או משביתים את מופע המחברת במהלך הפיתוח, המחברות החדשות האלה נשמרות כל עוד הן נוצרו בספרייה /home/jupyter. עם זאת, אם מוחקים את מופע ה-Notebook, גם ה-Notebooks האלה נמחקים.

כדי להמשיך להשתמש ב-notebooks, צריך להוריד אותם באופן מקומי לתחנת העבודה, לשמור אותם ב-GitHub או לייצא אותם לפורמט קובץ אחר.

8. סידור וארגון

אחרי שמסיימים להשתמש במכונת מחברת Apache Beam, חשוב להסיר את המשאבים שיצרתם ב-Google Cloud על ידי כיבוי מכונת המחברת והפסקת עבודת הסטרימינג, אם הפעלתם אותה.

לחלופין, אם יצרתם פרויקט רק למטרת ה-Codelab הזה, אתם יכולים גם להשבית את הפרויקט לגמרי.