מבוא ל-Vertex Pipelines

1. סקירה כללית

בשיעור ה-Lab הזה תלמדו איך ליצור ולהריץ צינורות עיבוד נתונים ל-ML באמצעות Vertex Pipelines.

מה לומדים

תלמדו איך:

  • שימוש ב-Kubeflow Pipelines SDK כדי לפתח צינורות עיבוד נתונים ניתנים להרחבה של למידת מכונה
  • יצירת צינור עיבוד נתונים של מבוא בן 3 שלבים שמקבל קלט טקסט והפעלה שלו
  • יצירה והפעלה של צינור עיבוד נתונים לאימון, להערכה ולפריסה של מודל סיווג של AutoML
  • להשתמש ברכיבים מוכנים מראש לאינטראקציה עם שירותי Vertex AI, שמסופקים דרך הספרייה google_cloud_pipeline_components
  • תזמון משימה בצינור עיבוד נתונים באמצעות Cloud Scheduler

העלות הכוללת להרצת הסדנה הזו ב-Google Cloud היא כ-25$.

2. מבוא ל-Vertex AI

במעבדה הזו נעשה שימוש במוצרי ה-AI החדשים ביותר שזמינים ב-Google Cloud. Vertex AI משלבת את חבילות ה-ML ב-Google Cloud לחוויית פיתוח חלקה. בעבר, ניתן היה לגשת למודלים שעברו אימון באמצעות AutoML ומודלים בהתאמה אישית דרך שירותים נפרדים. המוצר החדש משלב את שניהם ב-API אחד, יחד עם מוצרים חדשים אחרים. אפשר גם להעביר פרויקטים קיימים ל-Vertex AI.

בנוסף לאימון מודלים ולשירותי פריסה, Vertex AI כולל גם מגוון מוצרי MLOps, כולל Vertex Pipelines (המוקד של שיעור ה-Lab הזה), Model Monitoring, Feature Store ועוד. כל מוצרי Vertex AI מפורטים בתרשים הבא.

סקירה כללית על מוצרי Vertex

יש לך משוב? אפשר להיכנס לדף התמיכה.

למה צינורות עיבוד נתונים של למידת מכונה שימושיים?

לפני שנמשיך, חשוב להבין למה כדאי להשתמש בצינור עיבוד נתונים. נניח שאתם מפתחים תהליך עבודה של למידת מכונה שכולל עיבוד נתונים, אימון מודל, כוונון היפר-פרמטרים, הערכה ופריסת מודלים. לכל אחד מהשלבים האלה יכולות להיות יחסי תלות שונים, שעשויים להיות קשים לניהול אם מתייחסים לתהליך העבודה כולו כאל מונולית. כשתתחילו לבצע התאמה לעומס של תהליך למידת המכונה (ML), כדאי לשתף את תהליך העבודה עם אחרים בצוות כדי להריץ אותו ולתרום קוד. ללא תהליך אמין שאפשר לשחזר, זה יכול להיות קשה. באמצעות צינורות עיבוד נתונים, כל שלב בתהליך למידת המכונה הוא מאגר משלו. כך אפשר לפתח שלבים בנפרד ולעקוב אחרי הקלט והפלט של כל שלב באופן שניתן לשחזור. תוכלו גם לתזמן או להפעיל הפעלות של צינור עיבוד הנתונים על סמך אירועים אחרים בסביבת הענן שלכם, כמו הפעלת צינור עיבוד נתונים כשיש נתוני אימון חדשים.

הסיכום: צינורות עיבוד נתונים עוזרים לבצע אוטומציה של תהליך העבודה של למידת המכונה ולשכפל אותו.

3. הגדרת סביבת ענן

כדי להריץ את הקודלה הזה, צריך פרויקט ב-Google Cloud Platform שבו החיוב מופעל. כדי ליצור פרויקט, יש לפעול לפי ההוראות האלה.

שלב 1: הפעלת Cloud Shell

בשיעור ה-Lab הזה תלמדו איך לעבוד בסשן של Cloud Shell, שהוא מתורגם פקודות שמתארח במכונה וירטואלית שפועלת בענן של Google. באותה מידה תוכלו להריץ את הקטע הזה באופן מקומי במחשב שלכם, אבל השימוש ב-Cloud Shell מאפשר לכולם ליהנות מחוויה שניתן לשחזר בסביבה עקבית. אחרי שיעור ה-Lab, אתם יכולים לנסות שוב את הקטע הזה במחשב.

מתן הרשאה ל-Cloud Shell

הפעלת Cloud Shell

בפינה הימנית העליונה של מסוף Cloud, לוחצים על הלחצן הבא כדי להפעיל את Cloud Shell:

הפעלת Cloud Shell

אם זו הפעם הראשונה שאתם מפעילים את Cloud Shell, יוצג לכם מסך ביניים (בחלק הנגלל) שמתאר מהו. אם זה המצב, לחצו על המשך (והוא לא יופיע יותר). כך נראה המסך החד-פעמי:

הגדרת Cloud Shell

תהליך ההקצאה וההתחברות ל-Cloud Shell אמור להימשך רק כמה רגעים.

אתחול של Cloud Shell

במכונה הווירטואלית הזו משולבת כל כלי הפיתוח שדרושים לכם. יש בה ספריית בית בנפח מתמיד של 5GB והיא פועלת ב-Google Cloud, מה שמשפר משמעותית את ביצועי הרשת והאימות. אפשר לבצע חלק גדול מהעבודה בקודלאב הזה, אם לא את כולה, באמצעות דפדפן או Chromebook.

אחרי שתתחברו ל-Cloud Shell, אמורה להופיע הודעה על כך שהאימות כבר בוצע והפרויקט כבר מוגדר לפי מזהה הפרויקט שלכם.

מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שהאימות בוצע:

gcloud auth list

הפלט של הפקודה אמור להיראות כך:

הפלט של Cloud Shell

מריצים את הפקודה הבאה ב-Cloud Shell כדי לוודא שהפקודה ב-gcloud יודעת על הפרויקט שלכם:

gcloud config list project

פלט הפקודה

[core]
project = <PROJECT_ID>

אם לא, אפשר להגדיר אותו באמצעות הפקודה הבאה:

gcloud config set project <PROJECT_ID>

פלט הפקודה

Updated property [core/project].

ל-Cloud Shell יש כמה משתני סביבה, כולל GOOGLE_CLOUD_PROJECT שמכיל את השם של הפרויקט הנוכחי ב-Cloud. נשתמש בו במקומות שונים במהלך שיעור ה-Lab. כדי לראות את זה, מריצים את:

echo $GOOGLE_CLOUD_PROJECT

שלב 2: מפעילים את ממשקי ה-API

בשלבים הבאים נסביר איפה נדרשים השירותים האלה (ולמה), אבל בינתיים מריצים את הפקודה הבאה כדי לתת לפרויקט גישה לשירותים Compute Engine,‏ Container Registry ו-Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

אמורה להופיע הודעה על הצלחה שדומה להודעה הבאה:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

שלב 3: יצירת קטגוריה של Cloud Storage

כדי להריץ משימה של אימון ב-Vertex AI, נצטרך קטגוריית אחסון לאחסון נכסי המודלים השמורים שלנו. הקטגוריה צריכה להיות אזורית. אנחנו משתמשים כאן ב-us-central, אבל אתם יכולים להשתמש באזור אחר (פשוט מחליפים אותו בכל שיעור ה-Lab). אם כבר יש לכם קטגוריה, אתם יכולים לדלג על השלב הזה.

כדי ליצור קטגוריה, מריצים את הפקודות הבאות בטרמינל של Cloud Shell:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

בשלב הבא נתת לחשבון השירות שלנו ל-Compute גישה לקטגוריה הזו. כך תוכלו לוודא של-Vertex Pipelines יש את ההרשאות הנדרשות לכתוב קבצים לקטגוריה הזו. מריצים את הפקודה הבאה כדי להוסיף את ההרשאה הזו:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

שלב 4: יצירת מכונה של Vertex AI Workbench

בקטע Vertex AI במסוף Cloud, לוחצים על Workbench:

התפריט של Vertex AI

משם, בתוך notebooks בניהול המשתמשים, לוחצים על New Notebook:

יצירת notebook חדש

לאחר מכן בוחרים בסוג המכונה TensorFlow Enterprise 2.3 (with LTS) without GPUs:

מכונת TFE

משתמשים באפשרויות ברירת המחדל ואז לוחצים על יצירה.

שלב 5: פותחים את ה-Notebook

אחרי שיוצרים את המכונה, בוחרים באפשרות Open JupyterLab:

פתיחת Notebook

4. הגדרת Vertex Pipelines

כדי להשתמש ב-Vertex Pipelines, נצטרך להתקין כמה ספריות נוספות:

  • Kubeflow Pipelines: זה ה-SDK שבו נשתמש כדי לפתח את צינור עיבוד הנתונים שלנו. Vertex Pipelines תומך בהרצת צינורות עיבוד נתונים שנוצרו באמצעות Kubeflow Pipelines או TFX.
  • רכיבי Google Cloud Pipeline: הספרייה הזו מספקת רכיבים מוכנים מראש כדי להקל על האינטראקציה עם שירותי Vertex AI מהשלבים של צינור עיבוד הנתונים.

שלב 1: יצירת notebook ב-Python והתקנת ספריות

קודם כול, בתפריט מרכז האפליקציות במכונה של ה-Notebook, יוצרים notebook על ידי בחירה ב-Python 3.

יצירת notebook ב-Python3

כדי לגשת לתפריט 'מרכז האפליקציות', לוחצים על הסימן + בפינה השמאלית העליונה של מכונה של מחברת.

כדי להתקין את שני השירותים שבהם נשתמש במעבדה הזו, קודם צריך להגדיר את דגל המשתמש בתא של מחברת:

USER_FLAG = "--user"

לאחר מכן מריצים את הפקודה הבאה מה-notebook:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

אחרי התקנת החבילות האלה, צריך להפעיל מחדש את הליבה:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

לבסוף, בודקים שהתקנתם את החבילות בצורה נכונה. גרסת ה-SDK של KFP צריכה להיות 1.8 ומעלה:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

שלב 2: מגדירים את מזהה הפרויקט ואת הקטגוריה

בשיעור ה-Lab הזה תפנו למזהה הפרויקט ב-Cloud ולקטגוריה שיצרתם קודם. בשלב הבא ניצור משתנים לכל אחד מהם.

אם אתם לא יודעים מהו מזהה הפרויקט, תוכלו לבצע את הפעולות הבאות כדי לאתר אותו:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

אחרת, מגדירים אותו כאן:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

לאחר מכן יוצרים משתנה לאחסון שם הקטגוריה. אם יצרתם אותו במהלך ה-Lab, הפעולות הבאות יפעלו. אחרת, תצטרכו להגדיר את זה באופן ידני:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

שלב 3: ייבוא ספריות

צריך להוסיף את הפריטים הבאים כדי לייבא את הספריות שבהן נשתמש ב-Codelab הזה:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

שלב 4: מגדירים קבועים

הדבר האחרון שצריך לעשות לפני שמפתחים את צינור עיבוד הנתונים הוא להגדיר כמה משתנים קבועים. PIPELINE_ROOT הוא הנתיב של Cloud Storage שבו ייכתבו הארטיפקטים שנוצרו על ידי צינור עיבוד הנתונים שלנו. אנחנו משתמשים כאן באזור us-central1, אבל אם השתמשתם באזור אחר כשיצרתם את הקטגוריה, עליכם לעדכן את המשתנה REGION בקוד הבא:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

אחרי שתפעילו את הקוד שלמעלה, ספריית השורש של צינור עיבוד הנתונים אמורה להדפיס. זהו המיקום ב-Cloud Storage שאליו ייכתבו הארטיפקטים מצינור עיבוד הנתונים. הוא יהיה בפורמט gs://YOUR-BUCKET-NAME/pipeline_root/

5. יצירת צינור עיבוד הנתונים הראשון

כדי להכיר את אופן הפעולה של Vertex Pipelines, נתחיל ליצור צינור עיבוד נתונים קצר באמצעות KFP SDK. צינור עיבוד הנתונים הזה לא עושה שום דבר שקשור ללמידת מכונה (אל דאגה, נגיע לשם!), אנחנו משתמשים בו כדי ללמד אתכם:

  • איך יוצרים רכיבים מותאמים אישית ב-KFP SDK
  • איך מריצים צינור עיבוד נתונים ומנטרים אותו ב-Vertex Pipelines

נוצר צינור עיבוד נתונים שמדפיס משפט באמצעות שני פלטים: שם מוצר ותיאור של אמוג'י. צינור עיבוד הנתונים הזה יכלול שלושה רכיבים:

  • product_name: הרכיב הזה יקבל שם מוצר (או כל שם עצם שרוצים באמת) כקלט, ויחזיר את המחרוזת הזו כפלט
  • emoji: הרכיב הזה ייקח את תיאור הטקסט של אמוג'י וימיר אותו לאמוג'י. לדוגמה, קוד הטקסט של ✨ הוא 'sparkles'. הרכיב הזה משתמש בספריית אמוג'י כדי להראות איך לנהל יחסי תלות חיצוניים בצינור עיבוד הנתונים
  • build_sentence: הרכיב האחרון הזה ישתמש בפלט של שני הרכיבים הקודמים כדי ליצור משפט שמשתמש באמוג'י. לדוגמה, הפלט שיוצג יכול להיות 'Vertex Pipelines is ✨'.

קדימה, מתחילים לתכנת!

שלב 1: יצירת רכיב שמבוסס על פונקציית Python

בעזרת KFP SDK אפשר ליצור רכיבים על סמך פונקציות Python. נשתמש בו ל-3 הרכיבים בצינור עיבוד הנתונים הראשון שלנו. קודם נבנה את הרכיב product_name, שמקבל מחרוזת כקלט ומחזיר את המחרוזת הזו. מוסיפים את הטקסט הבא לפנקס:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

נבחן את התחביר לעומק:

  • כלי העיצוב @component מרכיב את הפונקציה הזו לרכיב כשצינור עיבוד הנתונים פועל. תשתמשו בזה בכל פעם שתכתבו רכיב מותאם אישית.
  • הפרמטר base_image מציין את קובץ האימג' בקונטיינר שבו הרכיב הזה ישתמש.
  • הפרמטר output_component_file הוא אופציונלי ומציין את קובץ ה-yaml שבו צריך לכתוב את הרכיב שעבר הידור. אחרי הפעלת התא, הקובץ אמור להיכתב במכונה של המחברות. כדי לשתף את הרכיב הזה עם מישהו, אפשר לשלוח לו את קובץ ה-yaml שנוצר ולבקש ממנו לטעון אותו עם הפרטים הבאים:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • הערך -> str אחרי הגדרת הפונקציה מציין את סוג הפלט של הרכיב הזה.

שלב 2: יצירת שני רכיבים נוספים

כדי להשלים את צינור עיבוד הנתונים, נוצר שני רכיבים נוספים. המילה הראשונה שנגדיר מקבלת מחרוזת כקלט, וממירה את המחרוזת הזו לאמוג'י המתאים, אם יש כזה. הפונקציה מחזירה טופל (tuple) עם טקסט הקלט שהוענק ועם אמוג'י התוצאה:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

הרכיב הזה קצת יותר מורכב מהרכיב הקודם. ריכזנו כאן את השינויים החדשים:

  • הפרמטר packages_to_install מעדכן את הרכיב לגבי יחסי התלות בספריות חיצוניות של הקונטיינר הזה. במקרה הזה, אנחנו משתמשים בספרייה שנקראת emoji.
  • הרכיב הזה מחזיר NamedTuple שנקרא Outputs. שימו לב שלכל אחת מהמחרוזות בקבוצה הזו יש מפתחות: emoji_text ו-emoji. נשתמש בהם ברכיב הבא כדי לגשת לפלט.

הרכיב האחרון בצינור עיבוד הנתונים הזה ישתמש בפלט של שני הרכיבים הראשונים וישלב אותם כדי להחזיר מחרוזת:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

יכול להיות שתתהו: איך הרכיב הזה יודע להשתמש בפלט מהשלבים הקודמים שהגדרתם? שאלה טובה! בשלב הבא נסביר איך משלבים את כל הרכיבים.

שלב 3: איך משלבים את הרכיבים בצינור עיבוד נתונים

הגדרות הרכיבים שהגדרנו למעלה יצרו פונקציות ייצור שאפשר להשתמש בהן בהגדרת צינור עיבוד נתונים כדי ליצור שלבים. כדי להגדיר צינור עיבוד נתונים, משתמשים ב-decorator‏ @pipeline, נותנים לצינור עיבוד הנתונים שם ותיאור ומספקים את נתיב הבסיס שבו צריך לכתוב את הארטיפקטים של צינור עיבוד הנתונים. 'פריטי ארטיפקט' הם כל קובצי הפלט שנוצרים על ידי צינור עיבוד הנתונים. צינור עיבוד הנתונים הראשון לא יוצר אירועים, אבל צינור עיבוד הנתונים הבא כן ייצור אותם.

בבלוק הקוד הבא מגדירים פונקציית intro_pipeline. כאן אנחנו מציינים את הקלטים לשלבים הראשוניים בצינור עיבוד הנתונים, ואת האופן שבו השלבים מתחברים זה לזה:

  • product_task מקבל כקלט שם של מוצר. כאן אנחנו מעבירים את 'Vertex Pipelines', אבל אפשר לשנות את השם הזה לכל שם אחר.
  • emoji_task מקבל את קוד הטקסט של אמוג'י כקלט. אפשר גם לשנות את השם הזה לכל שם אחר. לדוגמה, 'party_face' מתייחס לאמוג'י 🥳. שימו לב: מכיוון שלרכיב הזה ולרכיב product_task אין שלבים שמספקים להם קלט, אנחנו מציינים את הקלט שלהם באופן ידני כשאנחנו מגדירים את צינור עיבוד הנתונים.
  • לשלב האחרון בצינור עיבוד הנתונים שלנו – consumer_task – יש שלושה פרמטרים של קלט:
    • הפלט של product_task. מכיוון שהשלב הזה מניב פלט אחד בלבד, אפשר להפנות אליו באמצעות product_task.output.
    • הפלט emoji של השלב emoji_task. אפשר לעיין ברכיב emoji שהוגדר למעלה, שבו נתתי שמות לפרמטרים של הפלט.
    • באופן דומה, הפלט של emoji_text בעל השם מהרכיב emoji. אם צינור עיבוד הנתונים שלנו יקבל טקסט שלא תואם לאמוג'י, הוא ישתמש בטקסט הזה כדי ליצור משפט.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

שלב 4: הידור והפעלה של צינור עיבוד הנתונים

עכשיו, אחרי שהגדרתם את צינור עיבוד הנתונים, אתם מוכנים לקמפל אותו. הפקודה הבאה תיצור קובץ JSON שבעזרתו תוכלו להריץ את צינור עיבוד הנתונים:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

בשלב הבא יוצרים משתנה TIMESTAMP. נשתמש בכתובת הזאת במזהה המשימה שלנו:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

לאחר מכן מגדירים את המשימה של צינור עיבוד הנתונים:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

לבסוף, מריצים את המשימה כדי ליצור הרצת צינור עיבוד נתונים חדשה:

job.submit()

אחרי שמריצים את התא הזה, אמורים להופיע יומנים עם קישור להצגת צינור עיבוד הנתונים שפועל במסוף:

יומני משימות של צינורות עיבוד נתונים

עוברים לקישור הזה. צינור עיבוד הנתונים אמור להיראות כך בסיום:

צינור עיבוד נתונים של מבוא הושלם

הרצת צינור עיבוד הנתונים הזה תימשך 5-6 דקות. בסיום, אפשר ללחוץ על הרכיב build-sentence כדי לראות את הפלט הסופי:

פלט צינור עיבוד הנתונים של המבוא

עכשיו, אחרי שהבנתם איך פועלים KFP SDK ו-Vertex Pipelines, אתם מוכנים ליצור צינור עיבוד נתונים שיוצר ומפרס מודלים של למידת מכונה באמצעות שירותים אחרים של Vertex AI. קדימה, מתחילים!

6. יצירת צינור עיבוד נתונים של למידת מכונה מקצה לקצה

הגיע הזמן ליצור את צינור עיבוד הנתונים הראשון שלכם ל-ML. בצינור עיבוד הנתונים הזה נשתמש במערך הנתונים של שעועית יבשה ב-UCI Machine Learning, מתוך: KOKLU, M. and OZKAN, I.A., (2020), "סיווג רב-סיווגי של שעועית יבשה באמצעות ראייה ממוחשבת וטכניקות למידת מכונה".בליחולים, 174, 105507. DOI.

זהו מערך נתונים בטבלה, ובצינור עיבוד הנתונים שלנו נשתמש במערך הנתונים כדי לאמן, להעריך ולפרוס מודל AutoML שמסווג שעועיות לאחד מ-7 הסוגים על סמך המאפיינים שלהן.

צינור עיבוד הנתונים הזה:

  • יצירת מערך נתונים ב-
  • אימון מודל סיווג טבלאי באמצעות AutoML
  • הצגת מדדי ההערכה של המודל
  • על סמך מדדי ההערכה, מחליטים אם לפרוס את המודל באמצעות לוגיקה מותנית ב-Vertex Pipelines
  • פריסת המודל בנקודת קצה (endpoint) באמצעות Vertex Prediction

כל אחד מהשלבים המפורטים יהיה רכיב. ברוב השלבים בצינור עיבוד הנתונים נעשה שימוש ברכיבים מוכנים מראש לשירותי Vertex AI באמצעות הספרייה google_cloud_pipeline_components שייבאנו מוקדם יותר בסדנת הקוד הזו. בקטע הזה נגדיר תחילה רכיב מותאם אישית אחד. לאחר מכן נגדיר את שאר השלבים בצינור עיבוד הנתונים באמצעות רכיבים שנוצרו מראש. רכיבים מוכנים מראש מאפשרים לכם לגשת בקלות לשירותי Vertex AI, כמו אימון מודלים ופריסה שלהם.

שלב 1: רכיב מותאם אישית להערכת המודל

נשתמש ברכיב בהתאמה אישית שתגדירו בסוף צינור עיבוד הנתונים, אחרי שהאימון של המודל יסתיים. הרכיב הזה יבצע כמה פעולות:

  • אחזור מדדי ההערכה ממודל הסיווג של AutoML שהוכשר
  • תנתח את המדדים ותעבד אותם בממשק המשתמש של Vertex Pipelines
  • השוואת המדדים לסף כדי לקבוע אם כדאי לפרוס את המודל

לפני שנגדיר את הרכיב, נבין את הפרמטרים של הקלט והפלט שלו. צינור עיבוד הנתונים הזה מקבל כקלט כמה מטא-נתונים מהפרויקט שלנו ב-Cloud, את המודל המאומן שנוצר (נתאר את הרכיב הזה בהמשך), מדדי ההערכה של המודל ו-thresholds_dict_str. thresholds_dict_str הוא ערך שתגדירו כשתפעילו את צינור עיבוד הנתונים. במקרה של מודל הסיווג הזה, זה יהיה האזור מתחת לערך של עקומת ה-ROC שבו צריך לפרוס את המודל. לדוגמה, אם נעביר ב-0.95, סימן שנרצה שצינור עיבוד הנתונים שלנו יפרוס את המודל רק אם המדד הזה גבוה מ-95%.

רכיב ההערכה שלנו מחזיר מחרוזת שמציינת אם כדאי לפרוס את המודל או לא. מוסיפים את הטקסט הבא לתא ה-notebook כדי ליצור את הרכיב המותאם אישית הזה:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

שלב 2: הוספת רכיבים מוכנים מראש של Google Cloud

בשלב הזה נתאר את שאר הרכיבים של צינור עיבוד הנתונים ונראה איך הם מתאימים זה לזה. קודם כול, מגדירים את השם המוצג של הרצת צינור עיבוד הנתונים באמצעות חותמת זמן:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

לאחר מכן מעתיקים את הטקסט הבא לתא חדש ביומן:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

בואו נראה מה קורה בקוד הזה:

  • קודם כל, בדיוק כמו בצינור עיבוד הנתונים הקודם שלנו, אנחנו מגדירים את הפרמטרים של הקלט לצינור עיבוד הנתונים הזה. צריך להגדיר אותם באופן ידני כי הם לא תלויים בפלט של שלבים אחרים בצינור עיבוד הנתונים.
  • בשאר צינור עיבוד הנתונים נעשה שימוש בכמה רכיבים מוכנים מראש ליצירת אינטראקציה עם שירותי Vertex AI:
    • TabularDatasetCreateOp יוצר מערך נתונים טבלאי ב-Vertex AI על סמך מקור של מערך נתונים ב-Cloud Storage או ב-BigQuery. בצינור עיבוד הנתונים הזה, אנחנו מעבירים את הנתונים דרך כתובת URL של טבלה ב-BigQuery
    • הפקודה AutoMLTabularTrainingJobRunOp מפעילה משימה של אימון AutoML למערך נתונים בטבלאות. אנחנו מעבירים כמה פרמטרים של הגדרה לרכיב הזה, כולל סוג המודל (במקרה הזה, סיווג), נתונים מסוימים על העמודות, משך הזמן שבו נרצה להריץ את האימון וכן הפניה למערך הנתונים. שימו לב שכדי להעביר את מערך הנתונים לרכיב הזה, אנחנו מספקים את הפלט של הרכיב הקודם דרך dataset_create_op.outputs["dataset"]
    • EndpointCreateOp יוצרת נקודת קצה ב-Vertex AI. נקודת הקצה שנוצרה מהשלב הזה תועבר כקלט לרכיב הבא
    • ModelDeployOp פורס מודל נתון בנקודת קצה ב-Vertex AI. במקרה כזה, נשתמש בנקודת הקצה שנוצרה מהשלב הקודם. יש אפשרויות תצורה נוספות, אבל כאן אנחנו מציינים את סוג המכונה והדגם של נקודת הקצה שאנחנו רוצים לפרוס. אנחנו מעבירים את המודל על ידי גישה לפלט של שלב האימון בצינור עיבוד הנתונים שלנו
  • צינור עיבוד הנתונים הזה משתמש גם בלוגיקה מותנית, תכונה של Vertex Pipelines שמאפשרת להגדיר תנאי, יחד עם ענפים שונים על סמך התוצאה של התנאי הזה. חשוב לזכור שכאשר הגדרנו את צינור עיבוד הנתונים, הענקנו לו פרמטר thresholds_dict_str. זהו סף הדיוק שאנחנו משתמשים בו כדי לקבוע אם לפרוס את המודל שלנו בנקודת קצה. כדי להטמיע את זה, אנחנו משתמשים בכיתה Condition מ-KFP SDK. התנאי שאנחנו מעבירים הוא הפלט של רכיב ההערכה בהתאמה אישית שהגדרנו קודם לכן ב-Codelab הזה. אם התנאי הזה מתקיים, צינור עיבוד הנתונים ימשיך להריץ את הרכיב deploy_op. אם הדיוק לא עומד בסף המוגדר מראש שלנו, צינור עיבוד הנתונים יעצור כאן ולא יפרוס מודל.

שלב 3: הידור והפעלה של צינור עיבוד הנתונים של למידת המכונה מקצה לקצה

עכשיו, אחרי שהגדרתם את צינור עיבוד הנתונים המלא, הגיע הזמן לכתוב אותו:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

בשלב הבא מגדירים את המשימה:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

ולבסוף, מריצים את המשימה:

ml_pipeline_job.submit()

כדי לראות את צינור עיבוד הנתונים במסוף, עוברים לקישור שמוצג ביומני האירועים אחרי שמריצים את התא שלמעלה. הרצת צינור עיבוד הנתונים הזה תימשך קצת יותר משעה. רוב הזמן מוקדש לשלב האימון של AutoML. צינור עיבוד הנתונים המלא ייראה כך:

צינור עיבוד נתונים של AutoML שהושלם

אם מפעילים או משביתים את הלחצן 'הרחבת הארטיפקטים' בחלק העליון, אפשר לראות פרטים על הארטיפקטים השונים שנוצרו מצינור עיבוד הנתונים. לדוגמה, אם לוחצים על הארטיפקט dataset, מוצגים פרטים על מערך הנתונים של Vertex AI שנוצר. אפשר ללחוץ על הקישור הבא כדי לעבור לדף של מערך הנתונים הזה:

מערך נתונים של צינור עיבוד נתונים

באופן דומה, כדי לראות את התצוגות החזותיות של המדדים שנוצרו מרכיב ההערכה המותאם אישית, לוחצים על הארטיפקט שנקרא metricsc. בצד שמאל של מרכז הבקרה תוכלו לראות את מטריצת הבלבול של המודל הזה:

הצגה חזותית של מדדים

כדי לראות את המודל ונקודת הקצה שנוצרו מההרצה של צינור עיבוד הנתונים הזה, עוברים לקטע מודלים ולוחצים על המודל automl-beans. המודל הזה אמור להיות פרוס בנקודת קצה:

Model-endpoint

אפשר גם לגשת לדף הזה בלחיצה על הארטיפקט endpoint בתרשים של צינור עיבוד הנתונים.

בנוסף להצגת תרשים צינור עיבוד הנתונים במסוף, אפשר גם להשתמש ב-Vertex Pipelines למעקב אחר ישות אב. במעקב אחרי שושלת, פירוש הדבר הוא ארטיפקטים של מעקב שנוצרו בצינור עיבוד הנתונים שלכם. כך נוכל להבין איפה נוצרו הארטיפקטים ואיך הם משמשים לאורך תהליך העבודה של למידת המכונה. לדוגמה, כדי לראות את מעקב ההשתלשלות של מערך הנתונים שנוצר בצינור עיבוד הנתונים הזה, לוחצים על הארטיפקט של מערך הנתונים ואז על View Lineage:

הצגת שושלת

כך אפשר לראות את כל המקומות שבהם נעשה שימוש באובייקט הארטיפקט הזה:

פרטי ישות אב

שלב 4: השוואת מדדים בין הפעלות של צינור עיבוד נתונים

אם אתם מפעילים את צינור עיבוד הנתונים הזה כמה פעמים, כדאי להשוות בין מדדים בכמה הפעלות. אפשר להשתמש בשיטה aiplatform.get_pipeline_df() כדי לגשת למטא-נתונים של ריצה. כאן נקבל מטא-נתונים של כל ההפעלות של צינור עיבוד הנתונים הזה ונטען אותם ב-Pandas DataFrame:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

זהו, סיימתם את שיעור ה-Lab!

🎉 מזל טוב! 🎉

למדתם איך להשתמש ב-Vertex AI כדי:

  • שימוש ב-Kubeflow Pipelines SDK כדי לפתח צינורות עיבוד נתונים מקצה לקצה עם רכיבים מותאמים אישית
  • הרצת צינורות עיבוד הנתונים ב-Vertex Pipelines והפעלת הרצות של צינורות עיבוד נתונים באמצעות ה-SDK
  • הצגה וניתוח של תרשים Vertex Pipelines במסוף
  • שימוש ברכיבים מוכנים מראש של צינורות עיבוד נתונים כדי להוסיף שירותי Vertex AI לצינור עיבוד הנתונים
  • תזמון משימות חוזרות בצינור עיבוד נתונים

מידע נוסף על החלקים השונים של Vertex זמין במסמכי העזרה.

7. הסרת המשאבים

כדי שלא תחויבו, מומלץ למחוק את המשאבים שנוצרו בשיעור ה-Lab הזה.

שלב 1: מפסיקים או מוחקים את המכונה של Notebooks

אם אתם רוצים להמשיך להשתמש במחברת שיצרתם במעבדה הזו, מומלץ לכבות אותה כשהיא לא בשימוש. בממשק המשתמש של Notebooks במסוף Cloud, בוחרים את המחברות ואז בוחרים באפשרות Stop. אם רוצים למחוק את המכונה לגמרי, בוחרים באפשרות Delete:

עצירת המופע

שלב 2: מוחקים את נקודת הקצה

כדי למחוק את נקודת הקצה שפרסמתם, עוברים לקטע Endpoints במסוף Vertex AI ולוחצים על סמל המחיקה:

מחיקת נקודת קצה

לאחר מכן לוחצים על Undeploy בהודעה הבאה:

ביטול הפריסה של מודל

לבסוף, עוברים לקטע Models במסוף, מחפשים את המודל הזה ובתפריט שלוש הנקודות בצד שמאל, לוחצים על Delete model:

מחיקת המודל

שלב 3: מחיקת קטגוריה של Cloud Storage

כדי למחוק את הקטגוריה של האחסון, עוברים לתפריט הניווט במסוף Cloud, לוחצים על Storage, בוחרים את הקטגוריה ולוחצים על Delete:

מחיקת אחסון