מבוא ל-Vertex Pipelines

1. סקירה כללית

בשיעור ה-Lab הזה תלמדו איך ליצור ולהריץ צינורות ML באמצעות Vertex Pipelines.

מה לומדים

במאמר הזה נסביר איך:

  • שימוש ב-Kubeflow Pipelines SDK כדי לבנות צינורות עיבוד נתונים של למידת מכונה (ML) שניתנים להרחבה
  • יצירה והפעלה של פייפליין פתיח בן 3 שלבים שמקבל קלט טקסט
  • יצירה והרצה של פייפליין שמאמן, מעריך ופורס מודל סיווג של AutoML
  • שימוש ברכיבים מוכנים מראש לאינטראקציה עם שירותי Vertex AI, שזמינים דרך ספריית google_cloud_pipeline_components
  • תזמון משימה בצינור עיבוד נתונים באמצעות Cloud Scheduler

העלות הכוללת של הרצת שיעור ה-Lab הזה ב-Google Cloud היא בערך 25$.

2. מבוא ל-Vertex AI

בשיעור ה-Lab הזה נעשה שימוש במוצר ה-AI החדש ביותר שזמין ב-Google Cloud. ‫Vertex AI משלב את מוצרי ה-ML ב-Google Cloud לחוויית פיתוח חלקה. בעבר, היה אפשר לגשת למודלים שאומנו באמצעות AutoML ולמודלים בהתאמה אישית דרך שירותים נפרדים. המוצר החדש משלב את שניהם ב-API אחד, יחד עם מוצרים חדשים אחרים. אפשר גם להעביר פרויקטים קיימים אל Vertex AI.

בנוסף לשירותי אימון ופריסה של מודלים, Vertex AI כולל גם מגוון מוצרי MLOps, כולל Vertex Pipelines (המוצר העיקרי בשיעור ה-Lab הזה), Model Monitoring,‏ Feature Store ועוד. בדיאגרמה שלמטה אפשר לראות את כל המוצרים של Vertex AI.

סקירה כללית על מוצר Vertex

אם יש לך משוב, אפשר לעיין בדף התמיכה.

למה צינורות עיבוד נתונים של למידת מכונה שימושיים?

לפני שמתחילים, כדאי להבין למה כדאי להשתמש בפייפליין. נניח שאתם בונים תהליך עבודה של למידת מכונה שכולל עיבוד נתונים, אימון מודל, כוונון היפר-פרמטרים, הערכה ופריסת מודל. לכל אחד מהשלבים האלה יכולות להיות תלויות שונות, וזה יכול להיות מסובך אם מתייחסים לכל תהליך העבודה כמקשה אחת. כשמתחילים להרחיב את תהליך ה-ML, כדאי לשתף את תהליך העבודה של ה-ML עם חברים אחרים בצוות כדי שהם יוכלו להפעיל פתרונות חכמים ולתרום קוד. ללא תהליך אמין שניתן לשחזור, זה יכול להיות קשה. בצינורות, כל שלב בתהליך למידת המכונה הוא קונטיינר משלו. כך תוכלו לפתח שלבים בנפרד ולעקוב אחרי הקלט והפלט מכל שלב בצורה שניתנת לשחזור. אפשר גם לתזמן או להפעיל הרצות של פייפליין על סמך אירועים אחרים בסביבת Cloud, כמו הפעלה של הרצת פייפליין כשנתוני אימון חדשים זמינים.

בקיצור: צינורות עיבוד נתונים עוזרים לכם ליצור אוטומציה של תהליך העבודה של למידת המכונה ולשחזר אותו.

3. הגדרה של סביבת ענן

כדי להפעיל את ה-codelab הזה, צריך פרויקט ב-Google Cloud Platform שמופעל בו חיוב. כדי ליצור פרויקט, פועלים לפי ההוראות האלה.

שלב 1: הפעלת Cloud Shell

בשיעור ה-Lab הזה תעבדו בסשן של Cloud Shell, שהוא מתורגמן פקודות שמתארח במכונה וירטואלית שפועלת בענן של Google. אפשר להריץ את הקטע הזה בקלות במחשב שלכם, אבל השימוש ב-Cloud Shell מאפשר לכולם גישה לחוויה שניתנת לשחזור בסביבה עקבית. אחרי הסדנה, אתם מוזמנים לנסות שוב את החלק הזה במחשב שלכם.

מתן הרשאה ל-Cloud Shell

הפעלת Cloud Shell

בפינה הימנית העליונה של Cloud Console, לוחצים על הלחצן Activate Cloud Shell (הפעלת Cloud Shell):

הפעלת Cloud Shell

אם זו הפעם הראשונה שאתם מפעילים את Cloud Shell, יוצג לכם מסך ביניים (בחלק הנגלל) עם תיאור של הכלי. במקרה כזה, לוחצים על המשך (והמסך הזה לא יוצג לכם יותר). כך נראה המסך החד-פעמי:

הגדרה של Cloud Shell

הקצאת המשאבים והחיבור ל-Cloud Shell נמשכים רק כמה רגעים.

Cloud Shell init

המכונה הווירטואלית הזו כוללת את כל הכלים שדרושים למפתחים. יש בה ספריית בית בנפח מתמיד של 5GB והיא פועלת ב-Google Cloud, מה שמשפר מאוד את הביצועים והאימות של הרשת. אפשר לבצע את רוב העבודה ב-codelab הזה, אם לא את כולה, באמצעות דפדפן או Chromebook.

אחרי שמתחברים ל-Cloud Shell, אמור להופיע אימות שכבר בוצע ושהפרויקט כבר הוגדר לפי מזהה הפרויקט.

מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שעברתם אימות:

gcloud auth list

הפלט של הפקודה אמור להיראות כך:

פלט של Cloud Shell

מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שפקודת gcloud מכירה את הפרויקט:

gcloud config list project

פלט הפקודה

[core]
project = <PROJECT_ID>

אם הוא לא מוגדר, אפשר להגדיר אותו באמצעות הפקודה הבאה:

gcloud config set project <PROJECT_ID>

פלט הפקודה

Updated property [core/project].

ל-Cloud Shell יש כמה משתני סביבה, כולל GOOGLE_CLOUD_PROJECT שמכיל את השם של פרויקט בענן הנוכחי. נשתמש בזה במקומות שונים במהלך שיעור ה-Lab הזה. אפשר לראות אותו על ידי הפעלת הפקודה:

echo $GOOGLE_CLOUD_PROJECT

שלב 2: הפעלת ממשקי API

בשלבים הבאים נסביר איפה צריך את השירותים האלה (ולמה), אבל בינתיים, מריצים את הפקודה הזו כדי להעניק לפרויקט גישה לשירותים Compute Engine,‏ Container Registry ו-Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

אמורה להופיע הודעה על הצלחה, כמו זו:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

שלב 3: יצירת קטגוריה של Cloud Storage

כדי להריץ משימת אימון ב-Vertex AI, נצטרך קטגוריית אחסון לאחסון נכסי המודל השמורים. הקטגוריה צריכה להיות אזורית. בשיעור Lab הזה אנחנו משתמשים ב-us-central, אבל אתם יכולים להשתמש באזור אחר (פשוט צריך להחליף אותו בכל מקום בשיעור Lab). אם כבר יש לכם קטגוריה, אתם יכולים לדלג על השלב הזה.

כדי ליצור קטגוריה, מריצים את הפקודות הבאות במסוף Cloud Shell:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

בשלב הבא ניתן לחשבון השירות של Compute גישה לקטגוריה הזו. כך מוודאים של-Vertex Pipelines יש את ההרשאות הנדרשות לכתיבת קבצים בקטגוריה הזו. מריצים את הפקודה הבאה כדי להוסיף את ההרשאה הזו:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

שלב 4: יצירת מכונה של Vertex AI Workbench

בקטע Vertex AI במסוף Cloud, לוחצים על Workbench:

תפריט Vertex AI

משם, בקטע Notebooks בניהול המשתמשים, לוחצים על New Notebook:

יצירת Notebook חדש

לאחר מכן בוחרים את סוג האינסטנס TensorFlow Enterprise 2.3 (with LTS) without GPUs:

מופע TFE

משתמשים באפשרויות ברירת המחדל ולוחצים על יצירה.

שלב 5: פותחים את המחברת

אחרי שהמופע נוצר, בוחרים באפשרות Open JupyterLab:

פתיחת Notebook

4. הגדרה של Vertex Pipelines

כדי להשתמש ב-Vertex Pipelines, צריך להתקין עוד כמה ספריות:

  • Kubeflow Pipelines: זהו ה-SDK שבו נשתמש כדי לבנות את צינור עיבוד הנתונים. ב-Vertex Pipelines יש תמיכה בהרצת צינורות עיבוד נתונים שנבנו באמצעות Kubeflow Pipelines או TFX.
  • רכיבי פייפליין של Google Cloud: בספרייה הזו יש רכיבים מוכנים מראש שמקלים על האינטראקציה עם שירותי Vertex AI משלבי הפייפליין.

שלב 1: יוצרים מחברת Python ומתקינים ספריות

קודם כל, בתפריט של מרכז האפליקציות במופע Notebook, יוצרים מחברת על ידי בחירה באפשרות Python 3:

יצירת מחברת Python3

כדי לגשת לתפריט מרכז האפליקציות, לוחצים על הסימן + בפינה השמאלית העליונה של מחברת Jupyter.

כדי להתקין את שני השירותים שבהם נשתמש בשיעור ה-Lab הזה, קודם צריך להגדיר את דגל המשתמש בתא של notebook:

USER_FLAG = "--user"

לאחר מכן מריצים את הפקודה הבאה מתוך ה-notebook:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

אחרי שמתקינים את החבילות האלה, צריך להפעיל מחדש את ליבת המערכת:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

לבסוף, בודקים שהחבילות הותקנו בצורה נכונה. גרסת ה-SDK של KFP צריכה להיות ‎ >=1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

שלב 2: מגדירים את מזהה הפרויקט ואת הקטגוריה

במהלך שיעור ה-Lab הזה תצטרכו להתייחס למזהה פרויקט בענן ולמאגר שיצרתם קודם. לאחר מכן ניצור משתנים לכל אחד מהם.

אם אתם לא יודעים מהו מזהה הפרויקט, יכול להיות שתוכלו לקבל אותו על ידי הפעלת הפקודה הבאה:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

אחרת, מגדירים אותו כאן:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

לאחר מכן יוצרים משתנה לאחסון שם הקטגוריה. אם יצרתם אותו בשיעור ה-Lab הזה, הפעולות הבאות יעבדו. אחרת, תצטרכו להגדיר את זה באופן ידני:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

שלב 3: ייבוא ספריות

מוסיפים את השורות הבאות כדי לייבא את הספריות שבהן נשתמש במהלך ה-codelab:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

שלב 4: הגדרת קבועים

הדבר האחרון שצריך לעשות לפני שיוצרים את הפייפליין הוא להגדיר כמה משתנים קבועים. ‫PIPELINE_ROOT הוא הנתיב ב-Cloud Storage שבו ייכתבו הארטיפקטים שנוצרו על ידי הפייפליין שלנו. בדוגמה הזו, האזור הוא us-central1, אבל אם השתמשתם באזור אחר כשייצרתם את הקטגוריה, צריך לעדכן את המשתנה REGION בקוד שלמטה:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

אחרי שמריצים את הקוד שלמעלה, אמורה להופיע הדפסה של תיקיית השורש של הפייפליין. זהו המיקום ב-Cloud Storage שבו ייכתבו הארטיפקטים מהפייפליין. הפורמט יהיה gs://YOUR-BUCKET-NAME/pipeline_root/

5. יצירת צינור עיבוד הנתונים הראשון

כדי להבין איך Vertex Pipelines פועל, נתחיל ביצירת פייפליין קצר באמצעות KFP SDK. הפייפליין הזה לא עושה שום דבר שקשור ל-ML (אל דאגה, נגיע לזה בהמשך!), אנחנו משתמשים בו כדי ללמד אתכם:

  • איך יוצרים רכיבים מותאמים אישית ב-KFP SDK
  • איך מריצים פייפליין ב-Vertex Pipelines ועוקבים אחריו

ניצור פייפליין שיוציא פלט של משפט באמצעות שני פלטים: שם המוצר ותיאור אמוג'י. הפייפליין הזה יכלול שלושה רכיבים:

  • product_name: הרכיב הזה מקבל שם מוצר (או כל שם עצם אחר שרוצים) כקלט, ומחזיר את המחרוזת הזו כפלט
  • emoji: הרכיב הזה יקבל את תיאור הטקסט של אמוג'י וימיר אותו לאמוג'י. לדוגמה, קוד הטקסט של ✨ הוא sparkles. הרכיב הזה משתמש בספריית אמוג'י כדי להראות לכם איך לנהל יחסי תלות חיצוניים בפייפליין.
  • build_sentence: הרכיב הסופי הזה יקבל את הפלט של שני הרכיבים הקודמים כדי ליצור משפט שכולל את סמל האמוג'י. לדוגמה, הפלט שמתקבל יכול להיות 'Vertex Pipelines is ✨'.

בואו נתחיל לתכנת!

שלב 1: יצירת רכיב שמבוסס על פונקציית Python

באמצעות KFP SDK, אפשר ליצור רכיבים שמבוססים על פונקציות Python. נשתמש בזה בשלושת הרכיבים בצינור הראשון שלנו. קודם ניצור את הרכיב product_name, שמקבל מחרוזת כקלט ומחזיר את המחרוזת הזו. מוסיפים את הטקסט הבא ל-Notebook:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

בואו נבחן את התחביר:

  • הדקורטור @component מקמפל את הפונקציה הזו לרכיב כשהפייפליין מופעל. תשתמשו בזה בכל פעם שתכתבו רכיב בהתאמה אישית.
  • הפרמטר base_image מציין את קובץ אימג' של קונטיינר שבו הרכיב הזה ישתמש.
  • הפרמטר output_component_file הוא אופציונלי, והוא מציין את קובץ ה-YAML שאליו ייכתב הרכיב שעבר קומפילציה. אחרי שמריצים את התא, הקובץ אמור להיכתב למופע של המחברת. אם רוצים לשתף את הרכיב הזה עם מישהו, אפשר לשלוח לו את קובץ ה-YAML שנוצר ולבקש ממנו לטעון אותו באמצעות הפקודה הבאה:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • הסימן -> str אחרי הגדרת הפונקציה מציין את סוג הפלט של הרכיב הזה.

שלב 2: יוצרים שני רכיבים נוספים

כדי להשלים את הפייפליין, ניצור עוד שני רכיבים. הפונקציה הראשונה שנגדיר מקבלת מחרוזת כקלט, וממירה את המחרוזת לאימוג'י התואם שלה אם קיים כזה. הפונקציה מחזירה טאפל עם טקסט הקלט שהועבר ואמוג'י התוצאה:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

הרכיב הזה קצת יותר מורכב מהקודם. מה חדש:

  • הפרמטר packages_to_install מציין לרכיב את כל התלויות בספריות חיצוניות של הקונטיינר הזה. במקרה הזה, אנחנו משתמשים בספרייה שנקראת emoji.
  • הרכיב הזה מחזיר NamedTuple בשם Outputs. שימו לב שלכל אחת מהמחרוזות בטופל הזה יש מפתחות: emoji_text ו-emoji. נשתמש בהם ברכיב הבא כדי לגשת לפלט.

הרכיב האחרון בפייפליין הזה יצרוך את הפלט של שני הרכיבים הראשונים וישלב אותם כדי להחזיר מחרוזת:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

יכול להיות שאתם תוהים: איך הרכיב הזה יודע להשתמש בפלט מהשלבים הקודמים שהגדרתם? שאלה טובה! בשלב הבא נסביר איך הכל מתחבר.

שלב 3: הרכבת הרכיבים לפייפליין

הגדרות הרכיבים שהגדרנו למעלה יצרו פונקציות פקטורי (factory) שאפשר להשתמש בהן בהגדרת פייפליין כדי ליצור שלבים. כדי להגדיר פייפליין, משתמשים ב-decorator‏ @pipeline, נותנים לפייפליין שם ותיאור ומספקים את נתיב הבסיס שבו צריך לכתוב את הארטיפקטים של הפייפליין. במונח 'ארטיפקטים' אנחנו מתכוונים לכל קובץ פלט שנוצר על ידי צינור הנתונים. בפייפליין הפתיח הזה לא נוצרים נתונים, אבל בפייפליין הבא כן.

בבלוק הקוד הבא אנחנו מגדירים פונקציית intro_pipeline. כאן מציינים את הקלט לשלבים הראשוניים של צינור הנתונים, ואת אופן הקישור בין השלבים:

  • product_task מקבל שם מוצר כקלט. בדוגמה הזו אנחנו מעבירים את הערך Vertex Pipelines, אבל אפשר לשנות אותו לכל ערך אחר שרוצים.
  • emoji_task מקבל את קוד הטקסט של אמוג'י כקלט. אפשר גם לשנות את הקריאה לפעולה איך שרוצים. לדוגמה, party_face מתייחס לאמוג'י 🥳. שימו לב: מכיוון שגם לרכיב הזה וגם לרכיב product_task אין שלבים שמזינים קלט, אנחנו מציינים את הקלט באופן ידני כשאנחנו מגדירים את צינור הנתונים.
  • השלב האחרון בצינור עיבוד הנתונים – consumer_task – כולל שלושה פרמטרים של קלט:
    • הפלט של product_task. מכיוון שהשלב הזה יוצר רק פלט אחד, אפשר להפנות אליו באמצעות product_task.output.
    • הפלט של emoji מהשלב emoji_task. אפשר לעיין ברכיב emoji שמוגדר למעלה, שבו נתנו שמות לפרמטרים של הפלט.
    • באופן דומה, הפלט שנקרא emoji_text מהרכיב emoji. אם הצינור שלנו מקבל טקסט שלא תואם לאמוג'י, הוא ישתמש בטקסט הזה כדי ליצור משפט.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

שלב 4: קומפילציה והרצה של צינור עיבוד הנתונים

אחרי שמגדירים את הצינור, אפשר לקמפל אותו. הפקודה הבאה תיצור קובץ JSON שבו תשתמשו כדי להריץ את הפייפליין:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

לאחר מכן יוצרים TIMESTAMP משתנה. נשתמש בזה במזהה המשרה:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

לאחר מכן מגדירים את משימת צינור הנתונים:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

לבסוף, מריצים את העבודה כדי ליצור הרצה חדשה של פייפליין:

job.submit()

אחרי שמריצים את התא הזה, אמורים לראות יומנים עם קישור לצפייה בהרצת צינור הנתונים במסוף:

יומני משרות בצינור

עוברים לקישור הזה. כשהפייפליין יסתיים, הוא אמור להיראות כך:

צינור עיבוד נתונים של פתיח שהושלם

הרצת צינור הנתונים הזה תימשך 5-6 דקות. אחרי שהרכיב יסיים את הפעולה, תוכלו ללחוץ על הרכיב build-sentence כדי לראות את הפלט הסופי:

פלט של צינור עיבוד נתונים להקדמה

עכשיו, אחרי שהכרתם את ה-SDK של KFP ואת Vertex Pipelines, אתם מוכנים ליצור פייפליין שיוצר ופורס מודל ML באמצעות שירותים אחרים של Vertex AI. קדימה, מתחילים!

6. יצירת פייפליין של למידת מכונה מקצה לקצה

הגיע הזמן לבנות את פייפליין ה-ML הראשון. בפייפליין הזה נשתמש בקבוצת הנתונים של שעועית יבשה מ-UCI Machine Learning, מתוך: KOKLU, M. and OZKAN, I.A., ‫(2020), ‏"Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques."In Computers and Electronics in Agriculture, 174, 105507. DOI.

זהו מערך נתונים טבלאי, ובפייפליין שלנו נשתמש במערך הנתונים כדי לאמן, להעריך ולפרוס מודל AutoML שמסווג שעועית לאחד מ-7 סוגים על סמך המאפיינים שלה.

הצינור הזה:

  • יצירת מערך נתונים ב-
  • אימון מודל לסיווג טבלאות באמצעות AutoML
  • קבלת מדדי הערכה לגבי המודל הזה
  • על סמך מדדי ההערכה, מחליטים אם לפרוס את המודל באמצעות לוגיקה מותנית ב-Vertex Pipelines
  • פריסת המודל לנקודת קצה באמצעות Vertex Prediction

כל אחד מהשלבים שמתוארים יהיה רכיב. ברוב השלבים של הפייפליין נעשה שימוש ברכיבים מוכנים מראש לשירותי Vertex AI דרך ספריית google_cloud_pipeline_components שייבאנו קודם ב-Codelab הזה. בקטע הזה נגדיר קודם רכיב מותאם אישית אחד. לאחר מכן, נגדיר את שאר השלבים בפייפליין באמצעות רכיבים מוכנים מראש. רכיבים מוכנים מראש מקלים על הגישה לשירותי Vertex AI, כמו אימון ופריסה של מודלים.

שלב 1: רכיב בהתאמה אישית להערכת מודל

הרכיב המותאם אישית שנגדיר ישמש לקראת סוף צינור העיבוד, אחרי שאימון המודל יסתיים. הרכיב הזה יבצע כמה פעולות:

  • קבלת מדדי ההערכה ממודל הסיווג שאומן באמצעות AutoML
  • ניתוח המדדים והצגתם בממשק המשתמש של Vertex Pipelines
  • השוואת המדדים לסף כדי לקבוע אם כדאי לפרוס את המודל

לפני שנגדיר את הרכיב, נסביר מהם פרמטרי הקלט והפלט שלו. הקלט של צינור העיבוד הזה כולל מטא-נתונים מסוימים בפרויקט בענן שלנו, את המודל המאומן שנוצר (נגדיר את הרכיב הזה בהמשך), את מדדי ההערכה של המודל ו-thresholds_dict_str. ה-thresholds_dict_str הוא משהו שנגדיר כשהפייפליין יפעל. במקרה של מודל הסיווג הזה, זה יהיה הערך של השטח מתחת לעקומת ה-ROC שעבורו צריך לפרוס את המודל. לדוגמה, אם מעבירים את הערך 0.95, המשמעות היא שהפייפליין יפרוס את המודל רק אם המדד הזה גבוה מ-95%.

רכיב ההערכה שלנו מחזיר מחרוזת שמציינת אם כדאי לפרוס את המודל או לא. כדי ליצור את הרכיב המותאם אישית הזה, מוסיפים את הקוד הבא לתא ב-Notebook:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

שלב 2: הוספת רכיבים מוכנים מראש של Google Cloud

בשלב הזה נגדיר את שאר רכיבי הצינור ונראה איך הם משתלבים יחד. קודם כל, מגדירים את שם התצוגה של הרצת הצינור באמצעות חותמת זמן:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

לאחר מכן מעתיקים את הקוד הבא לתא חדש במחברת:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

בואו נראה מה קורה בקוד הזה:

  • קודם כל, כמו בפייפליין הקודם, אנחנו מגדירים את פרמטרי הקלט שהפייפליין הזה מקבל. אנחנו צריכים להגדיר אותם באופן ידני כי הם לא תלויים בפלט של שלבים אחרים בפייפליין.
  • שאר הפייפליין משתמש בכמה רכיבים מוכנים מראש כדי ליצור אינטראקציה עם שירותי Vertex AI:
    • TabularDatasetCreateOp יוצר מערך נתונים טבלאי ב-Vertex AI בהינתן מקור מערך נתונים ב-Cloud Storage או ב-BigQuery. בפייפליין הזה, אנחנו מעבירים את הנתונים באמצעות כתובת URL של טבלה ב-BigQuery
    • AutoMLTabularTrainingJobRunOp מפעיל משימת אימון של AutoML עבור מערך נתונים טבלאי. אנחנו מעבירים לרכיב הזה כמה פרמטרים של הגדרה, כולל סוג המודל (במקרה הזה, סיווג), נתונים מסוימים על העמודות, משך הזמן שבו אנחנו רוצים להריץ את האימון ומצביע על מערך הנתונים. שימו לב: כדי להעביר את מערך הנתונים לרכיב הזה, אנחנו מספקים את הפלט של הרכיב הקודם באמצעות dataset_create_op.outputs["dataset"]
    • EndpointCreateOp יוצר נקודת קצה ב-Vertex AI. נקודת הקצה שנוצרה בשלב הזה תועבר כקלט לרכיב הבא
    • ModelDeployOp פורס מודל נתון לנקודת קצה ב-Vertex AI. במקרה הזה, אנחנו משתמשים בנקודת הקצה שנוצרה בשלב הקודם. יש עוד אפשרויות הגדרה, אבל כאן אנחנו מציינים את סוג המכונה ואת הדגם של נקודת הקצה שאנחנו רוצים לפרוס. אנחנו מעבירים את המודל על ידי גישה לפלטים של שלב האימון בצינור שלנו
  • בצינור הזה נעשה שימוש גם בלוגיקה מותנית, תכונה של Vertex Pipelines שמאפשרת להגדיר תנאי, יחד עם ענפים שונים על סמך התוצאה של התנאי הזה. חשוב לזכור שכשהגדרנו את צינור הנתונים, העברנו פרמטר thresholds_dict_str. זהו סף הדיוק שבו אנחנו משתמשים כדי לקבוע אם לפרוס את המודל שלנו לנקודת קצה. כדי להטמיע את זה, אנחנו משתמשים במחלקה Condition מ-KFP SDK. התנאי שאנחנו מעבירים הוא הפלט של רכיב ההערכה המותאם אישית שהגדרנו קודם ב-codelab הזה. אם התנאי הזה מתקיים, הצינור ימשיך להפעיל את רכיב deploy_op. אם רמת הדיוק לא תעמוד בסף שהגדרנו מראש, הצינור יפסיק כאן ולא יפרוס מודל.

שלב 3: קומפילציה והרצה של צינור עיבוד הנתונים של למידת מכונה מקצה לקצה

אחרי שהגדרנו את צינור הנתונים המלא, הגיע הזמן לקמפל אותו:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

לאחר מכן, מגדירים את העבודה:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

ולבסוף, מריצים את העבודה:

ml_pipeline_job.submit()

כדי לראות את הפייפליין במסוף, עוברים לקישור שמופיע ביומנים אחרי שמריצים את התא שלמעלה. הרצת צינור הנתונים הזה תימשך קצת יותר משעה. רוב הזמן מושקע בשלב האימון של AutoML. הפייפליין שהושלם ייראה בערך כך:

צינור עיבוד נתונים של AutoML הושלם

אם לוחצים על הלחצן 'הרחבת ארטיפקטים' בחלק העליון, אפשר לראות את הפרטים של הארטיפקטים השונים שנוצרו מצינור הנתונים. לדוגמה, אם לוחצים על ארטיפקט dataset, מוצגים פרטים על מערך הנתונים של Vertex AI שנוצר. כדי לעבור לדף של מערך הנתונים, לוחצים על הקישור:

מערך נתונים של צינור עיבוד נתונים

באופן דומה, כדי לראות את התרשימים של המדדים שנוצרו מרכיב ההערכה המותאם אישית, לוחצים על הארטיפקט שנקרא metricsc. בצד שמאל של מרכז הבקרה, תוכלו לראות את מטריצת הטעות של המודל הזה:

הצגה חזותית של מדדים

כדי לראות את המודל ונקודת הקצה שנוצרו מהרצת הצינור הזה, עוברים אל models section ולוחצים על המודל שנקרא automl-beans. שם אמור להופיע המודל הזה שנפרס לנקודת קצה:

נקודת קצה של מודל

אפשר גם לגשת לדף הזה בלחיצה על ארטיפקט נקודת הקצה בתרשים של צינור הנתונים.

בנוסף לצפייה בתרשים של פייפליין במסוף, אפשר להשתמש ב-Vertex Pipelines גם למעקב אחר מקורות נתונים. מעקב אחר שושלת היוחסין הוא מעקב אחר פריטי מידע שנוצרו לאורך צינור הנתונים. כך נוכל להבין איפה נוצרו הארטיפקטים ואיך משתמשים בהם בתהליך העבודה של למידת מכונה. לדוגמה, כדי לראות את מעקב השתלשלות הנתונים של קבוצת הנתונים שנוצרה בפייפליין הזה, לוחצים על ארטיפקט קבוצת הנתונים ואז על View Lineage:

הצגת שושלת

כאן אפשר לראות את כל המקומות שבהם נעשה שימוש בארטיפקט הזה:

פרטי שרשרת היוחסין

שלב 4: השוואה בין מדדים בהפעלות של צינורות

אם מריצים את צינור העיבוד הזה כמה פעמים, יכול להיות שתרצו להשוות בין המדדים של הריצות השונות. אפשר להשתמש ב-method‏ aiplatform.get_pipeline_df() כדי לגשת למטא נתונים של ההרצה. כאן נאחזר מטא-נתונים של כל ההרצות של פייפליין זה ונטען אותם ל-Pandas DataFrame:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

סיימתם את שיעור ה-Lab.

‫🎉 איזה כיף! 🎉

למדתם איך להשתמש ב-Vertex AI כדי:

  • שימוש ב-Kubeflow Pipelines SDK כדי ליצור צינורות עיבוד נתונים מקצה לקצה עם רכיבים בהתאמה אישית
  • הרצת צינורות עיבוד נתונים ב-Vertex Pipelines והפעלת צינורות עיבוד נתונים באמצעות ה-SDK
  • הצגה וניתוח של הגרף של Vertex Pipelines במסוף
  • שימוש ברכיבי פייפליין מוכנים מראש כדי להוסיף שירותי Vertex AI לפייפליין
  • תזמון משימות חוזרות בצינור

מידע נוסף על החלקים השונים של Vertex זמין בתיעוד.

7. הסרת המשאבים

כדי שלא תחויבו, מומלץ למחוק את המשאבים שנוצרו במהלך שיעור ה-Lab הזה.

שלב 1: מפסיקים או מוחקים את מופע Notebooks

אם אתם רוצים להמשיך להשתמש ב-notebook שיצרתם בשיעור Lab הזה, מומלץ לכבות אותו כשאתם לא משתמשים בו. בממשק המשתמש של Notebooks במסוף Cloud, בוחרים את ה-Notebook ואז בוחרים באפשרות Stop (עצירה). אם רוצים למחוק את המופע לגמרי, בוחרים באפשרות מחיקה:

עצירת מכונה

שלב 2: מוחקים את נקודת הקצה

כדי למחוק את נקודת הקצה שפרסתם, עוברים לקטע Endpoints במסוף Vertex AI ולוחצים על סמל המחיקה:

למחוק נקודות קצה

לאחר מכן, לוחצים על ביטול הפריסה בהודעה הבאה:

ביטול הפריסה של מודל

לבסוף, עוברים לקטע Models (מודלים) במסוף, מאתרים את המודל הרלוונטי ובתפריט שלוש הנקודות שמשמאל לוחצים על Delete model (מחיקת המודל):

מחיקת המודל

שלב 3: מחיקת קטגוריית Cloud Storage

כדי למחוק את קטגוריית האחסון, בתפריט הניווט ב-Cloud Console, עוברים אל Storage, בוחרים את הקטגוריה ולוחצים על Delete:

מחיקת האחסון