Einführung in Vertex Pipelines

1. Übersicht

In diesem Lab lernen Sie, wie Sie mit Vertex Pipelines ML-Pipelines erstellen und ausführen.

Lerninhalte

Die folgenden Themen werden behandelt:

  • Mit dem Kubeflow Pipelines SDK skalierbare ML-Pipelines erstellen
  • Einführungspipeline in drei Schritten mit Texteingabe erstellen und ausführen
  • Pipeline zum Trainieren, Auswerten und Bereitstellen eines AutoML-Klassifizierungsmodells erstellen und ausführen
  • Vordefinierte Komponenten für die Interaktion mit Vertex AI-Diensten verwenden, die über die google_cloud_pipeline_components-Bibliothek bereitgestellt werden
  • Pipelinejob mit Cloud Scheduler planen

Die Gesamtkosten für das Lab in Google Cloud betragen etwa 25$.

2. Einführung in Vertex AI

In diesem Lab wird das neueste KI-Produktangebot von Google Cloud verwendet. Vertex AI bindet die ML-Angebote in Google Cloud in eine nahtlose Entwicklungsumgebung ein. Zuvor waren mit AutoML trainierte und benutzerdefinierte Modelle über separate Dienste zugänglich. Das neue Angebot vereint beides in einer einzigen API sowie weitere neue Produkte. Sie können auch vorhandene Projekte zu Vertex AI migrieren.

Neben Modelltrainings- und Bereitstellungsdiensten umfasst Vertex AI auch eine Vielzahl von MLOps-Produkten, darunter Vertex Pipelines (Schwerpunkt dieses Labs), Modellmonitoring, Feature Store und mehr. Im Diagramm unten sehen Sie alle Vertex AI-Produktangebote.

Vertex-Produktübersicht

Wenn Sie uns Feedback geben möchten, besuchen Sie die Supportseite.

Warum sind ML-Pipelines nützlich?

Bevor wir loslegen, müssen wir verstehen, warum Sie eine Pipeline verwenden sollten. Stellen Sie sich vor, Sie erstellen einen ML-Workflow, der die Verarbeitung von Daten, das Trainieren eines Modells, die Hyperparameter-Abstimmung, die Bewertung und die Modellbereitstellung umfasst. Jeder dieser Schritte kann unterschiedliche Abhängigkeiten haben, die unübersichtlich werden können, wenn Sie den gesamten Workflow als monolithische Anwendung behandeln. Wenn Sie mit der Skalierung Ihres ML-Prozesses beginnen, möchten Sie möglicherweise Ihren ML-Workflow mit anderen in Ihrem Team teilen, damit diese ihn ausführen und Code beitragen können. Ohne einen zuverlässigen, reproduzierbaren Prozess kann dies schwierig werden. Bei Pipelines ist jeder Schritt in Ihrem ML-Prozess ein eigener Container. Auf diese Weise können Sie Schritte unabhängig entwickeln und die Ein- und Ausgabe jedes Schritts auf reproduzierbare Weise verfolgen. Sie können Ausführungen Ihrer Pipeline auch basierend auf anderen Ereignissen in Ihrer Cloud-Umgebung planen oder auslösen, z. B. eine Pipelineausführung starten, wenn neue Trainingsdaten verfügbar sind.

Kurzfassung: Mit Pipelines können Sie Ihren ML-Workflow automatisieren und reproduzieren.

3. Cloud-Umgebung einrichten

Sie benötigen ein Google Cloud Platform-Projekt mit aktivierter Abrechnung, um dieses Codelab auszuführen. Folgen Sie dieser Anleitung, um ein Projekt zu erstellen.

Schritt 1: Cloud Shell starten

In diesem Lab arbeiten Sie in einer Cloud Shell-Sitzung, einem Befehlsinterpreter, der von einer virtuellen Maschine in der Cloud von Google gehostet wird. Sie können diesen Abschnitt genauso einfach lokal auf Ihrem Computer ausführen, aber mit Cloud Shell erhalten alle Zugriff auf eine reproduzierbare Version in einer einheitlichen Umgebung. Sie können diesen Abschnitt nach dem Lab gerne auf Ihrem eigenen Computer wiederholen.

Cloud Shell autorisieren

Cloud Shell aktivieren

Klicken Sie rechts oben in der Cloud Console auf die Schaltfläche unten, um Cloud Shell zu aktivieren:

Cloud Shell aktivieren

Wenn Sie Cloud Shell noch nie gestartet haben, wird ein Zwischenbildschirm (below the fold) angezeigt, in dem beschrieben wird, worum es sich dabei handelt. Klicken Sie in diesem Fall auf Weiter. Der Chat wird nie wieder angezeigt. So sieht dieser einmalige Bildschirm aus:

Cloud Shell-Einrichtung

Die Bereitstellung und Verbindung mit Cloud Shell dauert nur einen Moment.

Cloud Shell-Init

Diese virtuelle Maschine verfügt über alle Entwicklungstools, die Sie benötigen. Es bietet ein Basisverzeichnis mit 5 GB nichtflüchtigem Speicher und wird in Google Cloud ausgeführt. Dadurch werden die Netzwerkleistung und die Authentifizierung erheblich verbessert. Viele, wenn nicht sogar alle Arbeiten in diesem Codelab können Sie ganz einfach mit einem Browser oder Ihrem Chromebook erledigen.

Sobald Sie mit Cloud Shell verbunden sind, sollten Sie sehen, dass Sie bereits authentifiziert sind und dass das Projekt bereits auf Ihre Projekt-ID eingestellt ist.

Führen Sie in Cloud Shell den folgenden Befehl aus, um zu prüfen, ob Sie authentifiziert sind:

gcloud auth list

In der Befehlsausgabe sollte in etwa Folgendes angezeigt werden:

Cloud Shell-Ausgabe

Führen Sie in Cloud Shell den folgenden Befehl aus, um zu prüfen, ob der gcloud-Befehl Ihr Projekt kennt:

gcloud config list project

Befehlsausgabe

[core]
project = <PROJECT_ID>

Ist dies nicht der Fall, können Sie die Einstellung mit diesem Befehl vornehmen:

gcloud config set project <PROJECT_ID>

Befehlsausgabe

Updated property [core/project].

Cloud Shell hat einige Umgebungsvariablen, darunter GOOGLE_CLOUD_PROJECT, das den Namen des aktuellen Cloud-Projekts enthält. Wir werden sie in diesem Lab an verschiedenen Stellen verwenden. Sie können es sich ansehen, indem Sie folgenden Befehl ausführen:

echo $GOOGLE_CLOUD_PROJECT

Schritt 2: APIs aktivieren

In späteren Schritten werden Sie sehen, wo diese Dienste benötigt werden (und warum). Führen Sie vorerst diesen Befehl aus, um Ihrem Projekt Zugriff auf die Compute Engine-, Container Registry- und Vertex AI-Dienste zu gewähren:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Wenn die Aktivierung erfolgreich war, erhalten Sie eine Meldung, die ungefähr so aussieht:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Schritt 3: Cloud Storage-Bucket erstellen

Zum Ausführen eines Trainingsjobs in Vertex AI benötigen wir einen Storage-Bucket zum Speichern der gespeicherten Modell-Assets. Der Bucket muss regional sein. Wir verwenden hier us-central. Sie können aber auch eine andere Region verwenden. Ersetzen Sie sie einfach in diesem Lab. Wenn Sie bereits einen Bucket haben, können Sie diesen Schritt überspringen.

Führen Sie die folgenden Befehle in Ihrem Cloud Shell-Terminal aus, um einen Bucket zu erstellen:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Als Nächstes gewähren wir dem Compute-Dienstkonto Zugriff auf diesen Bucket. Dadurch wird sichergestellt, dass Vertex Pipelines die erforderlichen Berechtigungen zum Schreiben von Dateien in diesen Bucket hat. Führen Sie den folgenden Befehl aus, um diese Berechtigung hinzuzufügen:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Schritt 4: Vertex AI Workbench-Instanz erstellen

Klicken Sie im Bereich Vertex AI der Cloud Console auf Workbench:

Vertex AI-Menü

Klicken Sie dort unter Nutzerverwaltete Notebooks auf Neues Notebook:

Neues Notebook erstellen

Wählen Sie dann den Instanztyp TensorFlow Enterprise 2.3 (mit LTS) ohne GPUs aus:

TFE-Instanz

Verwenden Sie die Standardoptionen und klicken Sie dann auf Erstellen.

Schritt 5: Notebook öffnen

Nachdem die Instanz erstellt wurde, wählen Sie JupyterLab öffnen aus:

Notebook öffnen

4. Vertex Pipelines einrichten

Zur Verwendung von Vertex Pipelines müssen einige zusätzliche Bibliotheken installiert werden:

  • Kubeflow Pipelines: Dies ist das SDK, das wir zum Erstellen unserer Pipeline verwenden. Vertex Pipelines unterstützt die Ausführung von Pipelines, die mit Kubeflow Pipelines oder TFX erstellt wurden.
  • Google Cloud-Pipelinekomponenten: Diese Bibliothek enthält vordefinierte Komponenten, die die Interaktion mit Vertex AI-Diensten über Ihre Pipelineschritte erleichtern.

Schritt 1: Python-Notebook erstellen und Bibliotheken installieren

Erstellen Sie zuerst im Launcher-Menü in Ihrer Notebook-Instanz ein Notebook, indem Sie Python 3 auswählen:

Python3-Notebook erstellen

Sie können auf das Launcher-Menü zugreifen, indem Sie links oben in Ihrer Notebookinstanz auf das Pluszeichen + klicken.

Um beide Dienste zu installieren, die wir in diesem Lab verwenden werden, legen Sie zuerst das Nutzer-Flag in einer Notebook-Zelle fest:

USER_FLAG = "--user"

Führen Sie dann in Ihrem Notebook folgenden Befehl aus:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Nach der Installation dieser Pakete müssen Sie den Kernel neu starten:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Prüfen Sie abschließend, ob Sie die Pakete richtig installiert haben. Die KFP SDK-Version muss größer als 1.8 sein:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Schritt 2: Projekt-ID und Bucket festlegen

In diesem Lab verweisen Sie auf Ihre Cloud-Projekt-ID und den Bucket, den Sie zuvor erstellt haben. Als Nächstes erstellen wir Variablen für jede dieser Elemente.

Wenn Sie Ihre Projekt-ID nicht kennen, können Sie sie möglicherweise abrufen, indem Sie den folgenden Befehl ausführen:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Andernfalls legen Sie sie hier fest:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Erstellen Sie dann eine Variable zum Speichern des Bucket-Namens. Wenn Sie sie in diesem Lab erstellt haben, funktioniert Folgendes: Andernfalls müssen Sie die Einstellungen manuell vornehmen:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Schritt 3: Bibliotheken importieren

Fügen Sie Folgendes hinzu, um die Bibliotheken zu importieren, die wir in diesem Codelab verwenden:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Schritt 4: Konstanten definieren

Als Letztes müssen wir vor dem Erstellen der Pipeline noch einige konstante Variablen definieren. PIPELINE_ROOT ist der Cloud Storage-Pfad, in den die von unserer Pipeline erstellten Artefakte geschrieben werden. Wir verwenden hier us-central1 als Region. Wenn Sie beim Erstellen des Buckets jedoch eine andere Region verwendet haben, aktualisieren Sie die Variable REGION im Code unten:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Nachdem Sie den obigen Code ausgeführt haben, sollte das Stammverzeichnis Ihrer Pipeline ausgegeben werden. Dies ist der Cloud Storage-Speicherort, an den die Artefakte aus Ihrer Pipeline geschrieben werden. Sie hat das Format gs://YOUR-BUCKET-NAME/pipeline_root/.

5. Erste Pipeline erstellen

Damit Sie sich mit der Funktionsweise von Vertex Pipelines vertraut machen können, erstellen wir zuerst eine kurze Pipeline mit dem KFP SDK. Diese Pipeline hat keine ML-bezogenen Funktionen (keine Sorge, wir schaffen das!). Wir verwenden sie, um Ihnen Folgendes zu vermitteln:

  • Benutzerdefinierte Komponenten im KFP SDK erstellen
  • Pipeline in Vertex Pipelines ausführen und überwachen

Wir erstellen eine Pipeline, die einen Satz aus zwei Ausgaben ausgibt: einem Produktnamen und einer Emoji-Beschreibung. Diese Pipeline besteht aus drei Komponenten:

  • product_name: Diese Komponente nimmt einen Produktnamen (oder ein beliebiges Substantiv) als Eingabe und gibt diesen String als Ausgabe zurück
  • emoji: Diese Komponente wandelt die Textbeschreibung eines Emojis in ein Emoji um. Der Textcode für ✨ ist beispielsweise „Glitzer“. Diese Komponente verwendet eine Emoji-Bibliothek, um Ihnen zu zeigen, wie Sie externe Abhängigkeiten in Ihrer Pipeline verwalten
  • build_sentence: Diese letzte Komponente verwendet die Ausgabe der beiden vorherigen Elemente, um einen Satz zu erstellen, in dem das Emoji verwendet wird. Die resultierende Ausgabe könnte beispielsweise „Vertex Pipelines is ✨“ lauten.

Beginnen wir mit dem Programmieren!

Schritt 1: Funktionsbasierte Python-Komponente erstellen

Mit dem KFP SDK können wir Komponenten basierend auf Python-Funktionen erstellen. Wir verwenden das für die drei Komponenten in unserer ersten Pipeline. Zuerst erstellen wir die Komponente product_name. Sie nimmt einfach einen String als Eingabe an und gibt diesen String zurück. Fügen Sie Ihrem Notebook Folgendes hinzu:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Sehen wir uns die Syntax einmal genauer an:

  • Der @component-Decorator kompiliert diese Funktion bei Ausführung der Pipeline in eine Komponente. Sie benötigen sie immer dann, wenn Sie eine benutzerdefinierte Komponente schreiben.
  • Der Parameter base_image gibt das Container-Image an, das von dieser Komponente verwendet wird.
  • Der Parameter output_component_file ist optional und gibt die YAML-Datei an, in die die kompilierte Komponente geschrieben werden soll. Nachdem Sie die Zelle ausgeführt haben, sollte die Datei in Ihre Notebookinstanz geschrieben werden. Wenn Sie diese Komponente für einen Nutzer freigeben möchten, können Sie ihm die generierte YAML-Datei senden, die dann so geladen wird:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • Die -> str nach der Funktionsdefinition gibt den Ausgabetyp für diese Komponente an.

Schritt 2: Zwei zusätzliche Komponenten erstellen

Zur Fertigstellung unserer Pipeline erstellen wir zwei weitere Komponenten. Das erste, das wir definieren, verwendet einen String als Eingabe und wandelt diesen String in das entsprechende Emoji um, sofern vorhanden. Sie gibt ein Tupel mit übergebenem Eingabetext und dem resultierenden Emoji zurück:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Diese Komponente ist etwas komplexer als die vorherige. Die Neuerungen im Überblick:

  • Der Parameter packages_to_install teilt der Komponente alle externen Bibliotheksabhängigkeiten für diesen Container mit. In diesem Fall verwenden wir eine Bibliothek namens Emojis.
  • Diese Komponente gibt ein NamedTuple namens Outputs zurück. Beachten Sie, dass jeder der Strings in diesem Tupel Schlüssel hat: emoji_text und emoji. Wir verwenden diese in der nächsten Komponente, um auf die Ausgabe zuzugreifen.

Die letzte Komponente in dieser Pipeline verarbeitet die Ausgabe der ersten beiden und kombiniert sie, um einen String zurückzugeben:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

Sie fragen sich vielleicht: Woher weiß diese Komponente, dass sie die Ausgabe der zuvor definierten Schritte verwendet? Im nächsten Schritt führen wir alles zusammen.

Schritt 3: Komponenten in einer Pipeline kombinieren

Mit den oben definierten Komponentendefinitionen wurden Factory-Funktionen erstellt, die in einer Pipelinedefinition zur Erstellung von Schritten verwendet werden können. Verwenden Sie zum Einrichten einer Pipeline den @pipeline-Decorator, geben Sie der Pipeline einen Namen und eine Beschreibung sowie den Stammpfad, in den die Artefakte Ihrer Pipeline geschrieben werden sollen. Mit Artefakten meinen wir alle Ausgabedateien, die von Ihrer Pipeline generiert werden. In der Einführungspipeline werden keine generiert, in der nächsten Pipeline jedoch schon.

Im nächsten Codeblock definieren wir eine intro_pipeline-Funktion. Hier geben wir die Eingaben für unsere ersten Pipelineschritte an und wie die Schritte miteinander verbunden sind:

  • Für product_task wird ein Produktname als Eingabe verwendet. Hier übergeben wir „Vertex Pipelines“ aber Sie können ihn beliebig ändern.
  • emoji_task verwendet den Textcode für ein Emoji zur Eingabe. Sie können diesen Namen auch beliebig ändern. Beispiel: „party_face“ bezieht sich auf das 🥳 Emoji. Da sowohl diese als auch die product_task-Komponente keine Schritte haben, die Eingaben in diese übergeben, geben wir die Eingabe für diese Komponenten manuell an, wenn wir unsere Pipeline definieren.
  • Der letzte Schritt in unserer Pipeline – consumer_task hat drei Eingabeparameter:
    • Die Ausgabe von product_task. Da dieser Schritt nur eine Ausgabe erzeugt, können wir über product_task.output darauf verweisen.
    • Die emoji-Ausgabe unseres emoji_task-Schritts. Sehen Sie sich die oben definierte Komponente emoji an, in der die Ausgabeparameter benannt wurden.
    • Ähnlich die benannte Ausgabe von emoji_text der Komponente emoji. Falls unserer Pipeline Text übergeben wird, der keinem Emoji entspricht, wird dieser Text verwendet, um einen Satz zu erstellen.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Schritt 4: Pipeline kompilieren und ausführen

Nachdem Sie Ihre Pipeline definiert haben, können Sie sie kompilieren. Mit dem folgenden Befehl wird eine JSON-Datei generiert, mit der Sie die Pipeline ausführen:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Erstellen Sie als Nächstes eine TIMESTAMP-Variable. Wir verwenden dies in unserer Job-ID:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Definieren Sie dann Ihren Pipelinejob:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Führen Sie schließlich den Job aus, um eine neue Pipelineausführung zu erstellen:

job.submit()

Nachdem Sie diese Zelle ausgeführt haben, sollten Sie Logs mit einem Link zum Ansehen der Pipelineausführung in Ihrer Konsole sehen:

Pipeline job logs

Rufen Sie diesen Link auf. Ihre Pipeline sollte nach Abschluss wie folgt aussehen:

Einführungspipeline abgeschlossen

Die Ausführung dieser Pipeline dauert 5 bis 6 Minuten. Wenn Sie fertig sind, können Sie auf die Komponente build-sentence klicken, um die endgültige Ausgabe zu sehen:

Ausgabe der einführenden Pipeline

Sie sind nun mit der Funktionsweise des KFP SDK und Vertex Pipelines vertraut und können eine Pipeline erstellen, die ein ML-Modell mithilfe anderer Vertex AI-Dienste erstellt und bereitstellt. Legen wir los!

6. End-to-End-ML-Pipeline erstellen

Jetzt können Sie Ihre erste ML-Pipeline erstellen. In dieser Pipeline verwenden wir das UCI Machine Learning Dry Beans Dataset von: KOKLU, M. und OZKAN, I.A., (2020), „Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques“, „In Computers and Electronics in Agriculture“, 174, 105507. DOI:

Dies ist ein tabellarisches Dataset. In unserer Pipeline verwenden wir das Dataset, um ein AutoML-Modell zu trainieren, zu bewerten und bereitzustellen, das Bohnen anhand ihrer Eigenschaften in einen von sieben Typen einteilt.

Diese Pipeline wird:

  • Erstellen Sie ein Dataset in .
  • Tabellarisches Klassifizierungsmodell mit AutoML trainieren
  • Bewertungsmesswerte für dieses Modell abrufen
  • Entscheiden Sie anhand der Bewertungsmesswerte, ob das Modell mit bedingter Logik in Vertex Pipelines bereitgestellt werden soll
  • Modell mit Vertex Prediction auf einem Endpunkt bereitstellen

Jeder der beschriebenen Schritte stellt eine Komponente dar. In den meisten Pipelineschritten werden vordefinierte Komponenten für Vertex AI-Dienste über die google_cloud_pipeline_components-Bibliothek verwendet, die Sie zuvor in dieses Codelab importiert haben. In diesem Abschnitt definieren wir zuerst eine benutzerdefinierte Komponente. Anschließend definieren wir die restlichen Pipelineschritte mithilfe von vordefinierten Komponenten. Vordefinierte Komponenten erleichtern den Zugriff auf Vertex AI-Dienste wie Modelltraining und -bereitstellung.

Schritt 1: Benutzerdefinierte Komponente für die Modellbewertung

Die benutzerdefinierte Komponente, die wir definieren, wird gegen Ende unserer Pipeline verwendet, sobald das Modelltraining abgeschlossen ist. Diese Komponente führt Folgendes aus:

  • Bewertungsmesswerte aus dem trainierten AutoML-Klassifizierungsmodell abrufen
  • Messwerte parsen und in der Vertex Pipelines-UI rendern
  • Vergleichen Sie die Messwerte mit einem Schwellenwert, um zu bestimmen, ob das Modell bereitgestellt werden sollte

Bevor wir die Komponente definieren, betrachten wir die Eingabe- und Ausgabeparameter. Als Eingabe verwendet diese Pipeline einige Metadaten für unser Cloud-Projekt, das resultierende trainierte Modell (diese Komponente wird später definiert), die Bewertungsmesswerte des Modells und ein thresholds_dict_str. thresholds_dict_str wird definiert, wenn wir unsere Pipeline ausführen. Im Fall dieses Klassifizierungsmodells ist dies der Bereich unter dem Wert der ROC-Kurve, für den das Modell bereitgestellt werden sollte. Wenn wir beispielsweise 0,95 übergeben, soll die Pipeline das Modell nur dann bereitstellen, wenn dieser Messwert über 95 % liegt.

Unsere Bewertungskomponente gibt einen String zurück, der angibt, ob das Modell bereitgestellt werden soll. Fügen Sie einer Notebook-Zelle Folgendes hinzu, um diese benutzerdefinierte Komponente zu erstellen:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Schritt 2: Vordefinierte Google Cloud-Komponenten hinzufügen

In diesem Schritt definieren wir die restlichen Pipelinekomponenten und sehen uns an, wie sie zusammenpassen. Definieren Sie zuerst den Anzeigenamen für Ihre Pipelineausführung mit einem Zeitstempel:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Kopieren Sie dann Folgendes in eine neue Notebook-Zelle:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Sehen wir uns an, was in diesem Code passiert:

  • Zunächst definieren wir wie in unserer vorherigen Pipeline die Eingabeparameter, die diese Pipeline benötigt. Diese müssen manuell festgelegt werden, da sie nicht von der Ausgabe anderer Schritte in der Pipeline abhängen.
  • Der Rest der Pipeline verwendet einige vordefinierte Komponenten für die Interaktion mit Vertex AI-Diensten:
    • TabularDatasetCreateOp erstellt anhand einer Dataset-Quelle entweder in Cloud Storage oder BigQuery ein tabellarisches Dataset in Vertex AI. In dieser Pipeline werden die Daten über eine BigQuery-Tabellen-URL
    • AutoMLTabularTrainingJobRunOp startet einen AutoML-Trainingsjob für ein tabellarisches Dataset. Wir übergeben dieser Komponente einige Konfigurationsparameter, darunter den Modelltyp (in diesem Fall die Klassifizierung), einige Daten in den Spalten, die Dauer des Trainings und ein Verweis auf das Dataset. Zur Übergabe des Datasets an diese Komponente stellen wir die Ausgabe der vorherigen Komponente über dataset_create_op.outputs["dataset"] bereit.
    • EndpointCreateOp erstellt einen Endpunkt in Vertex AI. Der in diesem Schritt erstellte Endpunkt wird als Eingabe an die nächste Komponente übergeben
    • ModelDeployOp stellt ein bestimmtes Modell an einem Endpunkt in Vertex AI bereit. In diesem Fall verwenden wir den Endpunkt, der im vorherigen Schritt erstellt wurde. Es gibt zusätzliche Konfigurationsoptionen, aber hier stellen wir den Maschinentyp und das Modell für den Endpunkt bereit, die bereitgestellt werden sollen. Wir übergeben das Modell, indem wir auf die Ausgaben des Trainingsschritts in unserer Pipeline zugreifen.
  • Diese Pipeline nutzt auch bedingte Logik, eine Funktion von Vertex-Pipelines, mit der Sie eine Bedingung zusammen mit verschiedenen Zweigen basierend auf dem Ergebnis dieser Bedingung definieren können. Denken Sie daran, dass beim Definieren der Pipeline ein thresholds_dict_str-Parameter übergeben wurde. Dies ist der Genauigkeitsschwellenwert, den wir verwenden, um zu bestimmen, ob unser Modell auf einem Endpunkt bereitgestellt werden soll. Zur Implementierung verwenden wir die Klasse Condition aus dem KFP SDK. Die übergebene Bedingung ist die Ausgabe der benutzerdefinierten Auswertungskomponente, die wir zuvor in diesem Codelab definiert haben. Wenn diese Bedingung erfüllt ist, führt die Pipeline weiterhin die Komponente deploy_op aus. Wenn die Genauigkeit unseren vordefinierten Grenzwert nicht erreicht, stoppt die Pipeline hier und stellt kein Modell bereit.

Schritt 3: End-to-End-ML-Pipeline kompilieren und ausführen

Nachdem unsere vollständige Pipeline definiert ist, ist es an der Zeit, sie zu kompilieren:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Definieren Sie als Nächstes den Job:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Und schließlich führen Sie den Job aus:

ml_pipeline_job.submit()

Gehen Sie nach dem Ausführen der Zelle oben zu dem Link, der in den Logs angezeigt wird, um Ihre Pipeline in der Konsole anzuzeigen. Die Ausführung dieser Pipeline dauert etwas über eine Stunde. Die meiste Zeit wird für den AutoML-Trainingsschritt aufgewendet. Die fertige Pipeline sieht in etwa so aus:

Abgeschlossene AutoML-Pipeline

Wenn Sie die Option „Artefakte maximieren“ werden Details zu den verschiedenen Artefakten angezeigt, die aus Ihrer Pipeline erstellt wurden. Wenn Sie beispielsweise auf das Artefakt dataset klicken, werden Details zum erstellten Vertex AI-Dataset angezeigt. Sie können auf diesen Link klicken, um zur Seite für dieses Dataset zu gelangen:

Pipeline-Dataset

Um sich die resultierenden Messwertvisualisierungen aus unserer benutzerdefinierten Bewertungskomponente anzusehen, klicken Sie auf das Artefakt metricsc. Auf der rechten Seite Ihres Dashboards sehen Sie die Wahrheitsmatrix für dieses Modell:

Messwertvisualisierung

Wenn Sie das Modell und den Endpunkt sehen möchten, die aus dieser Pipelineausführung erstellt wurden, klicken Sie im Abschnitt „Modelle“ auf das Modell automl-beans. Hier sollten Sie dieses Modell auf einem Endpunkt sehen:

Modellendpunkt

Sie können diese Seite auch aufrufen, indem Sie in der Pipelinegrafik auf das Artefakt endpoint klicken.

Sie können sich nicht nur das Pipelinediagramm in der Console ansehen, sondern auch Vertex Pipelines für das Lineage-Tracking verwenden. Mit Lineage-Tracking meinen wir das Tracking von Artefakten, die in Ihrer Pipeline erstellt werden. So können Sie besser verstehen, wo Artefakte erstellt wurden und wie sie in einem ML-Workflow verwendet werden. Wenn Sie beispielsweise das Lineage-Tracking für das in dieser Pipeline erstellte Dataset sehen möchten, klicken Sie auf das Dataset-Artefakt und dann auf Lineage ansehen:

Herkunft ansehen

Hier sehen Sie, wo dieses Artefakt verwendet wird:

Herkunftsdetails

Schritt 4: Messwerte aus Pipelineausführungen vergleichen

Wenn Sie diese Pipeline mehrmals ausführen, sollten Sie die Messwerte verschiedener Ausführungen vergleichen. Mit der Methode aiplatform.get_pipeline_df() können Sie auf Ausführungsmetadaten zugreifen. Hier erhalten wir Metadaten für alle Ausführungen dieser Pipeline und laden sie in einen Pandas DataFrame:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

Damit haben Sie das Lab abgeschlossen!

🎉 Glückwunsch! 🎉

Sie haben gelernt, wie Sie mit Vertex AI Folgendes tun können:

  • Mit dem Kubeflow Pipelines SDK End-to-End-Pipelines mit benutzerdefinierten Komponenten erstellen
  • Pipelines in Vertex Pipelines ausführen und Pipelineausführungen mit dem SDK starten
  • Vertex Pipelines-Diagramm in der Console ansehen und analysieren
  • Vordefinierte Pipelinekomponenten verwenden, um Ihrer Pipeline Vertex AI-Dienste hinzuzufügen
  • Wiederkehrende Pipelinejobs planen

Weitere Informationen zu den verschiedenen Teilen von Vertex finden Sie in der Dokumentation.

7. Bereinigen

Damit Ihnen keine Kosten entstehen, sollten Sie die in diesem Lab erstellten Ressourcen löschen.

Schritt 1: Notebooks-Instanz beenden oder löschen

Wenn Sie das Notebook, das Sie in diesem Lab erstellt haben, weiter verwenden möchten, sollten Sie es deaktivieren, wenn Sie es nicht verwenden. Wählen Sie in der Notebooks-UI in der Cloud Console das Notebook und dann Beenden aus. Wenn Sie die Instanz vollständig löschen möchten, wählen Sie Löschen aus:

Instanz beenden

Schritt 2: Endpunkt löschen

Wenn Sie den von Ihnen bereitgestellten Endpunkt löschen möchten, gehen Sie in der Vertex AI-Konsole zum Abschnitt Endpunkte und klicken Sie auf das Symbol zum Löschen:

Endpunkt löschen

Klicken Sie dann in der folgenden Eingabeaufforderung auf Bereitstellung aufheben:

Bereitstellung des Modells aufheben

Gehen Sie schließlich zum Abschnitt Models (Modelle) der Konsole, suchen Sie das entsprechende Modell und klicken Sie rechts über das Dreipunkt-Menü auf Delete model (Modell löschen):

Modell löschen

Schritt 3: Cloud Storage-Bucket löschen

Wenn Sie den Storage-Bucket löschen möchten, gehen Sie im Navigationsmenü der Cloud Console zu „Storage“, wählen Sie den Bucket aus und klicken Sie auf „Löschen“:

Speicher löschen