Introduzione a Vertex Pipelines

1. Panoramica

In questo lab imparerai a creare ed eseguire pipeline ML con Vertex Pipelines.

Cosa imparerai

Al termine del corso sarai in grado di:

  • Utilizzare l'SDK Kubeflow Pipelines per creare pipeline ML scalabili
  • Creare ed eseguire una pipeline introduttiva composta da 3 passaggi che accetta input di testo
  • Creare ed eseguire una pipeline che addestra, valuta ed esegue il deployment di un modello di classificazione AutoML
  • Utilizza componenti predefiniti per interagire con i servizi Vertex AI, forniti tramite la libreria google_cloud_pipeline_components
  • Pianificare un job di pipeline con Cloud Scheduler

Il costo totale per eseguire questo lab su Google Cloud è di circa 25$.

2. Introduzione a Vertex AI

Questo lab utilizza la più recente offerta di prodotti AI disponibile su Google Cloud. Vertex AI integra le offerte ML di Google Cloud in un'esperienza di sviluppo fluida. In precedenza, i modelli addestrati con AutoML e i modelli personalizzati erano accessibili tramite servizi separati. La nuova offerta combina entrambi in un'unica API, insieme ad altri nuovi prodotti. Puoi anche migrare progetti esistenti a Vertex AI.

Oltre ai servizi di addestramento e deployment dei modelli, Vertex AI include anche una varietà di prodotti MLOps, tra cui Vertex Pipelines (l'aspetto su cui si concentra questo lab), Model Monitoring, Feature Store e altri. Puoi vedere tutte le offerte di prodotti Vertex AI nel diagramma seguente.

Panoramica dei prodotti Vertex

In caso di feedback, consulta la pagina di assistenza.

Perché le pipeline ML sono utili?

Prima di entrare nel dettaglio dell'argomento, capiamo innanzitutto perché potresti voler utilizzare una pipeline. Immagina di creare un flusso di lavoro ML che includa l'elaborazione dei dati, l'addestramento di un modello, l'ottimizzazione degli iperparametri, la valutazione e il deployment del modello. Ciascuno di questi passaggi può avere dipendenze diverse, che potrebbero diventare difficili da gestire se consideri l'intero flusso di lavoro come un monolite. Quando inizi a scalare il tuo processo ML, potresti voler condividere il tuo flusso di lavoro ML con altri membri del tuo team in modo che possano eseguirlo e contribuire al codice. Senza un processo affidabile e riproducibile, questo può diventare difficile. Con le pipeline, ogni passaggio del processo ML è un container a sé stante. Ciò consente di sviluppare passaggi in modo indipendente e tenere traccia dell'input e output di ciascun passaggio in modo riproducibile. Puoi anche pianificare o attivare esecuzioni della pipeline in base ad altri eventi nel tuo ambiente Cloud, ad esempio l'avvio di un'esecuzione della pipeline quando sono disponibili nuovi dati di addestramento.

Il tl;dr: le pipeline ti aiutano ad automatizzare e riprodurre il tuo flusso di lavoro ML.

3. Configurazione dell'ambiente cloud

Per eseguire questo codelab, è necessario un progetto Google Cloud con fatturazione abilitata. Per creare un progetto, segui le istruzioni riportate qui.

Passaggio 1: avvia Cloud Shell

In questo lab lavorerai in una sessione di Cloud Shell, un interprete di comandi ospitato da una macchina virtuale in esecuzione nel cloud di Google. Potresti eseguire facilmente questa sezione in locale sul tuo computer, ma l'utilizzo di Cloud Shell offre a chiunque l'accesso a un'esperienza riproducibile in un ambiente coerente. Dopo il lab, puoi riprovare questa sezione sul tuo computer.

Autorizza Cloud Shell

Attiva Cloud Shell

In alto a destra nella console Cloud, fai clic sul pulsante di seguito per attivare Cloud Shell:

Attiva Cloud Shell

Se non hai mai avviato Cloud Shell, viene visualizzata una schermata intermedia (sotto la piega) che descrive di cosa si tratta. In questo caso, fai clic su Continua (e non la vedrai mai più). Ecco come appare la schermata una tantum:

Configurazione di Cloud Shell

Il provisioning e la connessione a Cloud Shell dovrebbero richiedere solo qualche istante.

Init Cloud Shell

Questa macchina virtuale contiene tutti gli strumenti di sviluppo di cui hai bisogno. Offre una home directory permanente da 5 GB e viene eseguita in Google Cloud, migliorando notevolmente le prestazioni e l'autenticazione della rete. Gran parte, se non tutto, del lavoro in questo codelab può essere svolto semplicemente con un browser o con il tuo Chromebook.

Una volta eseguita la connessione a Cloud Shell, dovresti vedere che il tuo account è già autenticato e il progetto è già impostato sul tuo ID progetto.

Esegui questo comando in Cloud Shell per verificare che l'account sia autenticato:

gcloud auth list

L'output comando dovrebbe essere simile al seguente:

Output di Cloud Shell

Esegui questo comando in Cloud Shell per confermare che il comando gcloud è a conoscenza del tuo progetto:

gcloud config list project

Output comando

[core]
project = <PROJECT_ID>

In caso contrario, puoi impostarlo con questo comando:

gcloud config set project <PROJECT_ID>

Output comando

Updated property [core/project].

Cloud Shell include alcune variabili di ambiente, tra cui GOOGLE_CLOUD_PROJECT che contiene il nome del nostro progetto Cloud corrente. La utilizzeremo in vari punti di questo lab. Puoi visualizzarla eseguendo questo comando:

echo $GOOGLE_CLOUD_PROJECT

Passaggio 2: attivazione delle API

Nei passaggi successivi vedrai dove (e perché) sono necessari questi servizi, ma per il momento esegui questo comando per concedere al tuo progetto l'accesso ai servizi Compute Engine, Container Registry e Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Dovrebbe essere visualizzato un messaggio di operazione riuscita simile a questo:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Passaggio 3: crea un bucket Cloud Storage

Per eseguire un job di addestramento su Vertex AI, abbiamo bisogno di un bucket di archiviazione in cui archiviare gli asset salvati nel modello. Il bucket deve essere regionale. Qui utilizziamo us-central, ma puoi anche usare un'altra regione (basta sostituirla durante questo lab). Se hai già un bucket, puoi saltare questo passaggio.

Esegui questi comandi nel terminale Cloud Shell per creare un bucket:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

A questo punto daremo al nostro account di servizio Compute l'accesso a questo bucket. In questo modo, Vertex Pipelines avrà le autorizzazioni necessarie per scrivere file in questo bucket. Esegui questo comando per aggiungere l'autorizzazione:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Passaggio 4: crea un'istanza di Vertex AI Workbench

Nella sezione Vertex AI della console Cloud, fai clic su Workbench:

Menu Vertex AI

Da qui, all'interno dei Blocchi note gestiti dall'utente, fai clic su Nuovo blocco note:

Crea nuovo notebook

Quindi seleziona il tipo di istanza TensorFlow Enterprise 2.3 (con LTS) senza GPU:

Istanza TFE

Utilizza le opzioni predefinite e poi fai clic su Crea.

Passaggio 5: apri il tuo notebook

Dopo aver creato l'istanza, seleziona Apri JupyterLab:

Apri notebook

4. Configurazione di Vertex Pipelines

Per utilizzare Vertex Pipelines, dobbiamo installare alcune librerie aggiuntive:

  • Kubeflow Pipelines: si tratta dell'SDK che utilizzeremo per creare la pipeline. Vertex Pipelines supporta l'esecuzione di pipeline create sia con Kubeflow Pipelines che con TFX.
  • Componenti della pipeline di Google Cloud: questa libreria fornisce componenti predefiniti che semplificano l'interazione con i servizi Vertex AI dai passaggi della pipeline.

Passaggio 1: crea un notebook Python e installa le librerie

Innanzitutto, dal menu Avvio nell'istanza del notebook, crea un notebook selezionando Python 3:

Crea un notebook Python3

Puoi accedere al menu Avvio app facendo clic sul segno + in alto a sinistra dell'istanza del notebook.

Per installare entrambi i servizi che utilizzeremo in questo lab, imposta prima il flag utente in una cella del notebook:

USER_FLAG = "--user"

Quindi, esegui quanto segue dal tuo notebook:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Dopo aver installato questi pacchetti, dovrai riavviare il kernel:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Infine, verifica di aver installato correttamente i pacchetti. La versione dell'SDK KFP deve essere maggiore o uguale a 1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Passaggio 2: imposta l'ID progetto e il bucket

In questo lab farai riferimento all'ID progetto Cloud e al bucket che hai creato in precedenza. Successivamente creeremo variabili per ciascuno di questi.

Se non conosci l'ID progetto, potresti riuscire a recuperarlo eseguendo quanto segue:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

In caso contrario, impostalo qui:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Quindi, crea una variabile per archiviare il nome del bucket. Se lo hai creato in questo lab, la seguente operazione funzionerà. In caso contrario, dovrai impostarlo manualmente:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Passaggio 3: importa le librerie

Aggiungi quanto segue per importare le librerie che utilizzeremo in questo codelab:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Passaggio 4: definisci le costanti

L'ultima cosa da fare prima di creare la pipeline è definire alcune variabili costanti. PIPELINE_ROOT è il percorso di Cloud Storage in cui verranno scritti gli elementi creati dalla nostra pipeline. Qui utilizziamo us-central1 come regione, ma se hai utilizzato una regione diversa quando hai creato il bucket, aggiorna la variabile REGION nel codice seguente:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Dopo aver eseguito il codice riportato sopra, dovresti vedere la directory principale della pipeline stampata. Questa è la posizione di Cloud Storage in cui verranno scritti gli elementi della pipeline. Il formato sarà gs://YOUR-BUCKET-NAME/pipeline_root/

5. Creazione della prima pipeline

Per acquisire familiarità con il funzionamento di Vertex Pipelines, creeremo innanzitutto una breve pipeline utilizzando l'SDK KFP. Questa pipeline non esegue alcuna operazione correlata all'apprendimento automatico (non preoccuparti, ci arriveremo), ma la utilizziamo per insegnarti:

  • Come creare componenti personalizzati nell'SDK KFP
  • Come eseguire e monitorare una pipeline in Vertex Pipelines

Creeremo una pipeline che stampa una frase utilizzando due output: un nome del prodotto e una descrizione emoji. Questa pipeline sarà composta da tre componenti:

  • product_name: questo componente prende in input un nome del prodotto (o qualsiasi altro sostantivo) e restituisce la stringa come output
  • emoji: questo componente prende la descrizione del testo di un'emoji e la converte in un'emoji. Ad esempio, il codice di testo per ✨ è "sparkles". Questo componente utilizza una libreria di emoji per mostrarti come gestire le dipendenze esterne nella tua pipeline
  • build_sentence: questo componente finale utilizzerà l'output dei due precedenti per creare una frase che utilizza l'emoji. Ad esempio, l'output risultante potrebbe essere "Vertex Pipelines è ✨".

Iniziamo a programmare.

Passaggio 1: crea un componente basato su funzioni Python

Con l'SDK KFP, possiamo creare componenti basati sulle funzioni Python. Lo utilizzeremo per i tre componenti della nostra prima pipeline. Per prima cosa creeremo il componente product_name, che prende semplicemente una stringa come input e la restituisce. Aggiungi quanto segue al tuo notebook:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Diamo un'occhiata più da vicino alla sintassi:

  • Il decorator @component compila questa funzione in un componente quando viene eseguita la pipeline. Lo utilizzerai ogni volta che scrivi un componente personalizzato.
  • Il parametro base_image specifica l'immagine del contenitore che verrà utilizzata da questo componente.
  • Il parametro output_component_file è facoltativo e specifica il file yaml in cui scrivere il componente compilato. Dopo aver eseguito la cella, dovresti vedere il file scritto nell'istanza del notebook. Se vuoi condividere questo componente con qualcuno, puoi inviargli il file YAML generato e chiedergli di caricarlo con quanto segue:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • Il carattere -> str dopo la definizione della funzione specifica il tipo di output per questo componente.

Passaggio 2: crea altri due componenti

Per completare la pipeline, creeremo altri due componenti. Il primo che definiremo prende una stringa come input e la converte nell'emoji corrispondente, se esistente. Restituisce una tupla con il testo di input passato e l'emoji risultante:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Questo componente è un po' più complesso del precedente. Analizziamo le novità:

  • Il parametro packages_to_install indica al componente le eventuali dipendenze delle librerie esterne per questo contenitore. In questo caso, utilizziamo una libreria chiamata emoji.
  • Questo componente restituisce un NamedTuple denominato Outputs. Nota che ciascuna delle stringhe in questa tupla ha chiavi: emoji_text e emoji. Li utilizzeremo nel prossimo componente per accedere all'output.

Il componente finale di questa pipeline utilizzerà l'output dei primi due e li combinerà per restituire una stringa:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

A questo punto potresti chiederti: come fa questo componente a sapere di utilizzare l'output dei passaggi precedenti che hai definito? Ottima domanda. Unificheremo tutto nel passaggio successivo.

Passaggio 3: assemblare i componenti in una pipeline

Le definizioni dei componenti che abbiamo definito sopra hanno creato funzioni factory che possono essere utilizzate in una definizione della pipeline per creare passaggi. Per configurare una pipeline, utilizza il decorator @pipeline, assegnale un nome e una descrizione e fornisci il percorso principale in cui devono essere scritti gli elementi della pipeline. Per artefatti si intendono tutti i file di output generati dalla pipeline. Questa pipeline introduttiva non ne genera, ma la prossima lo farà.

Nel blocco di codice successivo definiamo una funzione intro_pipeline. Qui specifichiamo gli input per i passaggi iniziali della pipeline e il modo in cui i passaggi sono collegati tra loro:

  • product_task prende come input un nome prodotto. Qui stiamo passando "Vertex Pipelines", ma puoi modificare il nome come preferisci.
  • emoji_task prende come input il codice di testo di un'emoji. Puoi anche modificarlo come preferisci. Ad esempio, "party_face" si riferisce all'emoji 🥳. Tieni presente che, poiché questo componente e il componente product_task non hanno passaggi che forniscono input, specifichiamo manualmente l'input per questi componenti quando definiamo la pipeline.
  • L'ultimo passaggio della nostra pipeline, consumer_task, ha tre parametri di input:
    • L'output di product_task. Poiché questo passaggio produce un solo output, è possibile farvi riferimento tramite product_task.output.
    • L'output emoji del nostro passaggio emoji_task. Consulta il componente emoji definito sopra in cui abbiamo denominato i parametri di output.
    • Analogamente, l'output denominato emoji_text del componente emoji. Se alla nostra pipeline viene passato del testo che non corrisponde a un'emoji, lo utilizzerà per costruire una frase.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Passaggio 4: compila ed esegui la pipeline

Una volta definita la pipeline, puoi compilarla. Di seguito verrà generato un file JSON che utilizzerai per eseguire la pipeline:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Poi, crea una variabile TIMESTAMP. Lo utilizzeremo nel nostro ID job:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Poi definisci il job della pipeline:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Infine, esegui il job per creare una nuova esecuzione della pipeline:

job.submit()

Dopo aver eseguito questa cella, dovresti vedere i log con un link per visualizzare l'esecuzione della pipeline nella console:

Log dei job della pipeline

Vai a quel link. Al termine, la pipeline dovrebbe avere il seguente aspetto:

Pipeline di introduzione completata

L'esecuzione di questa pipeline richiederà 5-6 minuti. Al termine, puoi fare clic sul componente build-sentence per visualizzare l'output finale:

Output della pipeline introduttiva

Ora che hai familiarità con il funzionamento dell'SDK KFP e di Vertex Pipelines, puoi creare una pipeline che crea ed esegue il deployment di un modello ML utilizzando altri servizi Vertex AI. Iniziamo.

6. Creazione di una pipeline ML end-to-end

È il momento di creare la tua prima pipeline ML. In questa pipeline utilizzeremo il set di dati Fagioli secchi del progetto UCI Machine Learning, di KOKLU, M. e OZKAN, I.A., (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques", in Computers and Electronics in Agriculture, 174, 105507. DOI.

Si tratta di un set di dati tabulari che nella nostra pipeline utilizzeremo per addestrare, valutare ed eseguire il deployment di un modello AutoML che classifichi i fagioli in uno dei 7 tipi in base alle loro caratteristiche.

Questa pipeline:

  • Crea un set di dati in
  • Addestrare un modello di classificazione tabulare con AutoML
  • Ottieni metriche di valutazione su questo modello
  • In base alle metriche di valutazione, decidi se eseguire il deployment del modello utilizzando la logica condizionale in Vertex Pipelines
  • Esegui il deployment del modello in un endpoint utilizzando Vertex Prediction

Ciascuno dei passaggi descritti sarà un componente. La maggior parte dei passaggi della pipeline utilizzerà componenti predefiniti per i servizi Vertex AI tramite la libreria google_cloud_pipeline_components che abbiamo importato in precedenza in questo codelab. In questa sezione, definiremo innanzitutto un componente personalizzato. Poi, definiremo il resto dei passaggi della pipeline utilizzando componenti predefiniti. I componenti predefiniti semplificano l'accesso ai servizi Vertex AI, come l'addestramento e il deployment dei modelli.

Passaggio 1: un componente personalizzato per la valutazione del modello

Il componente personalizzato che definiamo verrà utilizzato verso la fine della pipeline al termine dell'addestramento del modello. Questo componente consente di:

  • Ottenere le metriche di valutazione dal modello di classificazione AutoML addestrato
  • Analizza le metriche e visualizzale nell'interfaccia utente di Vertex Pipelines
  • Confronta le metriche con una soglia per determinare se è necessario eseguire il deployment del modello

Prima di definire il componente, comprendiamo i suoi parametri di input e output. Questa pipeline prende come input alcuni metadati del nostro progetto Cloud, il modello addestrato risultante (definiremo questo componente in seguito), le metriche di valutazione del modello e un thresholds_dict_str. thresholds_dict_str è un valore che definiremo quando eseguiremo la pipeline. Nel caso di questo modello di classificazione, sarà l'area sotto il valore della curva ROC per cui dobbiamo eseguire il deployment del modello. Ad esempio, se passiamo il valore 0,95, significa che vorremmo che la nostra pipeline esegua il deployment del modello solo se questa metrica è superiore al 95%.

Il nostro componente di valutazione restituisce una stringa che indica se eseguire o meno il deployment del modello. Per creare questo componente personalizzato, aggiungi quanto segue in una cella del blocco note:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Passaggio 2: aggiungi i componenti predefiniti di Google Cloud

In questo passaggio definiremo gli altri componenti della pipeline e vedremo come si integrano fra loro. Per prima cosa, definisci il nome visualizzato per l'esecuzione della pipeline utilizzando un timestamp:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Poi copia il seguente codice in una nuova cella del notebook:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Vediamo cosa succede in questo codice:

  • Innanzitutto, come nella pipeline precedente, definiamo i parametri di input della pipeline. Dobbiamo impostarli manualmente perché non dipendono dall'output di altri passaggi della pipeline.
  • Il resto della pipeline utilizza alcuni componenti predefiniti per interagire con i servizi Vertex AI:
    • TabularDatasetCreateOp crea un set di dati tabulare in Vertex AI la cui origine è in Cloud Storage o BigQuery. In questa pipeline, passiamo i dati tramite l'URL di una tabella BigQuery
    • AutoMLTabularTrainingJobRunOp avvia un job di addestramento AutoML per un set di dati tabulare. A questo componente vengono passati alcuni parametri di configurazione, tra cui il tipo di modello (in questo caso, classificazione), alcuni dati sulle colonne, la durata dell'addestramento e un puntatore al set di dati. Tieni presente che per passare il set di dati a questo componente, forniamo l'output del componente precedente tramite dataset_create_op.outputs["dataset"]
    • EndpointCreateOp crea un endpoint in Vertex AI. L'endpoint creato in questo passaggio verrà passato come input al componente successivo
    • ModelDeployOp esegue il deployment di un dato modello su un endpoint in Vertex AI. In questo caso, utilizziamo l'endpoint creato nel passaggio precedente. Sono disponibili ulteriori opzioni di configurazione, ma qui forniamo il tipo di macchina e il modello dell'endpoint di cui vorremmo eseguire il deployment. Passiamo il modello accedendo agli output della fase di addestramento nella pipeline
  • Questa pipeline utilizza anche la logica condizionale, una funzionalità di Vertex Pipelines che consente di definire una condizione, insieme a diversi rami in base al risultato della condizione. Ricorda che quando abbiamo definito la pipeline abbiamo passato un parametro thresholds_dict_str. Questa è la soglia di accuratezza che utilizziamo per determinare se eseguire il deployment del modello in un endpoint. Per implementare questa funzionalità, utilizziamo la classe Condition dell'SDK KFP. La condizione che passiamo è l'output del componente personalizzato eval che abbiamo definito in precedenza in questo codelab. Se questa condizione è vera, la pipeline continuerà a eseguire il componente deploy_op. Se l'accuratezza non raggiunge la soglia predefinita, la pipeline si interrompe e non viene eseguito il deployment di un modello.

Passaggio 3: compila e esegui la pipeline ML end-to-end

Ora che la pipeline completa è stata definita, è il momento di compilarla:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Poi definisci il job:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Infine, esegui il job:

ml_pipeline_job.submit()

Vai al link mostrato nei log dopo aver eseguito la cella sopra per vedere la pipeline nella console. L'esecuzione di questa pipeline richiederà poco più di un'ora. La maggior parte del tempo viene impiegata nel passaggio di addestramento di AutoML. La pipeline completata avrà questo aspetto:

Pipeline AutoML completata

Se attivi/disattivi il pulsante "Espandi elementi" in alto, potrai visualizzare i dettagli dei diversi elementi creati dalla pipeline. Ad esempio, se fai clic sull'elemento dataset, vedrai i dettagli del set di dati Vertex AI creato. Puoi fare clic sul link qui per andare alla pagina del set di dati:

Set di dati della pipeline

Analogamente, per visualizzare le visualizzazioni delle metriche risultanti dal nostro componente di valutazione personalizzato, fai clic sull'elemento chiamato metricsc. Sul lato destro della dashboard, potrai vedere la matrice di confusione per questo modello:

Visualizzazione delle metriche

Per visualizzare il modello e l'endpoint creati da questa esecuzione della pipeline, vai alla sezione Modelli e fai clic sul modello denominato automl-beans. Dovresti vedere questo modello di cui è stato eseguito il deployment in un endpoint:

Model-endpoint

Puoi anche accedere a questa pagina facendo clic sull'artefatto endpoint nel grafico della pipeline.

Oltre a esaminare il grafico della pipeline nella console, puoi utilizzare Vertex Pipelines anche per il monitoraggio della filiera. Per monitoraggio della derivazione, intendiamo il monitoraggio degli artefatti creati in tutta la pipeline. Questo può aiutarci a capire dove sono stati creati gli elementi e come vengono utilizzati in un flusso di lavoro di ML. Ad esempio, per visualizzare il monitoraggio della cronologia per il set di dati creato in questa pipeline, fai clic sull'elemento del set di dati e poi su Visualizza cronologia:

Visualizza derivazione

Mostra tutte le posizioni in cui viene utilizzato questo artefatto:

Dettagli derivazione

Passaggio 4: confronta le metriche tra le esecuzioni della pipeline

Se esegui questa pipeline più volte, ti consigliamo di confrontare le metriche tra le esecuzioni. Puoi utilizzare il metodo aiplatform.get_pipeline_df() per accedere ai metadati dell'esecuzione. Qui otteniamo i metadati per tutte le esecuzioni di questa pipeline e li carichiamo in un DataFrame Pandas:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

Hai completato il lab.

🎉 Complimenti! 🎉

Hai imparato come utilizzare Vertex AI per:

  • Utilizzare l'SDK Kubeflow Pipelines per creare pipeline end-to-end con componenti personalizzati
  • Esegui le pipeline su Vertex Pipelines e avvia le esecuzioni della pipeline con l'SDK
  • Visualizzare e analizzare il grafico di Vertex Pipelines nella console
  • Utilizzare componenti della pipeline predefiniti per aggiungere servizi Vertex AI alla pipeline
  • Pianificare job della pipeline ricorrenti

Per saperne di più sulle diverse parti di Vertex, consulta la documentazione.

7. Esegui la pulizia

Per evitare addebiti, ti consigliamo di eliminare le risorse create durante questo lab.

Passaggio 1: interrompi o elimina l'istanza di Notebooks

Se vuoi continuare a utilizzare il blocco note creato in questo lab, ti consigliamo di disattivarlo quando non lo usi. Dall'interfaccia utente di Notebooks nella console Cloud, seleziona il blocco note, quindi seleziona Interrompi. Se vuoi eliminare completamente l'istanza, seleziona Elimina:

Arresta istanza

Passaggio 2: elimina l'endpoint

Per eliminare l'endpoint di cui hai eseguito il deployment, vai alla sezione Endpoint della console Vertex AI e fai clic sull'icona Elimina:

Elimina endpoint

Quindi, fai clic su Annulla deployment dal seguente prompt:

Annulla il deployment del modello

Infine, vai alla sezione Modelli della console, individua il modello e fai clic su Elimina modello nel menu con tre puntini a destra:

Elimina modello

Passaggio 3: elimina il bucket Cloud Storage

Per eliminare il bucket di archiviazione, utilizza il menu di navigazione nella console Cloud, vai a Archiviazione, seleziona il bucket e fai clic su Elimina:

Eliminare lo spazio di archiviazione