מבוא ל-Vertex Pipelines

1. סקירה כללית

בשיעור ה-Lab הזה תלמדו איך ליצור ולהריץ צינורות עיבוד נתונים של למידת מכונה באמצעות Vertex Pipelines.

מה לומדים

נסביר לכם איך:

  • שימוש ב-Kubeflow Pipelines SDK כדי לפתח צינורות עיבוד נתונים ניתנים להתאמה של למידת מכונה
  • יצירה והפעלה של צינור עיבוד נתונים ראשוני בן 3 שלבים שמקבל קלט טקסט
  • יצירה והפעלה של צינור עיבוד נתונים לאימון, הערכה ופריסה של מודל סיווג AutoML
  • להשתמש ברכיבים מוכנים מראש לאינטראקציה עם שירותי Vertex AI, שמסופקים דרך הספרייה google_cloud_pipeline_components
  • תזמון משימה של צינור עיבוד נתונים באמצעות Cloud Scheduler

העלות הכוללת של הפעלת שיעור ה-Lab הזה ב-Google Cloud היא כ-25$.

2. מבוא ל-Vertex AI

בשיעור ה-Lab הזה נעשה שימוש במוצר ה-AI החדש ביותר שזמין ב-Google Cloud. Vertex AI משלב את הצעות למידת המכונה ב-Google Cloud ליצירת חוויית פיתוח חלקה. בעבר, ניתן היה לגשת למודלים שעברו אימון באמצעות AutoML ומודלים בהתאמה אישית דרך שירותים נפרדים. המוצר החדש משלב את כל ממשקי ה-API האלה בממשק API אחד, לצד מוצרים חדשים אחרים. תוכלו גם להעביר פרויקטים קיימים ל-Vertex AI.

בנוסף לאימון מודלים ולשירותי פריסה, Vertex AI כולל גם מגוון מוצרי MLOps, כולל Vertex Pipelines (המוקד של שיעור ה-Lab הזה), Model Monitoring, Feature Store ועוד. אתם יכולים לראות את כל המוצרים של Vertex AI בתרשים הבא.

סקירה כללית של המוצר Vertex

יש לך משוב? אפשר למצוא אותו בדף התמיכה.

למה צינורות עיבוד נתונים של למידת מכונה שימושיים?

לפני שנצלול לעומק, נסביר למה כדאי להשתמש בצינור עיבוד נתונים. נניח שאתם מפתחים תהליך עבודה של למידת מכונה שכולל עיבוד נתונים, אימון מודל, כוונון היפר-פרמטרים, הערכה ופריסת מודלים. לכל אחד מהשלבים האלה עשויים להיות יחסי תלות שונים, ואם תתייחסו לתהליך העבודה כולו כמונולית, הדבר עלול להיות קשה יותר. כשתתחילו לבצע התאמה לעומס של תהליך למידת המכונה (ML), יכול להיות שתרצו לשתף עם אחרים בצוות את תהליך העבודה של למידת המכונה כדי שהם יוכלו להריץ אותו ולתרום קוד. ללא תהליך מהימן וניתן לשחזור, הדבר עלול להיות קשה. בצינורות עיבוד נתונים, כל שלב בתהליך למידת המכונה הוא קונטיינר משלו. כך תוכלו לפתח שלבים באופן עצמאי ולעקוב אחרי הקלט והפלט מכל שלב בדרך של שחזור. תוכלו גם לתזמן או להפעיל הפעלות של צינור עיבוד הנתונים על סמך אירועים אחרים בסביבת הענן שלכם, כמו הפעלת צינור עיבוד נתונים כשיש נתוני אימון חדשים.

The tl;dr: צינורות עיבוד נתונים עוזרים לכם לבצע אוטומציה ולשחזר את תהליך העבודה שלכם בלמידת מכונה.

3. הגדרה של סביבת הענן

כדי להריץ את ה-Codelab הזה צריך פרויקט ב-Google Cloud Platform שהחיוב מופעל בו. כדי ליצור פרויקט, יש לפעול לפי ההוראות האלה.

שלב 1: הפעלת Cloud Shell

בשיעור ה-Lab הזה תלמדו במסגרת סשן של Cloud Shell, שהוא תרגום פקודות שמתארח במכונה וירטואלית שרצה בענן של Google. באותה מידה תוכלו להריץ את הקטע הזה באופן מקומי במחשב שלכם, אבל השימוש ב-Cloud Shell מאפשר לכולם ליהנות מחוויה שניתן לשחזר בסביבה עקבית. אחרי שיעור ה-Lab, אתם יכולים לנסות שוב את הקטע הזה במחשב.

מתן הרשאה ל-Cloud Shell

הפעלת Cloud Shell

בצד שמאל למעלה ב-Cloud Console, לוחצים על הלחצן שלמטה כדי Activate Cloud Shell (הפעלה של Cloud Shell):

הפעלת Cloud Shell

אם לא הפעלתם את Cloud Shell בעבר, יוצג לכם מסך ביניים (בחלק הנגלל) שמתאר מהו. במקרה כזה, לוחצים על המשך (וזה לא יקרה שוב). כך נראה המסך החד-פעמי:

הגדרת Cloud Shell

ההקצאה וההתחברות ל-Cloud Shell נמשכת כמה דקות.

אתחול של Cloud Shell

במכונה הווירטואלית הזו משולבת כל כלי הפיתוח שדרושים לכם. יש בה ספריית בית בנפח מתמיד של 5GB והיא פועלת ב-Google Cloud, מה שמשפר משמעותית את ביצועי הרשת והאימות. אם לא את כולן, ניתן לבצע חלק גדול מהעבודה ב-Codelab הזה באמצעות דפדפן או Chromebook.

אחרי ההתחברות ל-Cloud Shell, אתם אמורים לראות שכבר בוצע אימות ושהפרויקט כבר מוגדר למזהה הפרויקט שלכם.

מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שהאימות בוצע:

gcloud auth list

אתם אמורים לראות משהו כזה בפלט הפקודה:

הפלט של Cloud Shell

מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שהפקודה ב-gcloud יודעת על הפרויקט שלכם:

gcloud config list project

פלט הפקודה

[core]
project = <PROJECT_ID>

אם הוא לא משויך, תוכלו להגדיר אותו באמצעות הפקודה הבאה:

gcloud config set project <PROJECT_ID>

פלט הפקודה

Updated property [core/project].

ב-Cloud Shell יש כמה משתני סביבה, כולל GOOGLE_CLOUD_PROJECT שמכיל את השם של הפרויקט הנוכחי שלנו ב-Cloud. נשתמש בו במקומות שונים במהלך שיעור ה-Lab הזה. כדי לראות את זה, מריצים את:

echo $GOOGLE_CLOUD_PROJECT

שלב 2: הפעלת ממשקי API

בשלבים הבאים תראו איפה השירותים האלה נחוצים (ולמה), אבל בינתיים, מריצים את הפקודה הזו כדי לתת לפרויקט גישה לשירותים Compute Engine, Container Registry ו-Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

אמורה ליצור מסר מוצלח שדומה להודעה הזו:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

שלב 3: יצירת קטגוריה של Cloud Storage

כדי להריץ משימת אימון על Vertex AI, אנחנו זקוקים לקטגוריית אחסון לאחסון נכסי המודלים שנשמרו. הקטגוריה צריכה להיות אזורית. אנחנו משתמשים כאן ב-us-central, אבל אפשר להשתמש באזור אחר (פשוט צריך להחליף אותו בשיעור ה-Lab הזה). אם כבר יש לכם קטגוריה, אתם יכולים לדלג על השלב הזה.

כדי ליצור קטגוריה, מריצים את הפקודות הבאות בטרמינל של Cloud Shell:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

בשלב הבא ניתן לחשבון שירות המחשוב שלנו גישה לקטגוריה הזו. כך תבטיחו של-Vertex Pipelines יהיו ההרשאות הדרושות לכתיבת קבצים בקטגוריה הזו. מריצים את הפקודה הבאה כדי להוסיף את ההרשאה הזו:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

שלב 4: יצירת מכונה של Vertex AI Workbench

בקטע Vertex AI במסוף Cloud, לוחצים על Workbench:

תפריט Vertex AI

משם, בתוך notebooks בניהול המשתמשים, לוחצים על New Notebook:

יצירת notebook חדש

לאחר מכן בוחרים בסוג המכונה TensorFlow Enterprise 2.3 (עם LTS) ללא GPUs:

מכונת TFE

משתמשים באפשרויות ברירת המחדל ואז לוחצים על יצירה.

שלב 5: פותחים את ה-notebook

אחרי שיוצרים את המכונה, בוחרים באפשרות Open JupyterLab:

פתיחת ה-notebook

4. הגדרה של Vertex Pipelines

יש כמה ספריות נוספות שנצטרך להתקין כדי להשתמש ב-Vertex Pipelines:

  • Kubeflow Pipelines: זה ה-SDK שבו נשתמש כדי לפתח את צינור עיבוד הנתונים שלנו. ב-Vertex Pipelines יש תמיכה בצינורות עיבוד נתונים שפועלים ב-Kubeflow Pipelines וגם ב-TFX.
  • רכיבי Google Cloud Pipeline: הספרייה הזו מספקת רכיבים מוכנים מראש כדי להקל על האינטראקציה עם שירותי Vertex AI מהשלבים של צינור עיבוד הנתונים.

שלב 1: יצירת notebook ב-Python והתקנת ספריות

קודם כול, בתפריט מרכז האפליקציות במכונה של ה-Notebook, יוצרים notebook על ידי בחירה ב-Python 3:

יצירת notebook ב-Python3

כדי לגשת לתפריט של מרכז האפליקציות, לוחצים על הסימן + בפינה הימנית העליונה של מכונת ה-notebook.

כדי להתקין את שני השירותים שבהם נשתמש בשיעור ה-Lab הזה, קודם צריך להגדיר את דגל המשתמש בתא ב-notebook:

USER_FLAG = "--user"

לאחר מכן מריצים את הפקודה הבאה מה-notebook:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

לאחר התקנת החבילות, תצטרכו להפעיל מחדש את הליבה:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

לבסוף, מוודאים שהתקנתם את החבילות בצורה תקינה. הגרסה של KFP SDK צריכה להיות >=1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

שלב 2: מגדירים את מזהה הפרויקט ואת הקטגוריה

בשיעור ה-Lab הזה תפנו למזהה הפרויקט ב-Cloud ולקטגוריה שיצרתם קודם. בשלב הבא ניצור משתנים לכל אחד מהמשתנים האלה.

אם אתם לא יודעים מהו מזהה הפרויקט, תוכלו לבצע את הפעולות הבאות כדי לאתר אותו:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

אם לא, יש להגדיר אותו כאן:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

לאחר מכן יוצרים משתנה לאחסון שם הקטגוריה. אם יצרתם אותו בשיעור ה-Lab הזה, הפעולות הבאות יעבדו. אחרת, צריך להגדיר את האפשרות הזו באופן ידני:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

שלב 3: ייבוא ספריות

צריך להוסיף את הפריטים הבאים כדי לייבא את הספריות שבהן נשתמש ב-Codelab הזה:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

שלב 4: מגדירים קבועים

הדבר האחרון שאנחנו צריכים לעשות לפני פיתוח צינור עיבוד הנתונים שלנו הוא להגדיר משתנים קבועים. PIPELINE_ROOT הוא הנתיב של Cloud Storage שבו ייכתבו הארטיפקטים שנוצרו על ידי צינור עיבוד הנתונים שלנו. כאן אנחנו משתמשים באזור us-central1, אבל אם השתמשתם באזור אחר כשיצרתם את הקטגוריה, צריך לעדכן את המשתנה REGION בקוד הבא:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

אחרי שהרצתם את הקוד שלמעלה, אתם אמורים לראות הדפסה של ספריית הבסיס של צינור עיבוד הנתונים. זהו המיקום ב-Cloud Storage שבו ייכתבו ארטיפקטים מצינור עיבוד הנתונים שלכם. זה יהיה בפורמט gs://YOUR-BUCKET-NAME/pipeline_root/

5. יצירת צינור עיבוד הנתונים הראשון

כדי להכיר את אופן הפעולה של Vertex Pipelines, קודם ניצור צינור עיבוד נתונים קצר באמצעות KFP SDK. צינור עיבוד הנתונים הזה לא עושה שום דבר שקשור ללמידת מכונה (אל דאגה, נגיע לשם!), אנחנו משתמשים בו כדי ללמד אתכם:

  • איך יוצרים רכיבים מותאמים אישית ב-KFP SDK
  • איך להריץ צינור עיבוד נתונים ב-Vertex Pipelines ולעקוב אחריו

אנחנו ניצור צינור עיבוד נתונים שידפיס משפט באמצעות שני פלטים: שם המוצר ותיאור אמוג'י. צינור עיבוד הנתונים הזה יכלול שלושה רכיבים:

  • product_name: הרכיב הזה יקבל שם מוצר (או כל שם עצם שרוצים באמת) כקלט, ויחזיר את המחרוזת הזו כפלט
  • emoji: הרכיב הזה יקבל את התיאור הטקסט של האמוג'י וימיר אותו לאמוג'י. לדוגמה, קוד הטקסט של ✨ הוא 'ניצוצות'. הרכיב הזה משתמש בספריית אמוג'י כדי להראות לך איך לנהל יחסי תלות חיצוניים בצינור עיבוד הנתונים שלך
  • build_sentence: הרכיב הסופי ישתמש בפלט של השניים הקודמים כדי לבנות משפט עם האמוג'י. לדוגמה, הפלט שתתקבל עשוי להיות 'Vertex Pipelines is ✨'.

קדימה, מתחילים לתכנת!

שלב 1: יצירת רכיב שמבוסס על פונקציית Python

באמצעות KFP SDK אנחנו יכולים ליצור רכיבים על סמך פונקציות Python. נשתמש בו ל-3 הרכיבים בצינור עיבוד הנתונים הראשון שלנו. קודם כל ניצור את הרכיב product_name, שמקבל מחרוזת כקלט ומחזיר אותה. מוסיפים את הפרטים הבאים למחברת:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

בואו נבחן מקרוב את התחביר כאן:

  • כלי העיצוב @component מרכיב את הפונקציה הזו לרכיב כשצינור עיבוד הנתונים פועל. הקוד הזה ישמש אותך בכל פעם שכותבים רכיב מותאם אישית.
  • הפרמטר base_image מציין את קובץ האימג' בקונטיינר שבו הרכיב הזה ישתמש.
  • הפרמטר output_component_file הוא אופציונלי ומציין את קובץ ה-yaml שבו צריך לכתוב את הרכיב שעבר הידור. אחרי שמריצים את התא, הקובץ אמור להופיע במכונה של ה-notebook. כדי לשתף את הרכיב הזה עם מישהו, אפשר לשלוח לו את קובץ ה-yaml שנוצר ולבקש ממנו לטעון אותו עם הפרטים הבאים:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • הערך -> str אחרי הגדרת הפונקציה מציין את סוג הפלט של הרכיב הזה.

שלב 2: יצירת שני רכיבים נוספים

כדי להשלים את צינור עיבוד הנתונים שלנו, ניצור שני רכיבים נוספים. המילה הראשונה שנגדיר מקבלת מחרוזת כקלט, וממירה את המחרוזת הזו לאמוג'י המתאים, אם יש כזה. הפונקציה מחזירה tuple עם טקסט הקלט שעובר, ואת האמוג'י שמתקבל:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

הרכיב הזה קצת יותר מורכב מהרכיב הקודם. הנה סקירה מפורטת:

  • הפרמטר packages_to_install מציין לרכיב את יחסי התלות של ספריות חיצוניות בקונטיינר הזה. במקרה הזה, אנחנו משתמשים בספרייה בשם אמוג'י.
  • הרכיב הזה מחזיר NamedTuple בשם Outputs. שימו לב שלכל אחת מהמחרוזות ב-tuple יש מפתחות: emoji_text ו-emoji. נשתמש בהם ברכיב הבא כדי לגשת לפלט.

הרכיב האחרון בצינור עיבוד הנתונים הזה ייצור את הפלט של השניים הראשונים וישלב אותם כדי להחזיר מחרוזת:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

ייתכן שתהיתם: איך הרכיב הזה יודע להשתמש בפלט מהשלבים הקודמים שהגדרתם? שאלה טובה! נחבר את הכל יחד בשלב הבא.

שלב 3: שילוב הרכיבים לצינור עיבוד נתונים

הגדרות הרכיבים שהגדרנו למעלה יצרו פונקציות מפעל, שניתן להשתמש בהן בהגדרת צינור עיבוד נתונים כדי ליצור שלבים. כדי להגדיר צינור עיבוד נתונים, צריך להשתמש במעצב @pipeline, לתת לצינור עיבוד הנתונים שם ותיאור ולספק את הנתיב ברמה הבסיסית (root) שבו צריך לכתוב את הארטיפקטים של צינור עיבוד הנתונים. המונח 'ארטיפקטים' מתייחס לכל קובץ פלט שנוצר על ידי צינור עיבוד הנתונים שלכם. צינור עיבוד הנתונים ההתחלתי הזה לא יוצר צינורות עיבוד נתונים, אבל צינור עיבוד הנתונים הבא שלנו כן יפיק אותם.

בקטע הקוד הבא אנחנו מגדירים את הפונקציה intro_pipeline. כאן אנחנו מציינים את הקלטים לשלבים הראשוניים בצינור עיבוד הנתונים, ואת האופן שבו השלבים מתחברים זה לזה:

  • product_task משתמש בשם מוצר כקלט. כאן אנחנו מעבירים את Vertex Pipelines אבל תוכלו לשנות את זה לכל מה שתרצו.
  • emoji_task מקבל את קוד הטקסט של אמוג'י כקלט. אפשר גם לשנות את הבחירה בכל דרך שתרצו. לדוגמה, "party_face" מתייחס לאמוג'י 🥳 חשוב לשים לב שגם ברכיב הזה וגם ברכיב product_task אין שלבים שמזינים בהם את הקלט, אנחנו מציינים את הקלט עבורם באופן ידני כשאנחנו מגדירים את צינור עיבוד הנתונים.
  • בשלב האחרון בצינור עיבוד הנתונים שלנו – consumer_task יש שלושה פרמטרים לקלט:
    • הפלט של product_task. מכיוון שהשלב הזה מייצר רק פלט אחד, אנחנו יכולים להפנות אליו דרך product_task.output.
    • הפלט של emoji של השלב emoji_task. ניתן לעיין ברכיב emoji שהוגדר למעלה, במקום שבו קראנו לפרמטרים של הפלט.
    • באופן דומה, הפלט של emoji_text בעל השם מהרכיב emoji. אם צינור עיבוד הנתונים שלנו מועבר עם טקסט שלא תואם לאמוג'י, אנחנו נשתמש בטקסט הזה כדי ליצור משפט.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

שלב 4: הידור והפעלה של צינור עיבוד הנתונים

אחרי הגדרת צינור עיבוד הנתונים, אתם מוכנים להדר אותו. הקוד הבא ייצור קובץ JSON שבו תשתמשו כדי להפעיל את צינור עיבוד הנתונים:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

בשלב הבא יוצרים משתנה TIMESTAMP. נשתמש בכתובת הזאת במזהה המשימה שלנו:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

לאחר מכן מגדירים את המשימה של צינור עיבוד הנתונים:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

לבסוף, מריצים את המשימה כדי ליצור הפעלה חדשה של צינור עיבוד הנתונים:

job.submit()

אחרי שמריצים את התא הזה, אמורים להופיע יומנים עם קישור לצפייה בצינור עיבוד הנתונים שפועל במסוף:

יומני משימות של צינור עיבוד נתונים

עוברים לקישור הזה. צינור עיבוד הנתונים שלכם אמור להיראות כך בסיום:

צינור עיבוד הנתונים ההתחלתי הושלם

הרצה של צינור עיבוד הנתונים הזה תימשך 5-6 דקות. בסיום, אפשר ללחוץ על הרכיב build-sentence כדי לראות את הפלט הסופי:

פלט צינור עיבוד הנתונים של המבוא

עכשיו, אחרי שאתם מכירים את אופן הפעולה של KFP SDK ו-Vertex Pipelines, אתם מוכנים לפתח צינור עיבוד נתונים שיוצר ופורס מודל למידת מכונה באמצעות שירותי Vertex AI אחרים. קדימה, מתחילים!

6. יצירת צינור עיבוד נתונים של למידת מכונה מקצה לקצה

זה הזמן לפתח את צינור עיבוד הנתונים הראשון שלכם ללמידת מכונה. בצינור עיבוד הנתונים הזה נשתמש במערך הנתונים של שעועית אמריקאית של UCI, מהסוגים הבאים: KOKLU, M. ו-OZKAN, I.A, (2020), "סיווג רב-סיווגי של שעועית יבשה באמצעות ראייה ממוחשבת וטכניקות למידת מכונה".בליחולים, 174, 105507. DOI.

זה מערך נתונים טבלאי, ובצינור עיבוד הנתונים שלנו נשתמש במערך הנתונים כדי לאמן, להעריך ולפרוס מודל AutoML שמסווג שעועית לאחד מ-7 סוגים על סמך המאפיינים שלה.

צינור עיבוד הנתונים הזה:

  • יצירת מערך נתונים ב-
  • אימון מודל סיווג טבלאי באמצעות AutoML
  • קבלת מדדי הערכה של המודל הזה
  • על סמך מדדי ההערכה, מחליטים אם לפרוס את המודל באמצעות לוגיקה מותנית ב-Vertex Pipelines
  • פריסת המודל בנקודת קצה (endpoint) באמצעות Vertex Prediction

כל אחד מהשלבים המפורטים יהיה רכיב. רוב השלבים של צינור עיבוד הנתונים יכללו רכיבים מוכנים מראש לשירותי Vertex AI דרך ספריית google_cloud_pipeline_components שייבאנו קודם ב-Codelab הזה. בקטע הזה נגדיר תחילה רכיב מותאם אישית אחד. לאחר מכן נגדיר את שאר השלבים של צינור עיבוד הנתונים באמצעות רכיבים מוכנים מראש. בעזרת רכיבים מובנים מראש קל יותר לגשת לשירותי Vertex AI כמו אימון מודלים ופריסה.

שלב 1: רכיב מותאם אישית להערכת המודל

הרכיב המותאם אישית שנגדיר ישמש בסוף צינור עיבוד הנתונים שלנו אחרי שאימון המודל יסתיים. הרכיב הזה יבצע מספר פעולות:

  • קבלת מדדי ההערכה ממודל הסיווג המאומן של AutoML
  • תנתח את המדדים ותעבד אותם בממשק המשתמש של Vertex Pipelines
  • להשוות את המדדים לערך סף כדי לקבוע אם צריך לפרוס את המודל.

לפני שמגדירים את הרכיב, צריך להבין את הפרמטרים של הקלט והפלט שלו. כקלט, צינור עיבוד הנתונים הזה לוקח חלק מהמטא-נתונים של הפרויקט שלנו ב-Cloud, את המודל המאומן שמתקבל (בהמשך נגדיר את הרכיב הזה), מדדי ההערכה של המודל וthresholds_dict_str. thresholds_dict_str הוא משהו שנגדיר כשנריץ את צינור עיבוד הנתונים שלנו. במקרה של מודל הסיווג הזה, זה יהיה השטח מתחת לערך של עקומת ה-ROC שעבורו צריך לפרוס את המודל. לדוגמה, אם נעביר ערך של 0.95, המשמעות היא שאנחנו רוצים שצינור עיבוד הנתונים שלנו יפרוס את המודל רק אם המדד הזה גבוה מ-95%.

רכיב ההערכה שלנו מחזיר מחרוזת שמציינת אם צריך לפרוס את המודל. מוסיפים את הטקסט הבא לתא ה-notebook כדי ליצור את הרכיב המותאם אישית הזה:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

שלב 2: הוספת רכיבים מוכנים מראש ב-Google Cloud

בשלב הזה נגדיר את שאר רכיבי צינור עיבוד הנתונים שלנו ונראה איך כולם משתלבים יחד. קודם כול, מגדירים את השם המוצג של הרצת צינור עיבוד הנתונים באמצעות חותמת זמן:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

לאחר מכן מעתיקים את הטקסט הבא לתא notebook חדש:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

בואו נראה מה קורה בקוד הזה:

  • קודם כל, בדיוק כמו בצינור עיבוד הנתונים הקודם, אנחנו מגדירים את הפרמטרים של הקלט לצינור עיבוד הנתונים הזה. אנחנו צריכים להגדיר אותם ידנית כי הם לא תלויים בפלט של שלבים אחרים בצינור עיבוד הנתונים.
  • בשאר צינורות עיבוד הנתונים משתמשים בכמה רכיבים מוכנים מראש לאינטראקציה עם שירותי Vertex AI:
    • TabularDatasetCreateOp יוצר מערך נתונים טבלאי ב-Vertex AI על סמך מקור של מערך נתונים ב-Cloud Storage או ב-BigQuery. בצינור עיבוד הנתונים הזה, הנתונים מועברים דרך כתובת URL של טבלה ב-BigQuery
    • AutoMLTabularTrainingJobRunOp מתחיל משימת אימון של AutoML למערך נתונים של טבלה. אנחנו מעבירים לרכיב הזה כמה פרמטרים של הגדרה, כולל סוג המודל (במקרה הזה, סיווג), נתונים מסוימים בעמודות, כמה זמן נרצה להריץ את האימון וסמן למערך הנתונים. שימו לב שכדי להעביר את מערך הנתונים לרכיב הזה, אנחנו מספקים את הפלט של הרכיב הקודם דרך dataset_create_op.outputs["dataset"]
    • EndpointCreateOp יוצרת נקודת קצה ב-Vertex AI. נקודת הקצה שנוצרה מהשלב הזה תועבר כקלט לרכיב הבא
    • ModelDeployOp פורס מודל נתון בנקודת קצה ב-Vertex AI. במקרה כזה, נשתמש בנקודת הקצה שנוצרה מהשלב הקודם. יש אפשרויות הגדרה נוספות, אבל כאן אנחנו מסבירים את הסוג והמודל של מכונת נקודות הקצה שרוצים לפרוס. אנחנו מעבירים את המודל באמצעות גישה לפלט של שלב האימון בצינור עיבוד הנתונים שלנו
  • צינור עיבוד הנתונים הזה משתמש גם בלוגיקה מותנית, תכונה של Vertex Pipelines שמאפשרת להגדיר תנאי עם הסתעפויות שונות בהתאם לתוצאה של התנאי הזה. חשוב לזכור שכשהגדרתנו את צינור עיבוד הנתונים שלנו, העברנו פרמטר thresholds_dict_str. זה סף הדיוק שבו אנחנו משתמשים כדי לקבוע אם לפרוס את המודל שלנו בנקודת קצה. כדי ליישם את האפשרות הזו, אנחנו משתמשים במחלקה Condition מה-KFP SDK. התנאי שאנחנו מעבירים הוא הפלט של רכיב ההערכה בהתאמה אישית שהגדרנו קודם לכן ב-Codelab הזה. אם התנאי הזה מתקיים, צינור עיבוד הנתונים ימשיך להפעיל את הרכיב deploy_op. אם הדיוק לא עומד בסף המוגדר מראש שלנו, צינור עיבוד הנתונים יעצור כאן ולא יפרוס מודל.

שלב 3: הידור והפעלה של צינור עיבוד הנתונים של למידת המכונה מקצה לקצה

אחרי שמגדירים את צינור עיבוד הנתונים המלא שלנו, זה הזמן להדר אותו:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

בשלב הבא מגדירים את המשימה:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

לבסוף, מריצים את המשימה:

ml_pipeline_job.submit()

כדי לראות את צינור עיבוד הנתונים במסוף, צריך לעבור לקישור שמוצג ביומנים אחרי הרצת התא שלמעלה. הרצת צינור עיבוד הנתונים הזה תימשך קצת יותר משעה. רוב הזמן מושקע בשלב האימון של AutoML. צינור עיבוד הנתונים השלם ייראה כך:

צינור עיבוד הנתונים של AutoML הושלם

אם מחליפים את המצב של 'הרחבת פריטי מידע שנוצרו בתהליך הפיתוח (Artifact)' למעלה, אפשר לראות את הפרטים על פריטי המידע שנוצרו בתהליך הפיתוח (Artifact) שנוצרו מצינור עיבוד הנתונים. לדוגמה, אם לוחצים על הארטיפקט dataset, מוצגים פרטים על מערך הנתונים של Vertex AI שנוצר. אפשר ללחוץ על הקישור כאן כדי לעבור לדף של מערך הנתונים:

מערך נתונים של צינור עיבוד נתונים

באופן דומה, כדי לראות את הרכיבים החזותיים של המדדים שמתקבלים מרכיב ההערכה המותאם אישית שלנו, לוחצים על פריט המידע שנוצר בתהליך הפיתוח (Artifact) שנקרא metricsc. בצד שמאל של מרכז הבקרה, תוצג מטריצת הבלבול של המודל הזה:

המחשה חזותית של מדדים

כדי לראות את המודל ונקודת הקצה שנוצרו מההרצה של צינור עיבוד הנתונים הזה, עוברים לקטע מודלים ולוחצים על המודל automl-beans. כאן אתם אמורים לראות שהמודל הבא נפרס בנקודת קצה:

נקודת קצה של מודל

אפשר לגשת לדף הזה גם בלחיצה על פריט המידע שנוצר בתהליך הפיתוח (Artifact) של נקודת הקצה בתרשים צינור עיבוד הנתונים.

בנוסף להסתכלות על תרשים צינור עיבוד הנתונים במסוף, אפשר גם להשתמש ב-Vertex Pipelines למעקב אחרי שושלת. במעקב אחרי שושלת, פירוש הדבר הוא ארטיפקטים של מעקב שנוצרו בצינור עיבוד הנתונים שלכם. המידע הזה יכול לעזור לנו להבין איפה נוצרו ארטיפקטים ואיך משתמשים בהם בתהליך העבודה של למידת מכונה. לדוגמה, כדי לראות את מעקב ההשתלשלות של מערך הנתונים שנוצר בצינור עיבוד הנתונים הזה, לוחצים על הארטיפקט של מערך הנתונים ואז על View Lineage:

הצגת השושלת

כאן רואים את כל המקומות שבהם נעשה שימוש בארטיפקט הזה:

פרטי השושלת

שלב 4: השוואת מדדים בין הפעלות של צינורות עיבוד נתונים

אם אתם מפעילים את צינור עיבוד הנתונים הזה כמה פעמים, כדאי להשוות בין מדדים בכמה הפעלות. אפשר להשתמש ב-method aiplatform.get_pipeline_df() כדי לגשת למטא-נתונים של ההרצה. כאן נקבל מטא-נתונים לכל ההפעלות של צינור עיבוד הנתונים הזה ונטענו אותו ל-Pandas DataFrame:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

זהו, סיימתם את שיעור ה-Lab!

🎉 כל הכבוד! 🎉

למדתם איך להשתמש ב-Vertex AI כדי:

  • שימוש ב-Kubeflow Pipelines SDK כדי לפתח צינורות עיבוד נתונים מקצה לקצה עם רכיבים מותאמים אישית
  • הרצת צינורות עיבוד נתונים ב-Vertex Pipelines והפעלת צינורות עיבוד נתונים עם ה-SDK
  • הצגה וניתוח של תרשים Vertex Pipelines במסוף
  • שימוש ברכיבים של צינור עיבוד נתונים מוכנים מראש כדי להוסיף שירותי Vertex AI לצינור עיבוד הנתונים שלכם
  • תזמון משימות חוזרות של צינור עיבוד נתונים

כדי לקבל מידע נוסף על החלקים השונים ב-Vertex, אתם יכולים לעיין במסמכי העזרה.

7. הסרת המשאבים

כדי שלא תחויבו, מומלץ למחוק את המשאבים שנוצרו בשיעור ה-Lab הזה.

שלב 1: מפסיקים או מוחקים את המכונה של Notebooks

אם אתם רוצים להמשיך להשתמש ב-notebook שיצרתם בשיעור ה-Lab הזה, מומלץ להשבית אותו כשהוא לא בשימוש. בממשק המשתמש של Notebooks במסוף Cloud, בוחרים את ה-notebook ואז לוחצים על Stop. כדי למחוק את המכונה לגמרי, בוחרים באפשרות מחיקה:

עצירת המופע

שלב 2: מחיקת נקודת הקצה

כדי למחוק את נקודת הקצה שפרסתם, עוברים לקטע Endpoints במסוף Vertex AI ולוחצים על סמל המחיקה:

מחיקת נקודת הקצה

לאחר מכן לוחצים על UnDeploy מתוך הבקשה הבאה:

ביטול הפריסה של המודל

בשלב האחרון עוברים לקטע מודלים במסוף, מאתרים את המודל הרצוי ובתפריט שלוש הנקודות בצד שמאל לוחצים על מחיקת מודל:

מחיקת המודל

שלב 3: מחיקת קטגוריה של Cloud Storage

כדי למחוק את קטגוריית האחסון, באמצעות תפריט הניווט במסוף Cloud, עוברים אל Storage (אחסון), בוחרים את הקטגוריה ולוחצים על סמל המחיקה:

מחיקת האחסון