Introduzione a Vertex Pipelines

1. Panoramica

In questo lab imparerai a creare ed eseguire pipeline ML con Vertex Pipelines.

Cosa imparerai

Imparerai a:

  • Utilizzare l'SDK Kubeflow Pipelines per creare pipeline ML scalabili
  • Crea ed esegui una pipeline introduttiva composta da 3 passaggi che accetta input di testo
  • Creare ed eseguire una pipeline che addestra, valuta ed esegue il deployment di un modello di classificazione AutoML
  • Usa componenti predefiniti per interagire con i servizi Vertex AI, forniti tramite la libreria google_cloud_pipeline_components
  • Pianifica un job di pipeline con Cloud Scheduler

Il costo totale per l'esecuzione di questo lab su Google Cloud è di circa 25$.

2. Introduzione a Vertex AI

Questo lab utilizza la più recente offerta di prodotti AI disponibile su Google Cloud. Vertex AI integra le offerte ML di Google Cloud in un'esperienza di sviluppo fluida. In precedenza, i modelli addestrati con AutoML e i modelli personalizzati erano accessibili tramite servizi separati. La nuova offerta combina entrambi in un'unica API, insieme ad altri nuovi prodotti. Puoi anche migrare progetti esistenti a Vertex AI.

Oltre ai servizi di addestramento e deployment dei modelli, Vertex AI include anche una varietà di prodotti MLOps, tra cui Vertex Pipelines (l'aspetto su cui si concentra questo lab), Model Monitoring, Feature Store e altri. Puoi vedere tutte le offerte di prodotti Vertex AI nel diagramma seguente.

Panoramica del prodotto Vertex

In caso di feedback, consulta la pagina di assistenza.

Perché le pipeline ML sono utili?

Prima di entrare nel dettaglio dell'argomento, capiamo innanzitutto perché potresti voler utilizzare una pipeline. Immagina di creare un flusso di lavoro ML che includa l'elaborazione dei dati, l'addestramento di un modello, l'ottimizzazione degli iperparametri, la valutazione e il deployment del modello. Ciascuno di questi passaggi può avere dipendenze diverse, che potrebbero diventare difficili da gestire se consideri l'intero flusso di lavoro come un monolite. Quando inizi a scalare il tuo processo ML, potresti voler condividere il tuo flusso di lavoro ML con altri membri del tuo team in modo che possano eseguirlo e contribuire al codice. Senza un processo affidabile e riproducibile, questo può diventare difficile. Con le pipeline, ogni passaggio del processo ML è un container a sé stante. Ciò consente di sviluppare passaggi in modo indipendente e tenere traccia dell'input e output di ciascun passaggio in modo riproducibile. Puoi anche pianificare o attivare esecuzioni della pipeline in base ad altri eventi nel tuo ambiente Cloud, ad esempio l'avvio di un'esecuzione della pipeline quando sono disponibili nuovi dati di addestramento.

Il tl;dr: le pipeline ti aiutano ad automatizzare e riprodurre il tuo flusso di lavoro ML.

3. Configurazione dell'ambiente cloud

Per eseguire questo codelab, è necessario un progetto Google Cloud con fatturazione abilitata. Per creare un progetto, segui le istruzioni riportate qui.

Passaggio 1: avvia Cloud Shell

In questo lab lavorerai in una sessione di Cloud Shell, un interprete di comandi ospitato da una macchina virtuale in esecuzione nel cloud di Google. Potresti eseguire facilmente questa sezione in locale sul tuo computer, ma l'utilizzo di Cloud Shell offre a chiunque l'accesso a un'esperienza riproducibile in un ambiente coerente. Dopo il lab, puoi riprovare questa sezione sul tuo computer.

Autorizza Cloud Shell

Attiva Cloud Shell

In alto a destra nella console Cloud, fai clic sul pulsante in basso per attivare Cloud Shell:

Attiva Cloud Shell

Se non hai mai avviato Cloud Shell, ti viene mostrata una schermata intermedia (below the fold) che descrive di cosa si tratta. In tal caso, fai clic su Continua (e non la vedrai più). Ecco come appare quella singola schermata:

Configurazione di Cloud Shell

Il provisioning e la connessione a Cloud Shell dovrebbero richiedere solo qualche istante.

Init Cloud Shell

Questa macchina virtuale viene caricata con tutti gli strumenti di sviluppo di cui hai bisogno. Offre una home directory permanente da 5 GB e viene eseguita in Google Cloud, migliorando notevolmente le prestazioni di rete e l'autenticazione. Gran parte, se non tutto, del lavoro in questo codelab può essere svolto semplicemente con un browser o Chromebook.

Una volta eseguita la connessione a Cloud Shell, dovresti vedere che il tuo account è già autenticato e il progetto è già impostato sul tuo ID progetto.

Esegui questo comando in Cloud Shell per verificare che l'account sia autenticato:

gcloud auth list

L'output comando dovrebbe essere simile al seguente:

Output di Cloud Shell

Esegui questo comando in Cloud Shell per confermare che il comando gcloud è a conoscenza del tuo progetto:

gcloud config list project

Output comando

[core]
project = <PROJECT_ID>

In caso contrario, puoi impostarlo con questo comando:

gcloud config set project <PROJECT_ID>

Output comando

Updated property [core/project].

Cloud Shell ha alcune variabili di ambiente, tra cui GOOGLE_CLOUD_PROJECT, che contiene il nome del nostro attuale progetto Cloud. Lo utilizzeremo in vari punti di questo lab. Puoi visualizzarla eseguendo questo comando:

echo $GOOGLE_CLOUD_PROJECT

Passaggio 2: attivazione delle API

Nei passaggi successivi vedrai dove (e perché) sono necessari questi servizi, ma per il momento esegui questo comando per concedere al tuo progetto l'accesso ai servizi Compute Engine, Container Registry e Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Dovrebbe essere visualizzato un messaggio di operazione riuscita simile a questo:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Passaggio 3: crea un bucket Cloud Storage

Per eseguire un job di addestramento su Vertex AI, avremo bisogno di un bucket di archiviazione in cui archiviare gli asset salvati nel modello. Il bucket deve essere regionale. Qui utilizziamo us-central, ma puoi anche usare un'altra regione (basta sostituirla durante questo lab). Se hai già un bucket, puoi saltare questo passaggio.

Esegui questi comandi nel terminale Cloud Shell per creare un bucket:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

A questo punto daremo al nostro account di servizio Compute l'accesso a questo bucket. In questo modo, Vertex Pipelines avrà le autorizzazioni necessarie per scrivere file in questo bucket. Esegui questo comando per aggiungere l'autorizzazione:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Passaggio 4: crea un'istanza di Vertex AI Workbench

Dalla sezione Vertex AI della console Cloud, fai clic su Workbench:

Menu Vertex AI

Da qui, all'interno dei Blocchi note gestiti dall'utente, fai clic su Nuovo blocco note:

Crea nuovo blocco note

Quindi seleziona il tipo di istanza TensorFlow Enterprise 2.3 (con LTS) senza GPU:

Istanza TFE

Utilizza le opzioni predefinite e fai clic su Crea.

Passaggio 5: apri il blocco note

Una volta creata l'istanza, seleziona Apri JupyterLab:

Apri blocco note

4. Configurazione di Vertex Pipelines

Per utilizzare Vertex Pipelines, dovremo installare alcune librerie aggiuntive:

  • Kubeflow Pipelines: si tratta dell'SDK che utilizzeremo per creare la pipeline. Vertex Pipelines supporta l'esecuzione di pipeline create sia con Kubeflow Pipelines che con TFX.
  • Componenti della pipeline di Google Cloud: questa libreria fornisce componenti predefiniti che semplificano l'interazione con i servizi Vertex AI dai passaggi della pipeline.

Passaggio 1: crea un blocco note Python e installa le librerie

Innanzitutto, dal menu Avvio app dell'istanza di blocco note, crea un blocco note selezionando Python 3:

Crea blocco note Python3

Puoi accedere al menu Avvio app facendo clic sul segno + in alto a sinistra dell'istanza del blocco note.

Per installare entrambi i servizi che utilizzeremo in questo lab, per prima cosa imposta il flag utente nella cella di un blocco note:

USER_FLAG = "--user"

Poi esegui il codice seguente dal blocco note:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Dopo aver installato questi pacchetti, è necessario riavviare il kernel:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Infine, verifica di aver installato correttamente i pacchetti. La versione dell'SDK KFP deve essere maggiore o uguale a 1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Passaggio 2: imposta l'ID progetto e il bucket

Durante questo lab, farai riferimento all'ID del progetto Cloud e al bucket che hai creato in precedenza. Ora creeremo le variabili per ciascuno di questi.

Se non conosci l'ID progetto, potresti riuscire a recuperarlo eseguendo il comando seguente:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Altrimenti, impostala qui:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Quindi crea una variabile per archiviare il nome del bucket. Se lo hai creato in questo lab, funzionerà quanto segue. In caso contrario, dovrai impostarla manualmente:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Passaggio 3: importa le librerie

Aggiungi quanto segue per importare le librerie che utilizzeremo in questo codelab:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Passaggio 4: definisci le costanti

L'ultima cosa che dobbiamo fare prima di creare la pipeline è definire alcune variabili costanti. PIPELINE_ROOT è il percorso di Cloud Storage in cui verranno scritti gli artefatti creati dalla nostra pipeline. Qui stiamo utilizzando us-central1 come regione, ma se hai utilizzato una regione diversa quando hai creato il bucket, aggiorna la variabile REGION nel codice seguente:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Dopo aver eseguito il codice riportato sopra, dovresti vedere la directory root della pipeline stampata. Questa è la località di Cloud Storage in cui verranno scritti gli artefatti della tua pipeline. Sarà nel formato gs://YOUR-BUCKET-NAME/pipeline_root/

5. Creazione della prima pipeline in corso...

Per acquisire familiarità con il funzionamento di Vertex Pipelines, creeremo prima una breve pipeline utilizzando l'SDK KFP. Questa pipeline non fa nulla di correlato all'ML (non preoccuparti, ci arriveremo!), la stiamo usando per insegnarti:

  • Come creare componenti personalizzati nell'SDK KFP
  • Come eseguire e monitorare una pipeline in Vertex Pipelines

Creeremo una pipeline che stampa una frase utilizzando due output: un nome di prodotto e una descrizione di emoji. Questa pipeline sarà composta da tre componenti:

  • product_name: questo componente prenderà il nome di un prodotto (o qualsiasi sostantivo che preferisci) come input e restituirà la stringa come output.
  • emoji: questo componente prenderà la descrizione testuale di un'emoji e la convertirà in un'emoji. Ad esempio, il codice di testo di ✨ è "sparkles". Questo componente utilizza una libreria di emoji per mostrarti come gestire le dipendenze esterne nella tua pipeline
  • build_sentence: questo componente finale consumerà l'output dei due precedenti per creare una frase che utilizza l'emoji. Ad esempio, l'output risultante potrebbe essere "Vertex Pipelines è ✨".

Iniziamo a programmare!

Passaggio 1: crea un componente basato su una funzione Python

Con l'SDK KFP, possiamo creare componenti basati sulle funzioni Python. Lo utilizzeremo per i tre componenti della nostra prima pipeline. Per prima cosa creeremo il componente product_name, che prende semplicemente una stringa come input e restituisce questa stringa. Aggiungi quanto segue al blocco note:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Osserviamo più da vicino la sintassi qui:

  • Il decorator @component compila questa funzione in un componente quando viene eseguita la pipeline. Da utilizzare ogni volta che scrivi un componente personalizzato.
  • Il parametro base_image specifica l'immagine container utilizzata dal componente.
  • Il parametro output_component_file è facoltativo e specifica il file YAML in cui scrivere il componente compilato. Dopo aver eseguito la cella, dovresti vedere il file scritto nell'istanza del blocco note. Per condividere questo componente con un altro utente, puoi inviare il file YAML generato e chiedergli di caricarlo con quanto segue:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • Il parametro -> str dopo la definizione della funzione specifica il tipo di output per questo componente.

Passaggio 2: crea due componenti aggiuntivi

Per completare la pipeline, creeremo altri due componenti. La prima che definiamo prende una stringa come input e la converte nella corrispondente emoji, se presente. Restituisce una tupla con il testo di input passato e l'emoji risultante:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Questo componente è un po' più complesso di quello precedente. Analizziamo le novità:

  • Il parametro packages_to_install indica al componente eventuali dipendenze della libreria esterna per il container. In questo caso, stiamo utilizzando una raccolta chiamata emoji.
  • Questo componente restituisce un valore NamedTuple denominato Outputs. Nota che ciascuna delle stringhe in questa tupla ha chiavi: emoji_text e emoji. Le utilizzeremo nel nostro prossimo componente per accedere all'output.

Il componente finale di questa pipeline consumerà l'output dei primi due e li combina per restituire una stringa:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

Forse ti starai chiedendo come fa questo componente a utilizzare l'output dei passaggi precedenti che hai definito. Ottima domanda. Spiegheremo tutto nel prossimo passaggio.

Passaggio 3: assemblare i componenti in una pipeline

Le definizioni dei componenti che abbiamo definito sopra hanno creato funzioni di fabbrica che possono essere utilizzate in una definizione della pipeline per creare i passaggi. Per configurare una pipeline, utilizza il decorator @pipeline, assegna un nome e una descrizione alla pipeline e indica il percorso principale in cui devono essere scritti gli artefatti della pipeline. Per artefatti si intende qualsiasi file di output generato dalla pipeline. Questa pipeline introduttiva non ne genera nessuna, ma lo farà la pipeline successiva.

Nel blocco di codice successivo definiamo una funzione intro_pipeline. È qui che specifichiamo gli input per le fasi iniziali della nostra pipeline e come le varie fasi si connettono tra loro:

  • product_task accetta un nome di prodotto come input. Qui passiamo "Vertex Pipelines" ma puoi modificarla come preferisci.
  • emoji_task accetta il codice di testo di un'emoji come input. Puoi anche modificarlo come preferisci. Ad esempio, "party_face" si riferisce all'emoji 🥳. Tieni presente che, poiché sia questo componente sia il componente product_task non hanno passaggi che alimentano l'input, specifichiamo manualmente l'input quando definiamo la pipeline.
  • L'ultimo passaggio della pipeline, consumer_task, prevede tre parametri di input:
    • L'output di product_task. Poiché questo passaggio produce un solo output, è possibile farvi riferimento tramite product_task.output.
    • L'output emoji del nostro passaggio emoji_task. Guarda il componente emoji definito sopra dove abbiamo denominato i parametri di output.
    • Allo stesso modo, l'output denominato emoji_text dal componente emoji. Se nella nostra pipeline viene passato testo che non corrisponde a un'emoji, utilizzerà questo testo per costruire una frase.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Passaggio 4: compila ed esegui la pipeline

Dopo aver definito la pipeline, puoi compilarla. Di seguito verrà generato un file JSON che utilizzerai per eseguire la pipeline:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

A questo punto, crea una variabile TIMESTAMP. Utilizzeremo questo indirizzo nel nostro ID job:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Quindi definisci il job della pipeline:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Infine, esegui il job per creare una nuova esecuzione della pipeline:

job.submit()

Dopo aver eseguito questa cella, dovresti vedere i log con un link per visualizzare l'esecuzione della pipeline nella console:

Log dei job di pipeline

Vai a quel link. Al termine, la pipeline dovrebbe avere questo aspetto:

Pipeline introduttiva completata

L'esecuzione di questa pipeline richiederà 5-6 minuti. Al termine, puoi fare clic sul componente build-sentence per visualizzare l'output finale:

Output pipeline introduttivo

Ora che hai familiarità con il funzionamento dell'SDK KFP e di Vertex Pipelines, puoi creare una pipeline che crea ed esegue il deployment di un modello ML utilizzando altri servizi Vertex AI. Iniziamo.

6. Creazione di una pipeline ML end-to-end

È il momento di creare la tua prima pipeline ML. In questa pipeline, utilizzeremo il set di dati dei fagioli secchi di UCI Machine Learning, proveniente da KOKLU, M. e OZKAN, I.A., (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques", "In Computers and Electronics in Agriculture, 174, 105507". DOI.

Questo è un set di dati tabulare e nella nostra pipeline lo utilizzeremo per addestrare, valutare ed eseguire il deployment di un modello AutoML che classifica i bean in una di sette tipologie in base alle loro caratteristiche.

Questa pipeline:

  • Crea un set di dati in
  • Addestra un modello di classificazione tabulare con AutoML
  • Ottieni metriche di valutazione su questo modello
  • In base alle metriche di valutazione, decidi se eseguire il deployment del modello utilizzando la logica condizionale in Vertex Pipelines
  • Eseguire il deployment del modello su un endpoint utilizzando Vertex Prediction

Ciascuno dei passaggi descritti sarà un componente. La maggior parte dei passaggi della pipeline utilizzerà componenti predefiniti per i servizi Vertex AI tramite la libreria google_cloud_pipeline_components che abbiamo importato in precedenza in questo codelab. In questa sezione, per prima cosa definiremo un componente personalizzato. Quindi, definirai gli altri passaggi della pipeline utilizzando componenti predefiniti. I componenti predefiniti semplificano l'accesso ai servizi Vertex AI, come l'addestramento e il deployment del modello.

Passaggio 1: un componente personalizzato per la valutazione del modello

Il componente personalizzato che definiamo verrà utilizzato verso la fine della pipeline al termine dell'addestramento del modello. Questo componente consente di:

  • Ottieni le metriche di valutazione dal modello di classificazione AutoML addestrato
  • Analizza le metriche ed esegui il rendering nella UI di Vertex Pipelines
  • Confronta le metriche con una soglia per determinare se deve essere eseguito il deployment del modello

Prima di definire il componente, comprendiamo i suoi parametri di input e output. Come input, questa pipeline prende alcuni metadati sul nostro progetto Cloud, sul modello addestrato risultante (definiremo questo componente in seguito), sulle metriche di valutazione del modello e un thresholds_dict_str. thresholds_dict_str è un valore che definiamo durante l'esecuzione della pipeline. Nel caso di questo modello di classificazione, questa sarà l'area sotto il valore della curva ROC per la quale dovremmo eseguire il deployment del modello. Ad esempio, se passiamo il valore 0,95, significa che vorremmo che la nostra pipeline esegua il deployment del modello solo se questa metrica è superiore al 95%.

Il nostro componente di valutazione restituisce una stringa che indica se eseguire o meno il deployment del modello. Per creare questo componente personalizzato, aggiungi quanto segue in una cella del blocco note:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Passaggio 2: aggiungi i componenti predefiniti di Google Cloud

In questo passaggio definiremo gli altri componenti della pipeline e vedremo come si integrano fra loro. Innanzitutto, definisci il nome visualizzato per l'esecuzione della pipeline utilizzando un timestamp:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Quindi copia quanto segue in una nuova cella del blocco note:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Vediamo cosa succede con questo codice:

  • In primo luogo, come nella pipeline precedente, definiamo i parametri di input richiesti dalla pipeline. Dobbiamo impostarli manualmente poiché non dipendono dall'output di altri passaggi nella pipeline.
  • Il resto della pipeline utilizza alcuni componenti predefiniti per l'interazione con i servizi Vertex AI:
    • TabularDatasetCreateOp crea un set di dati tabulare in Vertex AI la cui origine è in Cloud Storage o BigQuery. In questa pipeline, passiamo i dati tramite l'URL di una tabella BigQuery
    • AutoMLTabularTrainingJobRunOp avvia un job di addestramento AutoML per un set di dati tabulare. Passiamo alcuni parametri di configurazione a questo componente, tra cui il tipo di modello (in questo caso, la classificazione), alcuni dati sulle colonne, per quanto tempo eseguire l'addestramento e un puntatore al set di dati. Tieni presente che per passare il set di dati a questo componente, forniamo l'output del componente precedente tramite dataset_create_op.outputs["dataset"].
    • EndpointCreateOp crea un endpoint in Vertex AI. L'endpoint creato da questo passaggio verrà passato come input al componente successivo
    • ModelDeployOp esegue il deployment di un determinato modello su un endpoint in Vertex AI. In questo caso, utilizziamo l'endpoint creato nel passaggio precedente. Sono disponibili ulteriori opzioni di configurazione, ma qui forniamo il tipo di macchina e il modello dell'endpoint di cui vorremmo eseguire il deployment. Passiamo il modello accedendo agli output della fase di addestramento nella pipeline
  • Questa pipeline utilizza inoltre la logica condizionale, una funzionalità di Vertex Pipelines che consente di definire una condizione, insieme a diversi rami in base al risultato di quella condizione. Ricorda che quando abbiamo definito la pipeline abbiamo passato un parametro thresholds_dict_str. Questa è la soglia di accuratezza che stiamo utilizzando per determinare se eseguire il deployment del modello su un endpoint. Per implementare questa funzionalità, utilizziamo la classe Condition dell'SDK KFP. La condizione che trasmettiamo è l'output del componente di valutazione personalizzato che abbiamo definito in precedenza in questo codelab. Se questa condizione è vera, la pipeline continuerà a eseguire il componente deploy_op. Se l'accuratezza non soddisfa la soglia predefinita, la pipeline si ferma qui e non esegue il deployment di un modello.

Passaggio 3: compila ed esegui la pipeline ML end-to-end

Dopo aver definito la pipeline completa, occorre compilarla:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Quindi, definisci il job:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Infine, esegui il job:

ml_pipeline_job.submit()

Vai al link mostrato nei log dopo aver eseguito la cella sopra per vedere la pipeline nella console. L'esecuzione di questa pipeline richiederà poco più di un'ora. La maggior parte del tempo viene dedicata alla fase di addestramento AutoML. La pipeline completa sarà simile a questa:

Pipeline AutoML completata

Se attivi/disattivi l'opzione "Espandi elementi" in alto, potrai visualizzare i dettagli dei diversi artefatti creati dalla pipeline. Ad esempio, se fai clic sull'artefatto dataset, visualizzerai i dettagli relativi al set di dati Vertex AI creato. Puoi fare clic sul link qui per andare alla pagina del set di dati:

Set di dati della pipeline

Allo stesso modo, per vedere le visualizzazioni delle metriche risultanti dal nostro componente di valutazione personalizzata, fai clic sull'artefatto metricsc. Sul lato destro della dashboard, potrai vedere la matrice di confusione per questo modello:

Visualizzazione delle metriche

Per visualizzare il modello e l'endpoint creati dall'esecuzione di questa pipeline, vai alla sezione dei modelli e fai clic sul modello denominato automl-beans. Qui dovresti vedere il deployment di questo modello in un endpoint:

Endpoint modello

Puoi anche accedere a questa pagina facendo clic sull'artefatto endpoint nel grafico della pipeline.

Oltre a guardare il grafico della pipeline nella console, puoi anche utilizzare Vertex Pipelines per il monitoraggio della derivazione. Per monitoraggio della derivazione, intendiamo il monitoraggio degli artefatti creati in tutta la pipeline. Questo può aiutarci a capire dove sono stati creati gli artefatti e come vengono utilizzati in un flusso di lavoro ML. Ad esempio, per visualizzare il monitoraggio della derivazione per il set di dati creato in questa pipeline, fai clic sull'artefatto del set di dati e poi su Visualizza derivazione:

Visualizza derivazione

Mostra tutte le posizioni in cui viene utilizzato questo artefatto:

Dettagli derivazione

Passaggio 4: confronta le metriche tra le esecuzioni della pipeline

Se esegui questa pipeline più volte, potrebbe essere utile confrontare le metriche tra le esecuzioni. Puoi utilizzare il metodo aiplatform.get_pipeline_df() per accedere ai metadati delle esecuzioni. Qui, otterremo i metadati per tutte le esecuzioni di questa pipeline e li caricheremo in un DataFrame Pandas:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

Con questo, hai terminato il lab.

🎉 Complimenti! 🎉

Hai imparato come utilizzare Vertex AI per:

  • Utilizzare l'SDK Kubeflow Pipelines per creare pipeline end-to-end con componenti personalizzati
  • Esegui le pipeline su Vertex Pipelines e avvia le esecuzioni della pipeline con l'SDK
  • Visualizza e analizza il grafico Vertex Pipelines nella console
  • Usa i componenti predefiniti della pipeline per aggiungere servizi Vertex AI alla tua pipeline
  • Pianifica job della pipeline ricorrenti

Per scoprire di più sulle diverse parti di Vertex, consulta la documentazione.

7. Esegui la pulizia

Per evitare addebiti, ti consigliamo di eliminare le risorse create durante questo lab.

Passaggio 1: arresta o elimina l'istanza di Notebooks

Se vuoi continuare a utilizzare il blocco note creato in questo lab, ti consigliamo di disattivarlo quando non lo usi. Dall'interfaccia utente di Notebooks nella console Cloud, seleziona il blocco note, quindi seleziona Interrompi. Se vuoi eliminare completamente l'istanza, seleziona Elimina:

Arresta istanza

Passaggio 2: elimina l'endpoint

Per eliminare l'endpoint di cui hai eseguito il deployment, vai alla sezione Endpoint della console Vertex AI e fai clic sull'icona Elimina:

Elimina endpoint

Quindi, fai clic su Annulla deployment dal seguente prompt:

Annulla il deployment del modello

Infine, vai alla sezione Modelli della console, individua il modello e, nel menu con tre puntini a destra, fai clic su Elimina modello:

Elimina modello

Passaggio 3: elimina il bucket Cloud Storage

Per eliminare il bucket di archiviazione, utilizzando il menu di navigazione nella console Cloud, vai a Storage, seleziona il bucket e fai clic su Elimina:

Elimina spazio di archiviazione