הרצת משימה של ספירת מילים ב-Hadoop באשכול Dataproc

1. מבוא

תהליכי עבודה הם תרחיש שימוש נפוץ בניתוח נתונים – הם כוללים קליטה, שינוי וניתוח של נתונים כדי למצוא את המידע המשמעותי שגלום בהם. ב-Google Cloud Platform, הכלי לתזמור תהליכי עבודה הוא Cloud Composer, שהוא גרסה מארחת של כלי תהליכי העבודה הפופולרי בקוד פתוח Apache Airflow. בשיעור ה-Lab הזה תשתמשו ב-Cloud Composer כדי ליצור תהליך עבודה פשוט שיוצר אשכול Cloud Dataproc, מנתח אותו באמצעות Cloud Dataproc ו-Apache Hadoop, ואז מוחק את אשכול Cloud Dataproc.

מה זה Cloud Composer?

Cloud Composer הוא שירות מנוהל במלואו לתזמור תהליכי עבודה שמאפשר ליצור, לתזמן ולנטר פייפליינים בין סביבות ענן ומרכזי נתונים מקומיים. ‫Cloud Composer מבוסס על פרויקט הקוד הפתוח הפופולרי Apache Airflow ומופעל באמצעות שפת התכנות Python. הוא לא מחייב התחייבות לשימוש במוצר מסוים וקל לשימוש.

השימוש ב-Cloud Composer במקום במופע מקומי של Apache Airflow מאפשר למשתמשים ליהנות מהיתרונות של Airflow בלי להשקיע מאמץ בהתקנה או בניהול.

מה זה Apache Airflow?

‫Apache Airflow הוא כלי בקוד פתוח שמשמש ליצירה, לתזמון ולניטור של תהליכי עבודה באופן פרוגרמטי. יש כמה מונחים חשובים שקשורים ל-Airflow ויופיעו לאורך שיעור ה-Lab:

  • ‫DAG – DAG (גרף אציקלי מכוון) הוא אוסף של משימות מאורגנות שרוצים לתזמן ולהריץ. גרפים מכוונים מחזוריים (DAG), שנקראים גם תהליכי עבודה, מוגדרים בקובצי Python רגילים
  • אופרטור – אופרטור מתאר משימה יחידה בתהליך עבודה

מה זה Cloud Dataproc?

Cloud Dataproc הוא שירות מנוהל באופן מלא של Apache Spark ו-Apache Hadoop ב-Google Cloud Platform. קל לשלב את Cloud Dataproc עם שירותים אחרים של GCP, וכך לקבל פלטפורמה עוצמתית ומלאה לעיבוד נתונים, לניתוח נתונים וללמידת מכונה.

מה עושים

ב-Codelab הזה נדגים איך ליצור ולהריץ ב-Cloud Composer תהליך עבודה של Apache Airflow שמבצע את המשימות הבאות:

  • יצירת אשכול Cloud Dataproc
  • מריץ משימה של ספירת מילים ב-Apache Hadoop באשכול, ומייצא את התוצאות ל-Cloud Storage
  • מחיקת האשכול

מה תלמדו

  • איך יוצרים ומריצים תהליך עבודה של Apache Airflow ב-Cloud Composer
  • איך משתמשים ב-Cloud Composer וב-Cloud Dataproc כדי להריץ ניתוח על מערך נתונים
  • איך ניגשים לסביבת Cloud Composer דרך Google Cloud Platform Console,‏ Cloud SDK וממשק האינטרנט של Airflow

מה תצטרכו

  • חשבון GCP
  • ידע בסיסי ב-CLI
  • הבנה בסיסית של Python

2. הגדרת GCP

יצירת הפרויקט

בוחרים פרויקט קיים או יוצרים פרויקט חדש ב-Google Cloud Platform.

חשוב לשים לב למזהה הפרויקט, כי תצטרכו להשתמש בו בשלבים הבאים.

אם אתם יוצרים פרויקט חדש, מזהה הפרויקט מופיע ממש מתחת לשם הפרויקט בדף היצירה.

אם כבר יצרתם פרויקט, תוכלו למצוא את המזהה בדף הבית של המסוף בכרטיסיית מידע של הפרויקט.

הפעלת ממשקי ה-API

מפעילים את Cloud Composer API,‏ Cloud Dataproc API ו-Cloud Storage API.אחרי ההפעלה, אפשר להתעלם מהלחצן 'מעבר לפרטי הכניסה' ולהמשיך לשלב הבא במדריך.

יצירת סביבת Composer

יוצרים סביבת Cloud Composer עם ההגדרות הבאות:

  • שם: my-composer-environment
  • מיקום: us-central1
  • אזור: us-central1-a

אפשר להשאיר את כל שאר ההגדרות כברירת מחדל. בתחתית המסך, לוחצים על 'יצירה'.

יצירת קטגוריה של Cloud Storage

בפרויקט, יוצרים קטגוריה של Cloud Storage עם ההגדרות הבאות:

  • שם: <your-project-id>
  • סוג האחסון (storage class) שמוגדר כברירת מחדל: Multi-regional
  • מיקום: ארצות הברית
  • מודל בקרת גישה: פרטני

כשמוכנים, לוחצים על 'יצירה'.

3. הגדרת Apache Airflow

צפייה בפרטים של סביבת Composer

במסוף GCP, פותחים את הדף Environments.

לוחצים על שם הסביבה כדי לראות את הפרטים שלה.

בדף פרטי הסביבה מופיעים פרטים כמו כתובת ה-URL של ממשק האינטרנט של Airflow, מזהה האשכול של Google Kubernetes Engine, השם של קטגוריית Cloud Storage והנתיב של התיקייה ‎ /dags.

ב-Airflow, ‏ DAG (גרף אציקלי מכוון) הוא אוסף של משימות מאורגנות שרוצים לתזמן ולהריץ. גרפים מכוונים מחזוריים (DAG), שנקראים גם תהליכי עבודה, מוגדרים בקובצי Python רגילים. ‫Cloud Composer מתזמן רק את ה-DAG בתיקייה ‎ /dags. התיקייה ‎ /dags נמצאת בקטגוריית Cloud Storage ש-Cloud Composer יוצר באופן אוטומטי כשיוצרים את הסביבה.

הגדרת משתני סביבה של Apache Airflow

משתני Apache Airflow הם מושג ספציפי ל-Airflow, ששונה ממשתני סביבה. בשלב הזה, מגדירים את שלושת משתני Airflow הבאים: gcp_project,‏ gcs_bucket ו-gce_zone.

שימוש בgcloud להגדרת משתנים

קודם כול, פותחים את Cloud Shell, שבו Cloud SDK מותקן בצורה נוחה.

מגדירים את משתנה הסביבה COMPOSER_INSTANCE לשם של סביבת ה-Composer

COMPOSER_INSTANCE=my-composer-environment

כדי להגדיר משתני Airflow באמצעות כלי שורת הפקודה של Google Cloud, משתמשים בפקודה gcloud composer environments run עם פקודת המשנה variables. הפקודה gcloud composer מריצה את פקודת המשנה של Airflow CLI‏ variables. פקודת המשנה מעבירה את הארגומנטים לכלי שורת הפקודה gcloud.

מריצים את הפקודה הזו שלוש פעמים, ומחליפים את המשתנים באלה שרלוונטיים לפרויקט.

מגדירים את gcp_project באמצעות הפקודה הבאה, ומחליפים את <your-project-id> במזהה הפרויקט שרשמתם בשלב 2.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcp_project <your-project-id>

הפלט ייראה בערך כך:

kubeconfig entry generated for us-central1-my-composer-env-123abc-gke.
Executing within the following Kubernetes cluster namespace: composer-1-10-0-airflow-1-10-2-123abc
[2020-04-17 20:42:49,713] {settings.py:176} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=449
[2020-04-17 20:42:50,123] {default_celery.py:90} WARNING - You have configured a result_backend of redis://airflow-redis-service.default.svc.cluste
r.local:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2020-04-17 20:42:50,127] {__init__.py:51} INFO - Using executor CeleryExecutor
[2020-04-17 20:42:50,433] {app.py:52} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2020-04-17 20:42:50,440] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg
[2020-04-17 20:42:50,452] {configuration.py:522} INFO - Reading the config from /etc/airflow/airflow.cfg

מגדירים את gcs_bucket באמצעות הפקודה הבאה, ומחליפים את <your-bucket-name> במזהה הקטגוריה שרשמתם בשלב 2. אם פעלתם לפי ההמלצה שלנו, שם הקטגוריה זהה למזהה הפרויקט. הפלט יהיה דומה לפלט של הפקודה הקודמת.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gcs_bucket gs://<your-bucket-name>

מגדירים את gce_zone באמצעות הפקודה הבאה. הפלט שלכם יהיה דומה לפלט של הפקודות הקודמות.

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --set gce_zone us-central1-a

(אופציונלי) שימוש ב-gcloud כדי להציג משתנה

כדי לראות את הערך של משתנה, מריצים את פקודת המשנה של Airflow CLI‏ variables עם הארגומנט get או משתמשים בממשק המשתמש של Airflow.

לדוגמה:

gcloud composer environments run ${COMPOSER_INSTANCE} \
    --location us-central1 variables -- --get gcs_bucket

אפשר לעשות את זה עם כל אחד משלושת המשתנים שהגדרתם: gcp_project,‏ gcs_bucket ו-gce_zone.

4. תהליך עבודה לדוגמה

בואו נסתכל על הקוד של ה-DAG שבו נשתמש בשלב 5. אל תדאגו לגבי הורדת קבצים בשלב הזה, פשוט תפעלו לפי ההוראות כאן.

יש כאן הרבה פרטים, אז נסביר קצת.

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

נתחיל עם כמה ייבואים של Airflow:

  • airflow.models – מאפשר לנו לגשת לנתונים במסד הנתונים של Airflow וליצור נתונים.
  • airflow.contrib.operators – המקום שבו נמצאים המפעילים מהקהילה. במקרה הזה, אנחנו צריכים את dataproc_operator כדי לגשת אל Cloud Dataproc API.
  • airflow.utils.trigger_rule – להוספת כללי טריגר לאופרטורים. כללי הפעלה מאפשרים שליטה מדויקת בשאלה אם להפעיל אופרטור בהתאם למצב של רכיבי האב שלו.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

כאן מציינים את המיקום של קובץ הפלט. השורה שחשוב לשים לב אליה היא models.Variable.get('gcs_bucket'), שמושכת את ערך המשתנה gcs_bucket ממסד הנתונים של Airflow.

WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]
  • WORDCOUNT_JAR – המיקום של קובץ ה-‎ .jar שנריץ בסופו של דבר באשכול Cloud Dataproc. הוא כבר מתארח ב-GCP בשבילכם.
  • input_file – המיקום של הקובץ שמכיל את הנתונים שעליהם תתבצע בסופו של דבר החישוב של משימת Hadoop. בשלב 5, נעלה את הנתונים למיקום הזה ביחד.
  • wordcount_args – ארגומנטים שיועברו לקובץ ה-jar.
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

הפעולה הזו תחזיר אובייקט datetime ששווה לחצות של היום הקודם. לדוגמה, אם הפקודה הזו מופעלת ב-4 במרץ בשעה 11:00, אובייקט התאריך והשעה ייצג את השעה 00:00 ב-3 במרץ. הבעיה קשורה לאופן שבו Airflow מטפל בתזמון. מידע נוסף בנושא

default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

צריך לספק את המשתנה default_dag_args בצורת מילון בכל פעם שיוצרים DAG חדש:

  • 'email_on_failure' – מציין אם צריך לשלוח התראות באימייל כשמשימה נכשלת
  • 'email_on_retry' – מציין אם צריך לשלוח התראות באימייל כשמנסים שוב לבצע משימה
  • 'retries' – מציין כמה ניסיונות חוזרים Airflow צריך לבצע במקרה של כשל ב-DAG
  • 'retry_delay' – מציין כמה זמן Airflow צריך לחכות לפני ניסיון חוזר
  • 'project_id' – מציין ל-DAG את מזהה פרויקט GCP שאליו הוא ישויך. המזהה הזה יידרש בהמשך עם Dataproc Operator
with models.DAG(
        'composer_hadoop_tutorial',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

השימוש ב-with models.DAG אומר לסקריפט לכלול את כל מה שמתחתיו באותו DAG. אפשר לראות גם שלושה ארגומנטים שמועברים:

  • הראשון, מחרוזת, הוא השם שניתן ל-DAG שאנחנו יוצרים. במקרה הזה, אנחנו משתמשים ב-composer_hadoop_tutorial.
  • schedule_interval – אובייקט datetime.timedelta, שהגדרנו כאן ליום אחד. המשמעות היא שה-DAG הזה ינסה לפעול פעם ביום אחרי השעה 'start_date' שהוגדרה קודם ב-'default_dag_args'
  • default_args – המילון שיצרנו קודם לכן שמכיל את ארגומנטי ברירת המחדל של DAG

יצירת אשכול Dataproc

בשלב הבא, ניצור dataproc_operator.DataprocClusterCreateOperator שיוצר אשכול Cloud Dataproc.

    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

בתוך האופרטור הזה יש כמה ארגומנטים, וכולם, חוץ מהראשון, ספציפיים לאופרטור הזה:

  • task_id – בדיוק כמו ב-BashOperator, זה השם שאנחנו מקצים לאופרטור, שאפשר לראות בממשק המשתמש של Airflow
  • cluster_name – השם שאנחנו מקצים לאשכול Cloud Dataproc. במקרה הזה, קראנו לו composer-hadoop-tutorial-cluster-{{ ds_nodash }} (בתיבת המידע מופיע מידע נוסף אופציונלי)
  • num_workers - מספר העובדים שהוקצו לאשכול Cloud Dataproc
  • zone – האזור הגיאוגרפי שבו אנחנו רוצים שהאשכול יפעל, כפי שנשמר במסד הנתונים של Airflow. הקוד יקרא את המשתנה 'gce_zone' שהגדרנו בשלב 3.
  • master_machine_type - סוג המכונה שאנחנו רוצים להקצות למכונת ה-master של Cloud Dataproc
  • worker_machine_type – סוג המכונה שרוצים להקצות לעובד של Cloud Dataproc

שליחת משימה של Apache Hadoop

הפקודה dataproc_operator.DataProcHadoopOperator מאפשרת לנו לשלוח משימה לאשכול Cloud Dataproc.

    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

אנחנו מספקים כמה פרמטרים:

  • task_id – השם שמוקצה לחלק הזה של ה-DAG
  • main_jar – המיקום של קובץ ה-‎ .jar שאנחנו רוצים להריץ מול האשכול
  • cluster_name – שם האשכול שבו יופעל הג'וב. השם הזה זהה לשם שמופיע באופרטור הקודם.
  • arguments – ארגומנטים שמועברים לקובץ ה-JAR, כמו שהייתם עושים אם הייתם מריצים את קובץ ה-JAR משורת הפקודה

מחיקת האשכול

האופרטור האחרון שניצור הוא dataproc_operator.DataprocClusterDeleteOperator.

    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

כפי שהשם מרמז, האופרטור הזה ימחק אשכול Cloud Dataproc נתון. אפשר לראות כאן שלושה ארגומנטים:

  • task_id – בדיוק כמו ב-BashOperator, זה השם שאנחנו מקצים לאופרטור, שאפשר לראות בממשק המשתמש של Airflow
  • cluster_name – השם שאנחנו מקצים לאשכול Cloud Dataproc. במקרה הזה, קראנו לו composer-hadoop-tutorial-cluster-{{ ds_nodash }} (מידע נוסף אופציונלי מופיע בתיבת המידע אחרי 'יצירת אשכול Dataproc')
  • trigger_rule – הזכרנו בקצרה את כללי הטריגר במהלך הייבוא בתחילת השלב הזה, אבל כאן אנחנו רואים דוגמה לשימוש בהם. כברירת מחדל, אופרטור Airflow לא מופעל אלא אם כל האופרטורים במעלה הזרם שלו הושלמו בהצלחה. כלל הטריגר ALL_DONE דורש רק שכל האופרטורים במעלה הזרם יושלמו, בלי קשר להצלחה שלהם. במקרה הזה, המשמעות היא שגם אם משימת Hadoop נכשלה, אנחנו עדיין רוצים לפרק את האשכול.
  create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

לבסוף, אנחנו רוצים שהאופרטורים האלה יפעלו בסדר מסוים, ואנחנו יכולים לציין את זה באמצעות אופרטורים של הזזה בינארית ב-Python. במקרה הזה, הפונקציה create_dataproc_cluster תמיד תופעל ראשונה, אחריה הפונקציה run_dataproc_hadoop ולבסוף הפונקציה delete_dataproc_cluster.

הקוד המלא נראה כך:

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START composer_hadoop_tutorial]
"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
  created.
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
  See https://cloud.google.com/storage/docs/creating-buckets for creating a
  bucket.
"""

import datetime
import os

from airflow import models
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# Output file for Cloud Dataproc job.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep
# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)
# Arguments to pass to Cloud Dataproc job.
input_file = 'gs://pub/shakespeare/rose.txt'

wordcount_args = ['wordcount', input_file, output_file]

yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {
    # Setting start date as yesterday starts the DAG immediately when it is
    # detected in the Cloud Storage bucket.
    'start_date': yesterday,
    # To email on failure or retry set 'email' arg to your email and enable
    # emailing here.
    'email_on_failure': False,
    'email_on_retry': False,
    # If a task fails, retry it once after waiting at least 5 minutes
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'project_id': models.Variable.get('gcp_project')
}

# [START composer_hadoop_schedule]
with models.DAG(
        'composer_hadoop_tutorial',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
    # [END composer_hadoop_schedule]

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc cluster
    # master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # [START composer_hadoop_steps]
    # Define DAG dependencies.
    create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
    # [END composer_hadoop_steps]

# [END composer_hadoop]

5. העלאת קובצי Airflow ל-Cloud Storage

העתקת ה-DAG לתיקייה /dags

  1. קודם כול, פותחים את Cloud Shell, שבו Cloud SDK מותקן בצורה נוחה.
  2. משכפלים את מאגר הדוגמאות של Python ועוברים לספרייה composer/workflows
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git && cd python-docs-samples/composer/workflows
  1. מריצים את הפקודה הבאה כדי להגדיר את השם של תיקיית ה-DAG כמשתנה סביבה
DAGS_FOLDER=$(gcloud composer environments describe ${COMPOSER_INSTANCE} \
--location us-central1 --format="value(config.dagGcsPrefix)")
  1. מריצים את הפקודה הבאה gsutil כדי להעתיק את קוד ההדרכה למקום שבו נוצרה התיקייה ‎ /dags
gsutil cp hadoop_tutorial.py $DAGS_FOLDER

הפלט ייראה בערך כך:

Copying file://hadoop_tutorial.py [Content-Type=text/x-python]...
/ [1 files][  4.1 KiB/  4.1 KiB]
Operation completed over 1 objects/4.1 KiB.

6. שימוש בממשק המשתמש של Airflow

כדי לגשת לממשק האינטרנט של Airflow באמצעות מסוף Google Cloud:

  1. פותחים את הדף סביבות.
  2. בעמודה Airflow webserver של הסביבה, לוחצים על סמל החלון החדש. ממשק המשתמש האינטרנטי של Airflow נפתח בחלון דפדפן חדש.

מידע על ממשק המשתמש של Airflow זמין במאמר בנושא גישה לממשק האינטרנט.

הצגת משתנים

המשתנים שהגדרתם קודם נשמרים בסביבה שלכם. כדי לראות את המשתנים, בוחרים באפשרות Admin > Variables (אדמין > משתנים) בסרגל התפריטים של ממשק המשתמש של Airflow.

הכרטיסייה List (רשימה) נבחרה ומוצגת טבלה עם המפתחות והערכים הבאים: מפתח: gcp_project, ערך: project-id מפתח: gcs_bucket, ערך: gs://bucket-name מפתח: gce_zone, ערך: zone

עיון בהרצות של תרשימי DAG

כשמעלים את קובץ ה-DAG לתיקייה dags ב-Cloud Storage, ‏ Cloud Composer מנתח את הקובץ. אם לא נמצאו שגיאות, שם תהליך העבודה מופיע ברשימת ה-DAG, ותהליך העבודה מתווסף לתור להפעלה מיידית. כדי לראות את ה-DAG, לוחצים על DAGs בחלק העליון של הדף.

84a29c71f20bff98.png

לוחצים על composer_hadoop_tutorial כדי לפתוח את דף הפרטים של ה-DAG. בדף הזה מוצג ייצוג גרפי של משימות ויחסי תלות בתהליך העבודה.

f4f1663c7a37f47c.png

עכשיו, בסרגל הכלים, לוחצים על תצוגת תרשים ומעבירים את העכבר מעל הגרפיקה של כל משימה כדי לראות את הסטטוס שלה. שימו לב: גם המסגרת סביב כל משימה מציינת את הסטטוס (מסגרת ירוקה = פועל; אדום = נכשל וכו').

4c5a0c6fa9f88513.png

כדי להריץ שוב את תהליך העבודה מתצוגת התרשים:

  1. בתצוגת הגרף של ממשק המשתמש של Airflow, לוחצים על הגרפיקה create_dataproc_cluster.
  2. לוחצים על ניקוי כדי לאפס את שלוש המשימות ואז לוחצים על אישור כדי לאשר את הפעולה.

fd1b23b462748f47.png

אפשר גם לבדוק את הסטטוס והתוצאות של composer-hadoop-tutorial זרימת העבודה בדפים הבאים במסוף GCP:

  • אשכולות Cloud Dataproc כדי לעקוב אחרי יצירה ומחיקה של אשכולות. הערה: האשכול שנוצר על ידי תהליך העבודה הוא זמני: הוא קיים רק למשך תהליך העבודה ונמחק כחלק מהמשימה האחרונה בתהליך העבודה.
  • Cloud Dataproc Jobs כדי להציג או לעקוב אחרי משימת ספירת המילים ב-Apache Hadoop. לוחצים על מזהה משימה כדי לראות את הפלט של יומן העבודה.
  • Cloud Storage Browser כדי לראות את התוצאות של ספירת המילים בתיקייה wordcount בקטגוריה של Cloud Storage שיצרתם בשביל ה-codelab הזה.

7. הסרת המשאבים

כדי להימנע מחיובים בחשבון GCP על המשאבים שבהם השתמשתם ב-codelab הזה:

  1. (אופציונלי) כדי לשמור את הנתונים, מורידים את הנתונים מקטגוריה של Cloud Storage של סביבת Cloud Composer ומקטגוריית האחסון שיצרתם בשביל ה-Codelab הזה.
  2. מוחקים את הקטגוריה של Cloud Storage שיצרתם בשביל ה-Codelab הזה.
  3. מחיקת קטגוריית Cloud Storage של הסביבה.
  4. מחיקת סביבת Cloud Composer. שימו לב: מחיקת הסביבה לא מוחקת את דלי האחסון של הסביבה.

אפשר גם למחוק את הפרויקט:

  1. במסוף GCP, נכנסים לדף Projects.
  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על מחיקה.
  3. בתיבה, כותבים את מזהה הפרויקט ולוחצים על Shut down כדי למחוק את הפרויקט.