Einführung in Vertex Pipelines

1. Übersicht

In diesem Lab erfahren Sie, wie Sie ML-Pipelines mit Vertex Pipelines erstellen und ausführen.

Lerninhalte

Die folgenden Themen werden behandelt:

  • Mit dem Kubeflow Pipelines SDK skalierbare ML-Pipelines erstellen
  • Einführungspipeline in drei Schritten mit Texteingabe erstellen und ausführen
  • Pipeline zum Trainieren, Bewerten und Bereitstellen eines AutoML-Klassifizierungsmodells erstellen und ausführen
  • Vordefinierte Komponenten für die Interaktion mit Vertex AI-Diensten verwenden, die über die google_cloud_pipeline_components-Bibliothek bereitgestellt werden
  • Pipeline-Job mit Cloud Scheduler planen

Die Gesamtkosten für die Ausführung dieses Labs in Google Cloud belaufen sich auf etwa 25$.

2. Einführung in Vertex AI

In diesem Lab wird das neueste KI-Produktangebot von Google Cloud verwendet. Vertex AI integriert die ML-Angebote in Google Cloud für eine nahtlose Entwicklung. Bisher konnten auf mit AutoML trainierte Modelle und benutzerdefinierte Modelle über separate Dienste zugegriffen werden. Das neue Angebot kombiniert beide zusammen mit anderen neuen Produkten in einer einzigen API. Sie können auch vorhandene Projekte zu Vertex AI migrieren.

Neben Modelltrainings- und Bereitstellungsdiensten umfasst Vertex AI auch eine Vielzahl von MLOps-Produkten, darunter Vertex Pipelines (Schwerpunkt dieses Labs), Modellmonitoring, Feature Store und mehr. Im Diagramm unten finden Sie alle Vertex AI-Produktangebote.

Vertex-Produktübersicht

Wenn du Feedback hast, sieh auf der Supportseite nach.

Warum sind ML-Pipelines nützlich?

Bevor wir uns näher damit befassen, sollten wir uns überlegen, warum Sie eine Pipeline verwenden sollten. Angenommen, Sie erstellen einen ML-Workflow, der die Verarbeitung von Daten, das Training eines Modells, die Hyperparameter-Abstimmung, die Bewertung und die Bereitstellung des Modells umfasst. Jeder dieser Schritte kann unterschiedliche Abhängigkeiten haben, was unübersichtlich werden kann, wenn Sie den gesamten Workflow als monolithischen Prozess behandeln. Wenn Sie mit der Skalierung Ihres ML-Prozesses beginnen, möchten Sie Ihren ML-Workflow möglicherweise mit anderen Mitgliedern Ihres Teams teilen, damit diese ihn ausführen und Code beisteuern können. Ohne einen zuverlässigen, reproduzierbaren Prozess kann das schwierig werden. Bei Pipelines hat jeder Schritt in Ihrem ML-Prozess einen eigenen Container. So können Sie Schritte unabhängig entwickeln und die Eingabe und Ausgabe jedes Schritts auf reproduzierbare Weise verfolgen. Sie können auch Ausführungen Ihrer Pipeline basierend auf anderen Ereignissen in Ihrer Cloud-Umgebung planen oder auslösen, z. B. einen Pipelinelauf starten, wenn neue Trainingsdaten verfügbar sind.

Zusammenfassung: Pipelines helfen Ihnen, Ihren ML-Workflow zu automatisieren und zu reproduzieren.

3. Cloud-Umgebung einrichten

Sie benötigen ein Google Cloud Platform-Projekt mit aktivierter Abrechnung, um dieses Codelab auszuführen. Eine Anleitung zum Erstellen eines Projekts finden Sie hier.

Schritt 1: Cloud Shell starten

In diesem Lab arbeiten Sie in einer Cloud Shell-Sitzung. Das ist ein Befehlszeileninterpreter, der von einer virtuellen Maschine gehostet wird, die in der Google-Cloud ausgeführt wird. Sie können diesen Abschnitt auch lokal auf Ihrem eigenen Computer ausführen. Mit Cloud Shell haben alle Nutzer jedoch Zugriff auf eine reproduzierbare Umgebung. Nach dem Lab können Sie diesen Abschnitt auf Ihrem eigenen Computer noch einmal wiederholen.

Cloud Shell autorisieren

Cloud Shell aktivieren

Klicken Sie rechts oben in der Cloud Console auf die Schaltfläche unten, um Cloud Shell zu aktivieren:

Cloud Shell aktivieren

Wenn Sie Cloud Shell noch nie gestartet haben, wird ein Zwischenbildschirm (unten) angezeigt, auf dem beschrieben wird, was es ist. Klicken Sie in diesem Fall auf Weiter. Dieser Bildschirm wird dann nicht mehr angezeigt. So sieht dieser einmalige Bildschirm aus:

Cloud Shell einrichten

Die Bereitstellung und Verbindung mit Cloud Shell dauert nur einen Moment.

Cloud Shell-Initialisierung

Auf dieser virtuellen Maschine sind alle erforderlichen Entwicklungstools installiert. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft in Google Cloud, was die Netzwerkleistung und Authentifizierung erheblich verbessert. Die meisten, wenn nicht alle Aufgaben in diesem Codelab können Sie einfach mit einem Browser oder Ihrem Chromebook erledigen.

Sobald Sie mit Cloud Shell verbunden sind, sollten Sie sehen, dass Sie bereits authentifiziert sind und dass das Projekt bereits auf Ihre Projekt-ID eingestellt ist.

Führen Sie in Cloud Shell den folgenden Befehl aus, um zu prüfen, ob Sie authentifiziert sind:

gcloud auth list

Die Befehlsausgabe sollte in etwa so aussehen:

Cloud Shell-Ausgabe

Führen Sie in Cloud Shell den folgenden Befehl aus, um zu prüfen, ob der gcloud-Befehl Ihr Projekt kennt:

gcloud config list project

Befehlsausgabe

[core]
project = <PROJECT_ID>

Ist dies nicht der Fall, können Sie die Einstellung mit diesem Befehl vornehmen:

gcloud config set project <PROJECT_ID>

Befehlsausgabe

Updated property [core/project].

Cloud Shell hat einige Umgebungsvariablen, darunter GOOGLE_CLOUD_PROJECT, die den Namen unseres aktuellen Cloud-Projekts enthält. Wir verwenden diese Variable an verschiedenen Stellen in diesem Lab. Sie können sie mit dem folgenden Befehl aufrufen:

echo $GOOGLE_CLOUD_PROJECT

Schritt 2: APIs aktivieren

In späteren Schritten erfahren Sie, wo und warum diese Dienste erforderlich sind. Führen Sie jetzt einfach diesen Befehl aus, um Ihrem Projekt Zugriff auf die Compute Engine, Container Registry und Vertex AI zu gewähren:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Wenn die Aktivierung erfolgreich war, erhalten Sie eine Meldung, die ungefähr so aussieht:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Schritt 3: Cloud Storage-Bucket erstellen

Zum Ausführen eines Trainingsjobs in Vertex AI benötigen Sie einen Storage-Bucket zum Speichern der gespeicherten Modell-Assets. Der Bucket muss regional sein. Wir verwenden hier us-central, Sie können aber auch eine andere Region verwenden. Ersetzen Sie sie dann einfach in diesem Lab. Wenn Sie bereits einen Bucket haben, können Sie diesen Schritt überspringen.

Führen Sie die folgenden Befehle in Ihrem Cloud Shell-Terminal aus, um einen Bucket zu erstellen:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Als Nächstes gewähren wir dem Compute-Dienstkonto Zugriff auf diesen Bucket. Dadurch wird sichergestellt, dass Vertex Pipelines die erforderlichen Berechtigungen zum Schreiben von Dateien in diesen Bucket hat. Führen Sie den folgenden Befehl aus, um diese Berechtigung hinzuzufügen:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Schritt 4: Vertex AI Workbench-Instanz erstellen

Klicken Sie in der Cloud Console im Bereich „Vertex AI“ auf „Workbench“:

Vertex AI-Menü

Klicken Sie dort unter Nutzerverwaltete Notebooks auf Neues Notebook:

Neues Notebook erstellen

Wählen Sie dann den Instanztyp TensorFlow Enterprise 2.3 (mit LTS) ohne GPUs aus:

TFE-Instanz

Verwenden Sie die Standardoptionen und klicken Sie dann auf Erstellen.

Schritt 5: Notebook öffnen

Wählen Sie nach dem Erstellen der Instanz JupyterLab öffnen aus:

Notebook öffnen

4. Vertex Pipelines einrichten

Es gibt noch einige zusätzliche Bibliotheken, die wir installieren müssen, um Vertex Pipelines verwenden zu können:

  • Kubeflow Pipelines: Das SDK, mit dem wir unsere Pipeline erstellen. Vertex Pipelines unterstützt die Ausführung von Pipelines, die sowohl mit Kubeflow Pipelines als auch mit TFX erstellt wurden.
  • Google Cloud-Pipelinekomponenten: Diese Bibliothek enthält vordefinierte Komponenten, die die Interaktion mit Vertex AI-Diensten über Ihre Pipelineschritte erleichtern.

Schritt 1: Python-Notebook erstellen und Bibliotheken installieren

Erstellen Sie zuerst ein Notebook in Ihrer Notebook-Instanz. Wählen Sie dazu im Launcher-Menü die Option Python 3 aus:

Python 3-Notebook erstellen

Sie können auf das Launcher-Menü zugreifen, indem Sie links oben in Ihrer Notebookinstanz auf das Pluszeichen + klicken.

Um beide Dienste zu installieren, die wir in diesem Lab verwenden werden, setzen Sie zuerst das Nutzerflag in einer Notebookzelle:

USER_FLAG = "--user"

Führen Sie dann Folgendes in Ihrem Notebook aus:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Nach der Installation dieser Pakete müssen Sie den Kernel neu starten:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Prüfen Sie abschließend, ob Sie die Pakete richtig installiert haben. Die KFP SDK-Version muss mindestens 1.8 sein:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Schritt 2: Projekt-ID und Bucket festlegen

In diesem Lab verweisen Sie auf Ihre Cloud-Projekt-ID und den Bucket, den Sie zuvor erstellt haben. Als Nächstes erstellen wir Variablen für jeden davon.

Wenn Sie Ihre Projekt-ID nicht kennen, können Sie sie möglicherweise mit dem folgenden Befehl abrufen:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Andernfalls legen Sie sie hier fest:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Erstellen Sie dann eine Variable, um den Namen Ihres Buckets zu speichern. Wenn Sie sie in diesem Lab erstellt haben, funktioniert Folgendes: Andernfalls müssen Sie dies manuell festlegen:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Schritt 3: Bibliotheken importieren

Fügen Sie Folgendes hinzu, um die Bibliotheken zu importieren, die wir in diesem Codelab verwenden werden:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Schritt 4: Konstanten definieren

Bevor wir unsere Pipeline erstellen, müssen wir noch einige Konstantenvariablen definieren. PIPELINE_ROOT ist der Cloud Storage-Pfad, in den die von unserer Pipeline erstellten Artefakte geschrieben werden. Wir verwenden hier us-central1 als Region. Wenn Sie beim Erstellen des Buckets jedoch eine andere Region verwendet haben, aktualisieren Sie die Variable REGION im Code unten:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Nachdem Sie den obigen Code ausgeführt haben, sollte das Stammverzeichnis Ihrer Pipeline ausgegeben werden. Dies ist der Cloud Storage-Speicherort, an dem die Artefakte aus Ihrer Pipeline geschrieben werden. Sie hat das Format gs://YOUR-BUCKET-NAME/pipeline_root/.

5. Erste Pipeline erstellen

Um sich mit der Funktionsweise von Vertex Pipelines vertraut zu machen, erstellen wir zuerst eine kurze Pipeline mit dem KFP SDK. Diese Pipeline erfüllt nichts mit ML (keine Sorge, wir schaffen das!). Wir verwenden sie, um Ihnen Folgendes zu vermitteln:

  • Benutzerdefinierte Komponenten im KFP SDK erstellen
  • Pipeline in Vertex Pipelines ausführen und überwachen

Wir erstellen eine Pipeline, die einen Satz mit zwei Ausgaben ausgibt: einem Produktnamen und einer Emoji-Beschreibung. Diese Pipeline besteht aus drei Komponenten:

  • product_name: Diese Komponente nimmt einen Produktnamen (oder ein beliebiges Nomen) als Eingabe entgegen und gibt diesen String als Ausgabe zurück.
  • emoji: Diese Komponente wandelt die Textbeschreibung eines Emojis in ein Emoji um. Der Textcode für ✨ lautet beispielsweise „sparkles“. In dieser Komponente wird anhand einer Emoji-Bibliothek gezeigt, wie Sie externe Abhängigkeiten in Ihrer Pipeline verwalten.
  • build_sentence: Diese letzte Komponente verwendet die Ausgabe der beiden vorherigen Komponenten, um einen Satz mit dem Emoji zu erstellen. Die Ausgabe könnte beispielsweise „Vertex Pipelines ist ✨“ lauten.

Los gehts mit dem Programmieren!

Schritt 1: Funktionsbasierte Python-Komponente erstellen

Mit dem KFP SDK können wir Komponenten basierend auf Python-Funktionen erstellen. Wir verwenden diese für die drei Komponenten in unserer ersten Pipeline. Wir erstellen zuerst die product_name-Komponente, die einfach einen String als Eingabe nimmt und diesen String zurückgibt. Fügen Sie Ihrem Notebook Folgendes hinzu:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Sehen wir uns die Syntax einmal genauer an:

  • Der @component-Decorator kompiliert diese Funktion in eine Komponente, wenn die Pipeline ausgeführt wird. Sie benötigen sie immer dann, wenn Sie eine benutzerdefinierte Komponente schreiben.
  • Der Parameter base_image gibt das Container-Image an, das von dieser Komponente verwendet wird.
  • Der Parameter output_component_file ist optional und gibt die YAML-Datei an, in die die kompilierte Komponente geschrieben werden soll. Nachdem Sie die Zelle ausgeführt haben, sollte die Datei in Ihre Notebookinstanz geschrieben werden. Wenn Sie diese Komponente für einen Nutzer freigeben möchten, können Sie ihm die generierte YAML-Datei senden, die dann so geladen wird:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • Mit dem -> str nach der Funktionsdefinition wird der Ausgabetyp für diese Komponente angegeben.

Schritt 2: Zwei weitere Komponenten erstellen

Zur Fertigstellung unserer Pipeline erstellen wir zwei weitere Komponenten. Die erste Funktion, die wir definieren, nimmt einen String als Eingabe entgegen und konvertiert ihn in das entsprechende Emoji, falls vorhanden. Es gibt ein Tupel mit dem übergebenen Eingabetext und dem resultierenden Emoji zurück:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Diese Komponente ist etwas komplexer als die vorherige. Das ist neu:

  • Der Parameter packages_to_install gibt der Komponente alle externen Bibliotheksabhängigkeiten für diesen Container an. In diesem Fall verwenden wir eine Bibliothek namens Emojis.
  • Diese Komponente gibt eine NamedTuple namens Outputs zurück. Beachten Sie, dass jeder String in diesem Tupel Schlüssel hat: emoji_text und emoji. Diese verwenden wir in unserer nächsten Komponente, um auf die Ausgabe zuzugreifen.

Die letzte Komponente in dieser Pipeline verarbeitet die Ausgabe der ersten beiden und kombiniert sie, um einen String zurückzugeben:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

Sie fragen sich vielleicht, woher diese Komponente weiß, dass sie die Ausgabe aus den vorherigen Schritten verwenden soll. Im nächsten Schritt bringen wir alles zusammen.

Schritt 3: Komponenten in einer Pipeline zusammenstellen

Mit den oben definierten Komponentendefinitionen wurden Fabrikfunktionen erstellt, die in einer Pipelinedefinition zum Erstellen von Schritten verwendet werden können. Verwenden Sie zum Einrichten einer Pipeline den @pipeline-Decorator, geben Sie der Pipeline einen Namen und eine Beschreibung sowie den Stammpfad, in den die Artefakte Ihrer Pipeline geschrieben werden sollen. Als Artefakte bezeichnen wir alle Ausgabedateien, die von Ihrer Pipeline generiert werden. Diese Einführungspipeline generiert keine, aber unsere nächste Pipeline schon.

Im nächsten Codeblock definieren wir eine intro_pipeline-Funktion. Hier geben wir die Eingaben für unsere ersten Pipelineschritte an und wie die Schritte miteinander verbunden sind:

  • product_task nimmt einen Produktnamen als Eingabe an. Hier wird „Vertex Pipelines“ übergeben, aber Sie können es beliebig ändern.
  • emoji_task nimmt den Textcode für ein Emoji als Eingabe an. Sie können diesen Namen auch beliebig ändern. „party_face“ bezieht sich beispielsweise auf das Emoji 🥳. Da weder diese noch die product_task-Komponente Schritte haben, die ihnen Eingaben zuführen, geben wir die Eingabe für diese manuell an, wenn wir die Pipeline definieren.
  • Der letzte Schritt in unserer Pipeline – consumer_task hat drei Eingabeparameter:
    • Die Ausgabe von product_task. Da dieser Schritt nur eine Ausgabe erzeugt, können wir über product_task.output darauf verweisen.
    • Die emoji-Ausgabe unseres emoji_task-Schritts. Sehen Sie sich die oben definierte emoji-Komponente an, in der wir die Ausgabeparameter benannt haben.
    • Ähnlich die benannte Ausgabe von emoji_text der Komponente emoji. Wenn unserer Pipeline Text übergeben wird, der keinem Emoji entspricht, wird dieser Text verwendet, um einen Satz zu bilden.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Schritt 4: Pipeline kompilieren und ausführen

Nachdem Sie die Pipeline definiert haben, können Sie sie kompilieren. Dadurch wird eine JSON-Datei generiert, mit der Sie die Pipeline ausführen:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Erstellen Sie als Nächstes eine TIMESTAMP-Variable. Diese verwenden wir in unserer Job-ID:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Definieren Sie dann Ihren Pipelinejob:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Führen Sie den Job aus, um eine neue Pipelineausführung zu erstellen:

job.submit()

Nach dem Ausführen dieser Zelle sollten Sie Protokolle mit einem Link sehen, über den Sie die Pipelineausführung in der Konsole aufrufen können:

Pipeline job logs

Rufen Sie diesen Link auf. Ihre Pipeline sollte dann so aussehen:

Abgeschlossene Einführungspipeline

Die Ausführung dieser Pipeline dauert 5 bis 6 Minuten. Wenn Sie fertig sind, können Sie auf die Komponente build-sentence klicken, um die endgültige Ausgabe zu sehen:

Ausgabe der Einführungspipeline

Sie sind nun mit der Funktionsweise des KFP SDK und Vertex Pipelines vertraut und können eine Pipeline erstellen, die ein ML-Modell mithilfe anderer Vertex AI-Dienste erstellt und bereitstellt. Legen wir los.

6. End-to-End-ML-Pipeline erstellen

Jetzt ist es an der Zeit, Ihre erste ML-Pipeline zu erstellen. In dieser Pipeline verwenden wir das Dataset „Dry beans“ (Trockenbohnen) aus dem UCI Machine Learning Repository von KOKLU, M. und OZKAN, I.A., (2020), „Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques“ (Mehrklassenklassifizierung von Trockenbohnen mithilfe von Computer Vision und maschinellem Lernen). In Computers and Electronics in Agriculture, 174, 105507. DOI.

Dies ist ein tabellarisches Dataset. In unserer Pipeline verwenden wir das Dataset, um ein AutoML-Modell zu trainieren, zu bewerten und bereitzustellen, das Bohnen anhand ihrer Eigenschaften in einen von sieben Typen einteilt.

Diese Pipeline bietet folgende Vorteile:

  • Dataset in erstellen
  • Tabellarisches Klassifizierungsmodell mit AutoML trainieren
  • Bewertungsmesswerte für dieses Modell abrufen
  • Anhand der Bewertungsmesswerte entscheiden, ob das Modell mithilfe bedingter Logik in Vertex Pipelines bereitgestellt werden soll
  • Modell mit Vertex Prediction auf einem Endpunkt bereitstellen

Jeder der beschriebenen Schritte ist eine Komponente. Für die meisten Pipelineschritte werden vordefinierte Komponenten für Vertex AI-Dienste über die google_cloud_pipeline_components-Bibliothek verwendet, die wir zuvor in diesem Codelab importiert haben. In diesem Abschnitt definieren wir zuerst eine benutzerdefinierte Komponente. Anschließend definieren wir die restlichen Pipelineschritte mit vordefinierten Komponenten. Vorgefertigte Komponenten erleichtern den Zugriff auf Vertex AI-Dienste wie Modelltraining und Bereitstellung.

Schritt 1: Benutzerdefinierte Komponente für die Modellbewertung

Die benutzerdefinierte Komponente, die wir definieren, wird gegen Ende unserer Pipeline verwendet, sobald das Modelltraining abgeschlossen ist. Diese Komponente hat mehrere Funktionen:

  • Bewertungsmesswerte aus dem trainierten AutoML-Klassifizierungsmodell abrufen
  • Messwerte parsen und in der Vertex Pipelines-Benutzeroberfläche rendern
  • Messwerte mit einem Schwellenwert vergleichen, um festzustellen, ob das Modell bereitgestellt werden soll

Bevor wir die Komponente definieren, sehen wir uns ihre Eingabe- und Ausgabeparameter an. Als Eingabe verwendet diese Pipeline einige Metadaten für unser Cloud-Projekt, das resultierende trainierte Modell (diese Komponente wird später definiert), die Bewertungsmesswerte des Modells und ein thresholds_dict_str. thresholds_dict_str wird definiert, wenn wir die Pipeline ausführen. Bei diesem Klassifizierungsmodell ist dies der Bereich unter dem ROC-Kurvenwert, für den das Modell bereitgestellt werden sollte. Wenn wir beispielsweise 0,95 übergeben, soll das Modell nur dann in unserer Pipeline bereitgestellt werden, wenn dieser Messwert über 95 % liegt.

Unsere Bewertungskomponente gibt einen String zurück, der angibt, ob das Modell bereitgestellt werden soll oder nicht. Fügen Sie den folgenden Code in eine Notebookzelle ein, um diese benutzerdefinierte Komponente zu erstellen:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Schritt 2: Vordefinierte Google Cloud-Komponenten hinzufügen

In diesem Schritt definieren wir die restlichen Pipelinekomponenten und sehen uns an, wie sie zusammenpassen. Definieren Sie zuerst den Anzeigenamen für Ihre Pipelineausführung mit einem Zeitstempel:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Kopieren Sie dann Folgendes in eine neue Notebookzelle:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Sehen wir uns an, was in diesem Code passiert:

  • Zuerst definieren wir wie in der vorherigen Pipeline die Eingabeparameter dieser Pipeline. Diese müssen manuell festgelegt werden, da sie nicht von der Ausgabe anderer Schritte in der Pipeline abhängen.
  • Der Rest der Pipeline verwendet einige vordefinierte Komponenten für die Interaktion mit Vertex AI-Diensten:
    • TabularDatasetCreateOp erstellt anhand einer Dataset-Quelle entweder in Cloud Storage oder BigQuery ein tabellarisches Dataset in Vertex AI. In dieser Pipeline übergeben wir die Daten über eine BigQuery-Tabellen-URL.
    • AutoMLTabularTrainingJobRunOp startet einen AutoML-Trainingsjob für ein tabellarisches Dataset. Wir übergeben dieser Komponente einige Konfigurationsparameter, darunter den Modelltyp (in diesem Fall die Klassifizierung), einige Daten in den Spalten, die Dauer des Trainings und ein Verweis auf das Dataset. Um den Datensatz an diese Komponente weiterzugeben, geben wir die Ausgabe der vorherigen Komponente über dataset_create_op.outputs["dataset"] an.
    • EndpointCreateOp erstellt einen Endpunkt in Vertex AI. Der in diesem Schritt erstellte Endpunkt wird als Eingabe an die nächste Komponente übergeben.
    • ModelDeployOp stellt ein bestimmtes Modell in einem Endpunkt in Vertex AI bereit. In diesem Fall verwenden wir den Endpunkt, der im vorherigen Schritt erstellt wurde. Es gibt zusätzliche Konfigurationsoptionen, aber hier stellen wir den Maschinentyp und das Modell für den Endpunkt bereit, die bereitgestellt werden sollen. Wir übergeben das Modell, indem wir auf die Ausgaben des Trainingsschritts in unserer Pipeline zugreifen.
  • Diese Pipeline verwendet auch bedingte Logik, eine Funktion von Vertex Pipelines, mit der Sie eine Bedingung und verschiedene Verzweigungen basierend auf dem Ergebnis dieser Bedingung definieren können. Denken Sie daran, dass wir bei der Definition unserer Pipeline einen thresholds_dict_str-Parameter übergeben haben. Dies ist der Genauigkeitsschwellenwert, den wir verwenden, um zu bestimmen, ob unser Modell auf einem Endpunkt bereitgestellt werden soll. Zur Implementierung verwenden wir die Klasse Condition aus dem KFP SDK. Die übergebene Bedingung ist die Ausgabe der benutzerdefinierten Auswertungskomponente, die wir zuvor in diesem Codelab definiert haben. Wenn diese Bedingung erfüllt ist, wird die deploy_op-Komponente in der Pipeline weiterhin ausgeführt. Wenn die Genauigkeit nicht unseren vordefinierten Grenzwert erreicht, wird die Pipeline an dieser Stelle beendet und es wird kein Modell bereitgestellt.

Schritt 3: End-to-End-ML-Pipeline kompilieren und ausführen

Nachdem unsere vollständige Pipeline definiert ist, ist es an der Zeit, sie zu kompilieren:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Definieren Sie als Nächstes den Job:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Führen Sie abschließend den Job aus:

ml_pipeline_job.submit()

Gehen Sie zu dem Link, der in den Logs angezeigt wird, nachdem Sie die Zelle oben ausgeführt haben, um Ihre Pipeline in der Konsole anzuzeigen. Die Ausführung dieser Pipeline dauert etwas mehr als eine Stunde. Der Großteil der Zeit wird für den AutoML-Trainingsschritt aufgewendet. Die fertige Pipeline sieht in etwa so aus:

Fertige AutoML-Pipeline

Wenn Sie oben die Schaltfläche „Artefakte maximieren“ aktivieren, werden Details zu den verschiedenen Artefakten angezeigt, die aus Ihrer Pipeline erstellt wurden. Wenn Sie beispielsweise auf das Artefakt dataset klicken, werden Details zum erstellten Vertex AI-Dataset angezeigt. Klicken Sie auf den Link, um die Seite für dieses Dataset aufzurufen:

Pipeline-Datensatz

Wenn Sie die resultierenden Messwertvisualisierungen aus unserer benutzerdefinierten Bewertungskomponente sehen möchten, klicken Sie auf das Artefakt metricsc. Auf der rechten Seite Ihres Dashboards sehen Sie die Wahrheitsmatrix für dieses Modell:

Messwertvisualisierung

Wenn Sie das Modell und den Endpunkt sehen möchten, die durch diesen Pipelinelauf erstellt wurden, rufen Sie den Abschnitt „Modelle“ auf und klicken Sie auf das Modell mit dem Namen automl-beans. Dort sollte dieses Modell für einen Endpunkt bereitgestellt werden:

Modellendpunkt

Sie können diese Seite auch aufrufen, indem Sie in der Pipelinegrafik auf das Artefakt endpoint klicken.

Sie können sich nicht nur das Pipelinediagramm in der Console ansehen, sondern auch Vertex Pipelines für das Herkunfts-Tracking verwenden. Bei der Herkunftsverfolgung geht es darum, Artefakte zu verfolgen, die in Ihrer Pipeline erstellt wurden. So können wir nachvollziehen, wo Artefakte erstellt wurden und wie sie in einem ML-Workflow verwendet werden. Wenn Sie beispielsweise das Lineage-Tracking für das in dieser Pipeline erstellte Dataset sehen möchten, klicken Sie auf das Dataset-Artefakt und dann auf Lineage anzeigen:

Herkunft ansehen

Hier sehen wir alle Stellen, an denen dieses Artefakt verwendet wird:

Details zur Herkunft

Schritt 4: Messwerte für verschiedene Pipelineausführungen vergleichen

Wenn Sie diese Pipeline mehrmals ausführen, sollten Sie die Messwerte verschiedener Ausführungen vergleichen. Mit der Methode aiplatform.get_pipeline_df() können Sie auf Metadaten zu Ausführungen zugreifen. Hier rufen wir Metadaten für alle Ausführungen dieser Pipeline ab und laden sie in einen Pandas DataFrame:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

Damit ist das Lab abgeschlossen.

🎉 Glückwunsch! 🎉

Sie haben gelernt, wie Sie mit Vertex AI Folgendes tun können:

  • Mit dem Kubeflow Pipelines SDK End-to-End-Pipelines mit benutzerdefinierten Komponenten erstellen
  • Pipelines in Vertex Pipelines ausführen und Pipelineausführungen mit dem SDK starten
  • Vertex Pipelines-Diagramm in der Console ansehen und analysieren
  • Vordefinierte Pipeline-Komponenten verwenden, um Ihrer Pipeline Vertex AI-Dienste hinzuzufügen
  • Wiederkehrende Pipeline-Jobs planen

Weitere Informationen zu den verschiedenen Bereichen von Vertex finden Sie in der Dokumentation.

7. Bereinigen

Damit Ihnen keine Kosten in Rechnung gestellt werden, sollten Sie die in diesem Lab erstellten Ressourcen löschen.

Schritt 1: Notebooks-Instanz beenden oder löschen

Wenn Sie das in diesem Lab erstellte Notebook weiterhin verwenden möchten, sollten Sie es deaktivieren, wenn Sie es nicht verwenden. Wählen Sie in der Notebooks-UI in der Cloud Console das Notebook und dann Beenden aus. Wenn Sie die Instanz vollständig löschen möchten, wählen Sie Löschen aus:

Instanz beenden

Schritt 2: Endpunkt löschen

Wenn Sie den bereitgestellten Endpunkt löschen möchten, rufen Sie in der Vertex AI-Konsole den Bereich Endpunkte auf und klicken Sie auf das Symbol „Löschen“:

Endpunkt löschen

Klicken Sie dann in der folgenden Aufforderung auf Undeploy (Deaktivierung):

Bereitstellung des Modells aufheben

Gehen Sie schließlich zum Abschnitt Models (Modelle) der Konsole, suchen Sie das gewünschte Modell und klicken Sie im Dreipunkt-Menü rechts auf Delete model (Modell löschen):

Modell löschen

Schritt 3: Cloud Storage-Bucket löschen

Wenn Sie den Speicher-Bucket löschen möchten, klicken Sie in der Cloud Console im Navigationsmenü auf „Speicher“, wählen Sie den Bucket aus und klicken Sie auf „Löschen“:

Speicher löschen