Einführung in Vertex Pipelines

1. Übersicht

In diesem Lab lernen Sie, wie Sie ML-Pipelines mit Vertex Pipelines erstellen und ausführen.

Lerninhalte

Die folgenden Themen werden behandelt:

  • Mit dem Kubeflow Pipelines SDK skalierbare ML-Pipelines erstellen
  • Einführungspipeline mit drei Schritten und Text als Eingabe erstellen und ausführen
  • Pipeline erstellen und ausführen, die ein AutoML-Klassifizierungsmodell trainiert, bewertet und bereitstellt
  • Vordefinierte Komponenten, die über die Bibliothek google_cloud_pipeline_components bereitgestellt werden, für die Interaktion mit Vertex AI-Diensten nutzen
  • Pipelinejob mit Cloud Scheduler planen

Die Gesamtkosten für die Ausführung dieses Labs in Google Cloud betragen etwa 25$.

2. Einführung in Vertex AI

In diesem Lab wird das neueste KI-Produkt von Google Cloud verwendet. Vertex AI vereint die ML-Angebote von Google Cloud in einer nahtlosen Entwicklungsumgebung. Bisher musste auf mit AutoML trainierte und benutzerdefinierte Modelle über verschiedene Dienste zugegriffen werden. Das neue Angebot kombiniert diese und weitere, neue Produkte zu einer einzigen API. Sie können auch vorhandene Projekte zu Vertex AI migrieren.

Neben Diensten für das Modelltraining und die Bereitstellung umfasst Vertex AI auch eine Vielzahl von MLOps-Produkten, darunter Vertex Pipelines (der Schwerpunkt dieses Labs), Model Monitoring und Feature Store. Alle Vertex AI-Produkte finden Sie im folgenden Diagramm.

Vertex-Produktübersicht

Wenn Sie Feedback haben, lesen Sie bitte die Supportseite.

Warum sind ML-Pipelines nützlich?

Sehen wir uns erst einmal an, warum der Einsatz von Pipelines sinnvoll ist. Stellen Sie sich vor, Sie entwickeln einen ML-Workflow, der Datenverarbeitung, Modelltraining, Hyperparameter-Abstimmung, Bewertung und Modellbereitstellung umfasst. Jeder dieser Schritte kann andere Abhängigkeiten haben, was die Bearbeitung erschweren kann, wenn Sie den gesamten Workflow als eine Einheit behandeln. Beim Skalieren des ML-Prozesses kann es sinnvoll sein, Ihren ML-Workflow mit anderen Teammitgliedern zu teilen, damit diese ihn ebenfalls ausführen und Code beitragen können. Ohne zuverlässigen, reproduzierbaren Prozess kann dies schwierig sein. Pipelines sorgen dafür, dass es für jeden Schritt des ML-Prozesses einen eigenen Container gibt. So können Sie Schritte unabhängig voneinander entwickeln und die Ein- und Ausgaben jedes Schrittes verfolgen und reproduzieren. Außerdem können Sie Ausführungen der Pipeline basierend auf anderen Ereignissen in Ihrer Cloud-Umgebung planen oder auslösen, zum Beispiel wenn neue Trainingsdaten verfügbar sind.

Kurz gesagt: Pipelines helfen Ihnen, Ihren ML-Workflow zu automatisieren und zu reproduzieren.

3. Cloud-Umgebung einrichten

Für dieses Codelab benötigen Sie ein Google Cloud Platform-Projekt mit aktivierter Abrechnung. Folgen Sie dieser Anleitung, um ein Projekt zu erstellen.

Schritt 1: Cloud Shell starten

In diesem Lab arbeiten Sie in einer Cloud Shell-Sitzung. Das ist ein Befehlsinterpreter, der auf einer virtuellen Maschine in der Google-Cloud gehostet wird. Sie könnten diesen Abschnitt genauso gut lokal auf Ihrem eigenen Computer ausführen, aber mit Cloud Shell hat jeder Zugriff auf eine reproduzierbare Umgebung. Nach dem Lab können Sie diesen Abschnitt gern auf Ihrem eigenen Computer wiederholen.

Cloud Shell autorisieren

Cloud Shell aktivieren

Klicken Sie oben rechts in der Cloud Console auf die Schaltfläche unten, um Cloud Shell zu aktivieren:

Cloud Shell aktivieren

Wenn Sie die Cloud Shell zuvor noch nicht gestartet haben, wird ein Fenster mit einer Beschreibung eingeblendet. Klicken Sie in diesem Fall einfach auf Weiter. So sieht dieses Fenster aus:

Cloud Shell einrichten

Das Herstellen der Verbindung mit der Cloud Shell sollte nur wenige Augenblicke dauern.

Cloud Shell-Initialisierung

Auf dieser virtuellen Maschine sind alle Entwicklungstools installiert, die Sie benötigen. Sie bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und läuft in Google Cloud, was die Netzwerkleistung und Authentifizierung erheblich verbessert. Die meisten, wenn nicht sogar alle Aufgaben in diesem Codelab können mit einem Browser oder Ihrem Chromebook erledigt werden.

Sobald die Verbindung mit der Cloud Shell hergestellt ist, sehen Sie, dass Sie bereits authentifiziert sind und für das Projekt schon Ihre Projekt-ID eingestellt ist.

Führen Sie in der Cloud Shell den folgenden Befehl aus, um zu prüfen, ob Sie authentifiziert sind:

gcloud auth list

Die Befehlsausgabe sollte in etwa so aussehen:

Cloud Shell-Ausgabe

Führen Sie den folgenden Befehl in Cloud Shell aus, um zu bestätigen, dass der gcloud-Befehl Ihr Projekt kennt:

gcloud config list project

Befehlsausgabe

[core]
project = <PROJECT_ID>

Ist dies nicht der Fall, können Sie die Einstellung mit diesem Befehl vornehmen:

gcloud config set project <PROJECT_ID>

Befehlsausgabe

Updated property [core/project].

Cloud Shell hat einige Umgebungsvariablen, darunter GOOGLE_CLOUD_PROJECT, die den Namen unseres aktuellen Cloud-Projekts enthält. Wir werden diese Variable an verschiedenen Stellen in diesem Lab verwenden. Sie können sie mit folgendem Befehl aufrufen:

echo $GOOGLE_CLOUD_PROJECT

Schritt 2: APIs aktivieren

In späteren Schritten sehen Sie, wo diese Dienste benötigt werden und warum. Führen Sie jetzt diesen Befehl aus, um Ihrem Projekt Zugriff auf die Dienste Compute Engine, Container Registry und Vertex AI zu gewähren:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Wenn die Aktivierung erfolgreich war, erhalten Sie eine Meldung, die ungefähr so aussieht:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Schritt 3: Cloud Storage-Bucket erstellen

Wenn wir einen Trainingsjob in Vertex AI ausführen möchten, benötigen wir einen Speicher-Bucket zum Speichern unserer gespeicherten Modell-Assets. Der Bucket muss regional sein. Wir verwenden hier us-central, Sie können aber auch eine andere Region verwenden (ersetzen Sie sie einfach in diesem Lab). Wenn Sie bereits einen Bucket haben, können Sie diesen Schritt überspringen.

Führen Sie die folgenden Befehle in Ihrem Cloud Shell-Terminal aus, um einen Bucket zu erstellen:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Als Nächstes gewähren wir unserem Compute-Dienstkonto Zugriff auf diesen Bucket. So wird sichergestellt, dass Vertex Pipelines die erforderlichen Berechtigungen zum Schreiben von Dateien in diesen Bucket hat. Führen Sie den folgenden Befehl aus, um diese Berechtigung hinzuzufügen:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Schritt 4: Vertex AI Workbench-Instanz erstellen

Klicken Sie in der Cloud Console im Bereich „Vertex AI“ auf „Workbench“:

Vertex AI-Menü

Klicken Sie dort unter Nutzerverwaltete Notebooks auf Neues Notebook:

Neues Notebook erstellen

Wählen Sie dann den Instanztyp TensorFlow Enterprise 2.3 (mit LTS) ohne GPUs aus:

TFE-Instanz

Übernehmen Sie die Standardoptionen und klicken Sie auf Erstellen.

Schritt 5: Notebook öffnen

Nachdem die Instanz erstellt wurde, klicken Sie auf JupyterLab öffnen:

Notebook öffnen

4. Vertex Pipelines einrichten

Damit wir Vertex Pipelines nutzen können, müssen wir noch einige zusätzliche Bibliotheken installieren:

  • Kubeflow Pipelines: Dieses SDK wird zum Erstellen der Pipeline verwendet. Vertex Pipelines unterstützt das Ausführen von mit Kubeflow Pipelines oder TFX erstellten Pipelines.
  • Google Cloud Pipeline Components: Diese Bibliothek enthält vordefinierte Komponenten, die das Interagieren mit Vertex AI-Diensten in Ihren Pipelineschritten erleichtern.

1. Schritt: Python-Notebook erstellen und Bibliotheken installieren

Erstellen Sie zuerst im Launcher-Menü in Ihrer Notebook-Instanz ein Notebook, indem Sie Python 3 auswählen:

Python3-Notebook erstellen

In das Launcher-Menü gelangen Sie, wenn Sie oben links in der Notebook-Instanz auf das +-Zeichen klicken.

Legen Sie zuerst das Flag „user“ in einer Notebook-Zelle fest, um beide für dieses Lab benötigten Dienste zu installieren:

USER_FLAG = "--user"

Führen Sie dann folgenden Code im Notebook aus:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Nach dem Installieren dieser Pakete müssen Sie den Kernel neu starten:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Prüfen Sie zum Abschluss, ob die Pakete korrekt installiert wurden. Die KFP SDK-Version sollte mindestens 1.8 sein:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

2. Schritt: Projekt-ID und Bucket einrichten

Während des Labs benötigen Sie immer wieder Ihre Cloud-Projekt-ID und den zuvor erstellten Bucket. Als Nächstes erstellen wir dafür Variablen.

Falls Sie die Projekt-ID nicht kennen, können Sie sie abrufen, indem Sie diesen Befehl ausführen:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Andernfalls legen Sie sie hier fest:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Erstellen Sie dann eine Variable zum Speichern des Bucket-Namens. Wenn Sie sie in diesem Lab erstellt haben, funktioniert Folgendes. Andernfalls müssen Sie dies manuell festlegen:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

3. Schritt: Bibliotheken importieren

Fügen Sie Folgendes hinzu, um die Bibliotheken für dieses Codelab zu importieren:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

4. Schritt: Konstanten definieren

Jetzt definieren wir noch einige konstante Variablen. Anschließend können wir mit dem Erstellen der Pipeline beginnen. PIPELINE_ROOT ist der Cloud Storage-Pfad, in den die durch die Pipeline erstellten Artefakte geschrieben werden. Wir verwenden hier us-central1 als Region, aber falls Sie beim Erstellen des Buckets eine andere Region ausgewählt haben, passen Sie die Variable REGION im folgenden Code an:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Nach dem Ausführen des obigen Codes sollte das Stammverzeichnis für Ihre Pipeline ausgegeben werden. Dies ist der Cloud Storage-Speicherort, in den die Artefakte aus Ihrer Pipeline geschrieben werden. Sie hat das Format gs://YOUR-BUCKET-NAME/pipeline_root/.

5. Erste Pipeline erstellen

Um sich mit der Funktionsweise von Vertex Pipelines vertraut zu machen, erstellen wir zuerst eine kurze Pipeline mit dem KFP SDK. Diese Pipeline erfüllt noch keine ML-Aufgaben (das kommt später), aber Sie sollen dadurch lernen:

  • wie im KFP SDK benutzerdefinierte Komponenten erstellt werden
  • wie eine Pipeline in Vertex Pipelines ausgeführt und überwacht wird

Wir erstellen eine Pipeline, die anhand von zwei Ausgaben – eines Produktnamens und einer Emojibeschreibung – einen Satz bildet. Diese Pipeline besteht aus drei Komponenten:

  • product_name: Diese Komponente verwendet einen Produktnamen (oder ein beliebiges Nomen) als Eingabe und gibt den String als Ausgabe zurück.
  • emoji: Diese Komponente verwendet die Beschreibung eines Emojis als Eingabe und wandelt sie in einen Emoji um. Der Textcode für ✨ lautet beispielsweise „sparkles“. Diese Komponente zeigt Ihnen anhand einer Emojibibliothek, wie Sie externe Abhängigkeiten in Ihrer Pipeline verwalten.
  • build_sentence: Diese letzte Komponente erstellt aus der Ausgabe der anderen beiden einen Satz, der das Emoji enthält. Die Ausgabe könnte zum Beispiel „Vertex Pipelines ist ✨“ sein.

Los gehts!

Schritt 1: Auf Python-Funktionen basierende Komponente erstellen

Mit dem KFP SDK können wir auf Python-Funktionen basierende Komponenten erstellen. Wir verwenden sie für die drei Komponenten in unserer ersten Pipeline. Zuerst erstellen wir die Komponente product_name, die einen String als Eingabe verwendet und diesen wieder ausgibt. Fügen Sie Folgendes zu Ihrem Notebook hinzu:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Sehen wir uns die Syntax einmal genauer an:

  • Der Decorator @component kompiliert diese Funktion in eine Komponente, wenn die Pipeline ausgeführt wird. Dies verwenden Sie immer, wenn Sie eine benutzerdefinierte Komponente schreiben.
  • Mit dem Parameter base_image wird das Container-Image angegeben, das diese Komponente verwendet.
  • Der Parameter output_component_file ist optional. Er gibt die YAML-Datei an, in die die kompilierte Komponente geschrieben werden soll. Nachdem die Zelle ausgeführt wurde, sollte diese Datei in Ihre Notebook-Instanz geschrieben worden sein. Wenn Sie die Komponente für eine andere Person freigeben möchten, können Sie ihr die erstellte YAML-Datei senden und sie bitten, diese mit folgendem Befehl zu laden:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • Mit -> str nach der Funktionsdefinition wird der Ausgabetyp für diese Komponente angegeben.

2. Schritt: Zwei weitere Komponenten erstellen

Erstellen Sie zwei weitere Komponenten, um die Pipeline zu vervollständigen. Die erste Funktion, die wir definieren, verwendet einen String als Eingabe und wandelt diesen in das entsprechende Emoji um (sofern vorhanden). Sie gibt ein Tupel mit dem übergebenen Eingabetext und das zugehörige Emoji zurück:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Diese Komponente ist etwas komplexer als die vorherige. Das ist neu:

  • Der Parameter packages_to_install liefert der Komponente alle externen Bibliotheksabhängigkeiten für diesen Container. In diesem Fall verwenden wir die Bibliothek emoji.
  • Diese Komponente gibt ein NamedTuple namens Outputs zurück. Die Strings in diesem Tupel haben Schlüssel: emoji_text und emoji. Diese verwenden wir in der nächsten Komponente, um auf die Ausgabe zuzugreifen.

Die letzte Komponente in dieser Pipeline nimmt die Ausgaben der ersten beiden auf und macht daraus einen String, den sie dann ausgibt:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

Vielleicht fragen Sie sich: Woher weiß diese Komponente, dass sie die Ausgaben der vorherigen, von Ihnen definierten Schritte verwenden soll? Gute Frage. Die Erklärung kommt im nächsten Schritt.

3. Schritt: Komponenten in einer Pipeline verknüpfen

Durch die oben festgelegten Komponentendefinitionen wurden Fabrikmethoden erstellt, die in einer Pipelinedefinition zum Erstellen von Schritten verwendet werden können. Verwenden Sie zum Einrichten einer Pipeline den Decorator @pipeline, geben Sie der Pipeline einen Namen und eine Beschreibung und geben Sie den Stammpfad an, an den die Artefakte der Pipeline geschrieben werden sollen. Artefakte sind in diesem Fall alle von der Pipeline generierten Ausgabedateien. Diese Einführungspipeline erstellt keine, aber die nächste Pipeline schon.

Im nächsten Codeblock definieren wir eine intro_pipeline-Funktion. Hier legen wir die Eingaben für die ersten Pipelineschritte und die Verbindung der Schritte untereinander fest:

  • product_task verwendet einen Produktnamen als Eingabe. Hier übergeben Sie „Vertex Pipelines“, aber Sie können auch etwas anderes angeben.
  • emoji_task verwendet den Textcode eines Emojis als Eingabe. Auch hier können Sie einen beliebigen Wert angeben. „party_face“ entspricht beispielsweise diesem Emoji: 🥳. Da diese Komponente und product_task keine Schritte haben, die ihnen eine Eingabe liefern, legen wir die Eingabe dafür manuell beim Definieren der Pipeline fest.
  • Der letzte Schritt in unserer Pipeline – consumer_task – hat drei Eingabeparameter:
    • Die Ausgabe von product_task. Da dieser Schritt nur eine Ausgabe liefert, können wir darauf mit product_task.output verweisen.
    • Die emoji-Ausgabe unseres emoji_task-Schritts. Oben haben wir für die emoji-Komponente die Ausgabeparameter festgelegt.
    • Die Ausgabe emoji_text der Komponente emoji. Falls Text an unsere Pipeline übergeben wird, zu dem es kein Emoji gibt, wird dieser Text verwendet, um einen Satz zu formulieren.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

4. Schritt: Pipeline kompilieren und ausführen

Nachdem Sie Ihre Pipeline definiert haben, können Sie sie kompilieren. Mit dem folgenden Befehl erstellen Sie eine JSON-Datei, die Sie zum Ausführen der Pipeline verwenden:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Erstellen Sie als Nächstes eine TIMESTAMP-Variable. Wir verwenden dies in unserer Job-ID:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Definieren Sie dann Ihren Pipeline-Job:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Führen Sie den Job aus, um eine neue Pipelineausführung zu erstellen:

job.submit()

Nach dem Ausführen dieser Zelle sollten Logs mit einem Link angezeigt werden, über den Sie die Pipelineausführung in der Console sehen können:

Pipelinejob-Logs

Rufen Sie diesen Link auf. Die Pipeline sollte am Ende so aussehen:

Abgeschlossene Einführungspipeline

Das Ausführen der Pipeline dauert fünf bis sechs Minuten. Wenn der Vorgang abgeschlossen ist, können Sie auf die Komponente build-sentence klicken, um die endgültige Ausgabe zu sehen:

Ausgabe der Intro-Pipeline

Jetzt wissen Sie, wie das KFP SDK und Vertex Pipelines funktionieren, und können eine Pipeline erstellen, die mit anderen Vertex AI-Diensten ein ML-Modell erstellt und bereitstellt. Sehen wir uns das genauer an.

6. End-to-End-ML-Pipeline erstellen

Jetzt erstellen Sie Ihre erste ML-Pipeline. In dieser Pipeline verwenden wir das UCI Machine Learning-Dataset Dry Beans aus: KOKLU, M. und OZKAN, I.A., (2020), „Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques“ in Computers and Electronics in Agriculture, 174, 105507. DOI

Dies ist ein tabellarisches Dataset, das wir in unserer Pipeline dazu verwenden, ein AutoML-Modell zu trainieren, zu bewerten und bereitzustellen. Dieses Modell ordnet Bohnen anhand ihrer Merkmale einer von sieben Arten zu.

Diese Pipeline:

  • Dataset in erstellen
  • Tabellarisches Klassifizierungsmodell mit AutoML trainieren
  • ruft Bewertungsmesswerte für das Modell ab
  • entscheidet abhängig von den Bewertungsmesswerten, ob das Modell mit bedingter Logik in Vertex Pipelines bereitgestellt wird
  • stellt das Modell anhand von Vertex Prediction auf einem Endpunkt bereit

Jeder der beschriebenen Schritte entspricht einer Komponente. Für die meisten Pipelineschritte werden vordefinierte Komponenten für Vertex AI-Dienste aus der Bibliothek google_cloud_pipeline_components verwendet, die Sie in diesem Codelab bereits importiert haben. In diesem Abschnitt definieren wir zuerst eine benutzerdefinierte Komponente. Anschließend definieren wir den Rest der Pipelineschritte mithilfe vordefinierter Komponenten. Diese erleichtern den Zugriff auf Vertex AI-Dienste beispielsweise für Modelltraining und ‑bereitstellung.

1. Schritt: Benutzerdefinierte Komponente für die Modellbewertung erstellen

Die von uns erstellte benutzerdefinierte Komponente kommt am Ende der Pipeline zum Einsatz, wenn das Modelltraining abgeschlossen ist. Diese Komponente hat die folgenden Aufgaben:

  • Bewertungsmesswerte aus dem trainierten AutoML-Klassifikationsmodell abrufen
  • Messwerte parsen und in der Vertex Pipelines-Benutzeroberfläche rendern
  • Messwerte mit einem Schwellenwert vergleichen, um festzustellen, ob das Modell bereitgestellt werden soll

Bevor wir die Komponente definieren, sollten wir ihre Eingabe- und Ausgabeparameter verstehen. Als Eingabe verwendet diese Pipeline einige Metadaten über unser Cloud-Projekt, das daraus resultierende trainierte Modell (diese Komponente definieren wir später), die Bewertungsmesswerte des Modells und thresholds_dict_str. Wir definieren thresholds_dict_str, wenn wir die Pipeline ausführen. Bei diesem Klassifikationsmodell entspricht dieser Wert der Fläche unterhalb der Kurve des ROC-Werts, bei dem das Modell bereitgestellt werden soll. Wenn Sie beispielsweise den Wert 0,95 übergeben, bedeutet dies, dass die Pipeline das Modell nur bereitstellen soll, wenn dieser Messwert über 95 % liegt.

Unsere Bewertungskomponente gibt einen String zurück, der angibt, ob das Modell bereitgestellt werden soll. Fügen Sie den folgenden Code in eine Notebook-Zelle ein, um die benutzerdefinierte Komponente zu erstellen:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

2. Schritt: Vordefinierte Google Cloud-Komponenten hinzufügen

In diesem Schritt definieren wir die restlichen Pipelinekomponenten und sehen uns an, wie sie zusammenpassen. Definieren Sie als Erstes mit einem Zeitstempel den Anzeigenamen für die Pipelineausführung:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Kopieren Sie dann folgenden Code in eine neue Notebook-Zelle:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Was dieser Code bewirkt:

  • Zuerst definieren wir wie in der vorherigen Pipeline die Eingabeparameter. Diese müssen wir manuell festlegen, da sie nicht von der Ausgabe anderer Schritte in der Pipeline abhängen.
  • In der restlichen Pipeline kommen einige vordefinierte Komponenten für die Interaktion mit Vertex AI-Diensten zum Einsatz:
    • TabularDatasetCreateOp erstellt ein tabellarisches Dataset in Vertex AI, wenn eine Dataset-Quelle in Cloud Storage oder BigQuery vorhanden ist. In dieser Pipeline übergeben wir die Daten über eine BigQuery-Tabellen-URL.
    • AutoMLTabularTrainingJobRunOp startet einen AutoML-Trainingsjob für ein tabellarisches Dataset. Wir übergeben dieser Komponente einige Konfigurationsparameter, darunter den Modelltyp (in diesem Fall: Klassifikation), einige Daten zu den Spalten, die gewünschte Trainingsdauer und einen Verweis auf das Dataset. Damit das Dataset an diese Komponente übergeben werden kann, stellen wir die Ausgabe der vorherigen Komponente über dataset_create_op.outputs["dataset"] bereit.
    • Mit EndpointCreateOp wird ein Endpunkt in Vertex AI erstellt. Der in diesem Schritt erstellte Endpunkt wird als Eingabe an die nächste Komponente übergeben.
    • ModelDeployOp stellt ein Modell auf einem Endpunkt in Vertex AI bereit. In diesem Fall verwenden wir den Endpunkt, der im vorherigen Schritt erstellt wurde. Es sind weitere Konfigurationsoptionen verfügbar, aber hier geben wir den Maschinentyp des Endpunkts und das Modell zur Bereitstellung an. Wir übergeben das Modell, indem wir auf die Ausgaben des Trainingsschritts in unserer Pipeline zugreifen.
  • Diese Pipeline verwendet außerdem bedingte Logik, eine Funktion von Vertex Pipelines, mit der Sie eine Bedingung und, basierend auf deren Ergebnis, verschiedene Verzweigungen definieren können. Beim Definieren der Pipeline haben wir den Parameter thresholds_dict_str übergeben. Dies ist der Accuracy-Schwellenwert, mit dem wir festlegen, ob das Modell auf einem Endpunkt bereitgestellt werden soll. Dazu verwenden wir die Klasse Condition aus dem KFP SDK. Die übergebene Bedingung ist die Ausgabe der benutzerdefinierten Bewertungskomponente, die wir zuvor in diesem Codelab definiert haben. Wenn die Bedingung erfüllt ist, führt die Pipeline die Komponente deploy_op weiter aus. Entspricht die Genauigkeit nicht dem vordefinierten Schwellenwert, endet die Pipeline und es wird kein Modell bereitgestellt.

3. Schritt: End-to-End-ML-Pipeline kompilieren und ausführen

Nachdem die komplette Pipeline definiert wurde, können Sie sie kompilieren:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Definieren Sie als Nächstes den Job:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Führen Sie den Job schließlich aus:

ml_pipeline_job.submit()

Klicken Sie auf den Link, der in den Logs angezeigt wird, nachdem Sie die obige Zelle ausgeführt haben, um Ihre Pipeline in der Console zu sehen. Das Ausführen dieser Pipeline dauert etwas mehr als eine Stunde. Der größte Zeitaufwand entfällt auf den AutoML-Trainingsschritt. Die abgeschlossene Pipeline sieht in etwa so aus:

Abgeschlossene AutoML-Pipeline

Wenn Sie oben auf den Button zum Ein-/Ausblenden der Artefakte klicken, werden Ihnen Details zu den durch Ihre Pipeline erstellten Artefakten angezeigt. Wenn Sie beispielsweise auf das Artefakt dataset klicken, sehen Sie Details zum erstellten Vertex AI-Dataset. Sie können hier auf den Link klicken, um zur Seite für dieses Dataset zu wechseln:

Pipeline-Dataset

Wenn Sie die Messwertvisualisierungen aus unserer benutzerdefinierten Bewertungskomponente sehen möchten, klicken Sie auf das Artefakt metricsc. Auf der rechten Seite des Dashboards sehen Sie die Wahrheitsmatrix für dieses Modell:

Messwerte visualisieren

Wechseln Sie zum Modellabschnitt und klicken Sie auf das Modell mit dem Namen automl-beans, um sich das Modell und den Endpunkt anzusehen, die durch diese Pipelineausführung erstellt wurden. Dort sollten Sie sehen, dass das Modell auf einem Endpunkt bereitgestellt wurde:

Modellendpunkt

Auf diese Seite gelangen Sie auch, wenn Sie in der Pipelinegrafik auf das Artefakt endpoint klicken.

Neben dem Betrachten der Pipelinegrafik in der Console können Sie auch Vertex Pipelines für das Verfolgen der Lineage nutzen. Mit dem Verfolgen der Lineage werden Artefakte verfolgt, die in Ihrer Pipeline erstellt wurden. Dadurch erfahren wir, wo Artefakte erstellt wurden und wie sie in einem ML-Workflow verwendet werden. Wenn Sie die Lineage-Verfolgung für das in dieser Pipeline erstellte Dataset sehen möchten, klicken Sie auf das Dataset-Artefakt und dann auf Lineage ansehen:

Herkunft ansehen

Hier sehen wir alle Orte, an denen dieses Artefakt verwendet wird:

Details zur Herkunft

Schritt 4: Messwerte mehrerer Pipelineausführungen vergleichen

Wenn Sie diese Pipeline mehrfach ausführen, können Sie die Messwerte der einzelnen Ausführungen vergleichen. Mit der Methode aiplatform.get_pipeline_df() können Sie auf die Metadaten der Ausführungen zugreifen. Hier erhalten wir Metadaten für alle Ausführungen dieser Pipeline und laden diese in einen Pandas DataFrame:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

Damit haben Sie das Lab abgeschlossen.

🎉 Das wars! 🎉

Sie haben gelernt, wie Sie Vertex AI für folgende Aufgaben verwenden:

  • Mit dem Kubeflow Pipelines SDK End-to-End-Pipelines mit benutzerdefinierten Komponenten erstellen
  • Pipelines in Vertex Pipelines ausführen und Pipelineausführungen mit dem SDK starten
  • Vertex Pipelines-Diagramm in der Console ansehen und analysieren
  • Vordefinierte Pipelinekomponenten verwenden, um Vertex AI-Dienste in Ihre Pipeline einzufügen
  • Wiederkehrende Pipelinejobs planen

Weitere Informationen zu den verschiedenen Bereichen von Vertex finden Sie in der Dokumentation.

7. Bereinigen

Damit keine Gebühren anfallen, sollten Sie die in diesem Lab erstellten Ressourcen löschen.

Schritt 1: Notebooks-Instanz beenden oder löschen

Wenn Sie das in diesem Lab erstellte Notebook weiterhin verwenden möchten, sollten Sie es deaktivieren, wenn Sie es nicht nutzen. Wählen Sie in der Notebooks-Benutzeroberfläche in der Cloud Console das Notebook und dann Beenden aus. Wenn Sie die Instanz vollständig löschen möchten, wählen Sie Löschen aus:

Instanz beenden

Schritt 2: Endpunkt löschen

Wenn Sie den bereitgestellten Endpunkt löschen möchten, rufen Sie in der Vertex AI Console den Abschnitt Endpunkte auf und klicken Sie auf das Symbol zum Löschen:

Endpunkt löschen

Klicken Sie dann bei der folgenden Aufforderung auf Undeploy:

Bereitstellung des Modells aufheben

Rufen Sie schließlich den Bereich Modelle in der Konsole auf, suchen Sie das Modell und klicken Sie im Dreipunkt-Menü rechts auf Modell löschen:

Modell löschen

Schritt 3: Cloud Storage-Bucket löschen

Wenn Sie den Storage-Bucket löschen möchten, rufen Sie in der Cloud Console über das Navigationsmenü „Storage“ auf, wählen Sie den Bucket aus und klicken Sie auf „Löschen“:

Speicher löschen