Présentation de Vertex Pipelines

1. Présentation

Dans cet atelier, vous allez apprendre à créer et à exécuter des pipelines de ML avec Vertex Pipelines.

Objectifs

Vous allez apprendre à effectuer les opérations suivantes :

  • Utiliser le SDK Kubeflow Pipelines pour créer des pipelines de ML évolutifs
  • Créer et exécuter un pipeline d'introduction en trois étapes acceptant du texte en entrée
  • Créer et exécuter un pipeline qui entraîne, évalue et déploie un modèle de classification AutoML
  • Utiliser des composants prédéfinis pour interagir avec les services Vertex AI, fournis via la bibliothèque google_cloud_pipeline_components
  • Planifier un job de pipeline avec Cloud Scheduler

Le coût total d'exécution de cet atelier sur Google Cloud est d'environ 25$.

2. Présentation de Vertex AI

Cet atelier utilise la toute dernière offre de produits d'IA de Google Cloud. Vertex AI simplifie l'expérience de développement en intégrant toutes les offres de ML de Google Cloud. Auparavant, les modèles entraînés avec AutoML et les modèles personnalisés étaient accessibles depuis des services distincts. La nouvelle offre regroupe ces deux types de modèles mais aussi d'autres nouveaux produits en une seule API. Vous pouvez également migrer des projets existants vers Vertex AI.

Outre les services d'entraînement et de déploiement de modèles, Vertex AI inclut également divers produits MLOps comme Vertex Pipelines (sur lequel porte cet atelier), Model Monitoring, Feature Store, etc. Toutes les offres de produits de Vertex AI sont répertoriées dans le schéma ci-dessous.

Présentation des produits Vertex

Pour envoyer un commentaire, veuillez consulter la page d'assistance.

Pourquoi utiliser des pipelines de ML ?

Avant d'entrer dans le vif du sujet, vous devez comprendre les avantages d'un pipeline. Imaginez que vous créez un workflow de ML incluant le traitement de données, l'entraînement d'un modèle, le réglage d'hyperparamètres, l'évaluation et le déploiement d'un modèle. Chacune de ces étapes peut avoir des dépendances différentes, ce qui peut se révéler problématique si vous traitez l'ensemble du workflow comme un monolithe. Lorsque vous commencerez le scaling de votre processus de ML, vous souhaiterez peut-être partager votre workflow de ML avec le reste de votre équipe, afin qu'elle puisse l'exécuter et participer à l'écriture du code. Cela peut s'avérer difficile sans un processus fiable et reproductible. Avec les pipelines, chaque étape de votre processus de ML est son propre conteneur. Ainsi, vous pouvez développer des étapes de façon indépendante et suivre les entrées et sorties de chaque étape de manière reproductible. Vous pouvez également planifier ou déclencher des exécutions de votre pipeline en fonction d'autres événements dans votre environnement cloud. Par exemple, vous pouvez exécuter un pipeline lorsque de nouvelles données d'entraînement sont disponibles.

Résumé: les pipelines vous aident à automatiser et à reproduire votre workflow de ML.

3. Configurer l'environnement cloud

Pour suivre cet atelier de programmation, vous aurez besoin d'un projet Google Cloud Platform dans lequel la facturation est activée. Pour créer un projet, suivez ces instructions.

Étape 1: Démarrez Cloud Shell

Dans cet atelier, vous allez travailler dans une session Cloud Shell. Cet environnement est un interpréteur de commandes hébergé sur une machine virtuelle qui s'exécute dans le cloud de Google. Vous pourriez tout aussi facilement effectuer les tâches de cette section en local sur votre propre ordinateur, mais le fait d'utiliser Cloud Shell permet à chacun de bénéficier d'une expérience reproductible dans un environnement cohérent. Après l'atelier, libre à vous de reproduire cette section sur votre ordinateur.

Autoriser Cloud Shell

Activer Cloud Shell

En haut à droite de la console Cloud, cliquez sur le bouton ci-dessous pour activer Cloud Shell:

Activer Cloud Shell

Si vous n'avez jamais démarré Cloud Shell auparavant, un écran intermédiaire s'affiche en dessous de la ligne de flottaison, décrivant de quoi il s'agit. Si tel est le cas, cliquez sur Continuer. Cet écran ne s'affiche qu'une seule fois. Voici à quoi il ressemble :

Configuration de Cloud Shell

Le provisionnement et la connexion à Cloud Shell ne devraient pas prendre plus de quelques minutes.

Init Cloud Shell

Cette machine virtuelle contient tous les outils de développement dont vous avez besoin. Elle comprend un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud, ce qui améliore nettement les performances du réseau et l'authentification. Vous pouvez réaliser une grande partie, voire la totalité, des activités de cet atelier dans un simple navigateur ou sur votre Chromebook.

Une fois connecté à Cloud Shell, vous êtes en principe authentifié et le projet est défini avec votre ID de projet.

Exécutez la commande suivante dans Cloud Shell pour vérifier que vous êtes authentifié :

gcloud auth list

Le résultat de la commande devrait ressembler à ceci:

Résultat Cloud Shell

Exécutez la commande suivante dans Cloud Shell pour vérifier que la commande gcloud connaît votre projet:

gcloud config list project

Résultat de la commande

[core]
project = <PROJECT_ID>

Si vous obtenez un résultat différent, exécutez cette commande :

gcloud config set project <PROJECT_ID>

Résultat de la commande

Updated property [core/project].

Cloud Shell comporte quelques variables d'environnement, parmi lesquelles GOOGLE_CLOUD_PROJECT, qui contient le nom de notre projet Cloud actuel. Nous l'utiliserons à différents endroits tout au long de cet atelier. Pour la voir, exécutez la commande suivante :

echo $GOOGLE_CLOUD_PROJECT

Étape 2 : Activez les API

Dans les étapes suivantes, vous verrez où ces services sont nécessaires (et pourquoi). Pour le moment, exécutez la commande suivante afin d'autoriser votre projet à accéder aux services Compute Engine, Container Registry et Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Un message semblable à celui qui suit s'affiche pour vous indiquer que l'opération s'est correctement déroulée :

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Étape 3: Créez un bucket Cloud Storage

Pour exécuter un job d'entraînement sur Vertex AI, nous avons besoin d'un bucket de stockage afin de stocker les ressources de modèle enregistrées. Ce bucket doit être régional. Nous utilisons ici us-central, mais vous pouvez utiliser une autre région (il vous suffit d'effectuer le remplacement tout au long de cet atelier). Si vous disposez déjà d'un bucket, vous pouvez ignorer cette étape.

Exécutez les commandes suivantes dans votre terminal Cloud Shell afin de créer un bucket :

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Nous allons ensuite autoriser notre compte de service Compute à accéder à ce bucket. Ainsi, Vertex Pipelines disposera des autorisations nécessaires pour écrire des fichiers dans ce bucket. Exécutez la commande suivante pour ajouter cette autorisation :

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Étape 4 : Créez une instance Vertex AI Workbench

Dans la section Vertex AI de Cloud Console, cliquez sur Workbench :

Menu Vertex AI

Dans Notebooks gérés par l'utilisateur, cliquez sur Nouveau notebook:

Créer un notebook

Sélectionnez ensuite le type d'instance TensorFlow Enterprise 2.3 (avec LTS) sans GPU:

Instance TFE

Utilisez les options par défaut, puis cliquez sur Créer.

Étape 5: Ouvrez votre notebook

Une fois l'instance créée, sélectionnez Ouvrir JupyterLab:

Ouvrir le notebook

4. Configuration de Vertex Pipelines

Pour utiliser Vertex Pipelines, il est nécessaire d'installer quelques bibliothèques supplémentaires.

  • Kubeflow Pipelines: SDK que nous utiliserons pour créer notre pipeline. Vertex Pipelines est compatible avec l'exécution de pipelines créés avec Kubeflow Pipelines ou TFX.
  • Google Cloud Pipeline Components : cette bibliothèque fournit des composants prédéfinis qui permettent d'interagir plus facilement avec les services Vertex AI à partir de vos étapes de pipeline.

Étape 1 : Créez un notebook Python et installez les bibliothèques

Tout d'abord, dans le menu de lancement de votre instance de notebook, créez un notebook en sélectionnant Python 3:

Créer un notebook Python3

Pour accéder au menu de lancement, cliquez sur le signe + en haut à gauche de votre instance de notebook.

Pour installer les deux services que nous utiliserons dans cet atelier, vous devez d'abord définir l'option utilisateur dans une cellule du notebook :

USER_FLAG = "--user"

Exécutez ensuite les commandes suivantes depuis votre notebook :

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Une fois ces packages installés, vous devrez redémarrer le noyau :

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Pour finir, vérifiez que vous avez correctement installé les packages. Utilisez la version 1.8 du SDK KFP ou une version ultérieure:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Étape 2 : Définissez l'ID et le bucket de votre projet

Au cours de cet atelier, vous allez référencer votre ID de projet Cloud et le bucket que vous avez créé un peu plus tôt. Nous allons ensuite créer des variables pour chacun d'eux.

Si vous ne connaissez pas votre ID de projet, vous pouvez l'obtenir en exécutant la commande suivante :

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Sinon, définissez-le ici:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Ensuite, créez une variable pour stocker le nom de votre bucket. Si vous l'avez créée au cours de cet atelier, les étapes suivantes devraient fonctionner. Sinon, vous devrez la définir manuellement :

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Étape 3 : Importez les bibliothèques

Ajoutez les éléments suivants pour importer les bibliothèques que nous utiliserons tout au long de cet atelier de programmation:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Étape 4 : Définissez les constantes

La dernière chose que nous devons faire avant de pouvoir créer notre pipeline est de définir des variables constantes. PIPELINE_ROOT est le chemin d'accès Cloud Storage dans lequel les artefacts créés par notre pipeline seront écrits. Ici, nous utilisons us-central1 comme région, mais si vous avez choisi une autre région lorsque vous avez créé votre bucket, modifiez la variable REGION dans le code ci-dessous:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Une fois le code ci-dessus exécuté, le répertoire racine de votre pipeline doit s'afficher. Il s'agit de l'emplacement Cloud Storage dans lequel les artefacts de votre pipeline seront écrits. Il se présentera au format gs://YOUR-BUCKET-NAME/pipeline_root/.

5. Créer votre premier pipeline

Pour vous permettre de vous familiariser avec Vertex Pipelines, nous commençons par créer un pipeline court à l'aide du SDK KFP. Ce pipeline n'implique pas de ML (ne vous inquiétez pas, on y viendra), car cet atelier vise à vous apprendre à :

  • créer des composants personnalisés dans le SDK KFP ;
  • exécuter et surveiller un pipeline dans Vertex Pipelines.

Nous allons créer un pipeline qui affiche une phrase en utilisant deux sorties : un nom de produit et une description d'emoji. Ce pipeline comporte trois composants :

  • product_name: ce composant accepte un nom de produit (ou n'importe quel nom de votre choix) en entrée et renvoie cette chaîne en sortie.
  • emoji: ce composant accepte la description textuelle d'un emoji et la convertit en emoji. Par exemple, le code textuel pour ✨ est "sparkles". Ce composant utilise une bibliothèque d'emoji pour vous montrer comment gérer des dépendances externes dans votre pipeline.
  • build_sentence: ce dernier composant utilise la sortie des deux précédents pour créer une phrase contenant cet emoji. Par exemple, le résultat peut être "Vertex Pipelines, c'est ✨".

Commençons à coder !

Étape 1: Créez un composant basé sur une fonction Python

Le SDK KFP nous permet de créer des composants basés sur des fonctions Python. Nous utiliserons cet outil pour les 3 composants de notre premier pipeline. Nous allons d'abord créer le composant product_name, qui accepte simplement une chaîne en entrée et la renvoie. Ajoutez le code suivant à votre notebook :

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Examinons de plus près cette syntaxe :

  • Le décorateur @component compile cette fonction en un composant lorsque le pipeline est exécuté. Vous l'utiliserez à chaque fois que vous écrirez un composant personnalisé.
  • Le paramètre base_image spécifie l'image de conteneur que ce composant utilise.
  • Le paramètre output_component_file est facultatif et spécifie le fichier yaml dans lequel écrire le composant compilé. Une fois la cellule exécutée, ce fichier doit être écrit dans votre instance de notebook. Si vous souhaitez partager ce composant avec une autre personne, vous pouvez lui envoyer le fichier yaml qui a été généré afin qu'elle le charge à l'aide de la commande suivante :
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • Le code -> str après la définition de la fonction spécifie le type de sortie pour ce composant.

Étape 2 : Créez deux composants supplémentaires

Pour terminer notre pipeline, nous créerons deux autres composants. Le premier accepte une chaîne en entrée et la convertit en l'emoji correspondant, le cas échéant. Il renvoie un tuple avec le texte d'entrée transmis, ainsi que l'emoji correspondant :

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Ce composant est un peu plus complexe que le précédent. Voici les nouveautés :

  • Le paramètre packages_to_install indique au composant les éventuelles dépendances de bibliothèques externes pour ce conteneur. Dans ce cas, nous utilisons une bibliothèque appelée emoji.
  • Ce composant renvoie un NamedTuple appelé Outputs. Notez que chacune des chaînes de ce tuple dispose de clés: emoji_text et emoji. Dans le prochain composant, nous les utiliserons pour accéder à la sortie.

Le dernier composant de ce pipeline utilise la sortie des deux premiers et les combine pour renvoyer une chaîne :

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

Vous vous demanderez peut-être comment ce composant sait qu'il doit utiliser la sortie des étapes précédentes. Bonne question. Vous le découvrirez à l'étape suivante.

Étape 3 : Assemblez les composants en un pipeline

Les définitions des composants ci-dessus ont permis de créer des fonctions de fabrique pouvant être utilisées dans la définition d'un pipeline afin de créer des étapes. Pour configurer un pipeline, utilisez le décorateur @pipeline, fournissez le nom et la description du pipeline, et indiquez le chemin d'accès racine dans lequel les artefacts de votre pipeline doivent être écrits. Le terme "artefacts" désigne tous les fichiers de sortie générés par votre pipeline. Ce pipeline d'introduction n'en génère pas, mais ce sera le cas du prochain.

Dans le bloc de code suivant, nous définissons une fonction intro_pipeline. C'est là que nous spécifions les entrées de nos étapes de pipeline initiales et les liens entre les différentes étapes :

  • product_task accepte un nom de produit en entrée. Nous transmettons ici "Vertex Pipelines", mais vous pouvez le remplacer par le nom de votre choix.
  • emoji_task accepte le code textuel d'un emoji en entrée. Vous pouvez également modifier cette information. Par exemple, "party_face" correspond à l'emoji 🥳. Étant donné que ce composant et le composant product_task ne reçoivent pas d'entrées d'autres étapes, nous devons indiquer manuellement les entrées lorsque nous définissons notre pipeline.
  • La dernière étape de notre pipeline, consumer_task, compte trois paramètres d'entrée:
    • Résultat de product_task. Cette étape ne produisant qu'une seule sortie, nous pouvons la référencer à l'aide de product_task.output.
    • La sortie emoji de notre étape emoji_task. Reportez-vous au composant emoji défini ci-dessus, pour lequel nous avons nommé les paramètres de sortie.
    • De même, la sortie nommée emoji_text du composant emoji. Si notre pipeline reçoit du texte qui ne correspond pas à un emoji, il utilisera ce texte pour construire une phrase.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Étape 4 : Compilez et exécutez le pipeline

Une fois que vous avez défini votre pipeline, vous pouvez le compiler. Le code suivant génère un fichier JSON qui vous permettra d'exécuter le pipeline :

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Ensuite, créez une variable TIMESTAMP. Nous l'utiliserons dans notre ID de tâche:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Définissez ensuite votre job de pipeline:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Enfin, exécutez la tâche pour créer une exécution de pipeline:

job.submit()

Après avoir exécuté cette cellule, vous devriez voir des journaux avec un lien permettant d'afficher l'exécution du pipeline dans votre console:

Journaux des tâches de pipeline

Accédez à ce lien. Une fois l'exécution terminée, votre pipeline doit ressembler à ceci:

Pipeline d&#39;introduction terminé

L'exécution de ce pipeline prend cinq à six minutes. Une fois l'opération terminée, vous pouvez cliquer sur le composant build-sentence pour afficher le résultat final:

Sortie du pipeline d&#39;introduction

Maintenant que vous maîtrisez le fonctionnement du SDK KFP et de Vertex Pipelines, vous êtes prêt à construire un pipeline qui crée et déploie un modèle de ML à l'aide d'autres services Vertex AI. C'est parti !

6. Créer un pipeline de ML de bout en bout

Il est temps de créer votre premier pipeline de ML. Dans ce pipeline, nous utiliserons l'ensemble de données UCI Machine Learning Dry beans (Fèves sèches) de KOKLU, M. et OZKAN, I.A., (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques." publié dans "Computers and Electronics in Agriculture", 174, 105507. DOI.

Il s'agit d'un ensemble de données tabulaire. Dans notre pipeline, nous l'utiliserons pour entraîner, évaluer et déployer un modèle AutoML qui classe les haricots en sept catégories en fonction de leurs caractéristiques.

Ce pipeline va :

  • Créer un ensemble de données dans
  • Entraîner un modèle de classification tabulaire avec AutoML
  • obtenir des métriques d'évaluation sur ce modèle ;
  • décider, à partir des métriques d'évaluation, s'il faut déployer le modèle à l'aide d'une logique conditionnelle dans Vertex Pipelines ;
  • Déployer le modèle sur un point de terminaison à l'aide de Vertex Prediction

Chacune des étapes énoncées correspond à un composant. La plupart des étapes du pipeline utilisent des composants prédéfinis pour les services Vertex AI via la bibliothèque google_cloud_pipeline_components que vous avez importée plus tôt au cours de cet atelier de programmation. Dans cette section, nous allons commencer par définir un composant personnalisé. Ensuite, nous définirons les autres étapes du pipeline à l'aide de composants prédéfinis. Ces composants facilitent l'accès aux services Vertex AI, comme l'entraînement et le déploiement de modèles.

Étape 1 : Composant personnalisé pour l'évaluation du modèle

Le composant personnalisé que nous allons définir servira vers la fin de notre pipeline, une fois l'entraînement du modèle terminé. Ce composant a plusieurs fonctions :

  • Obtenir les métriques d'évaluation du modèle de classification AutoML entraîné
  • Analyser les métriques et en générer un rendu dans l'interface Vertex Pipelines
  • Comparer les métriques à un seuil pour déterminer si le modèle doit être déployé

Avant de définir le composant, nous devons comprendre ses paramètres d'entrée et de sortie. Ce pipeline accepte en entrée des métadonnées de notre projet Cloud, le modèle entraîné obtenu (nous définirons ce composant plus tard), les métriques d'évaluation du modèle et un thresholds_dict_str. Nous définirons thresholds_dict_str lorsque nous exécuterons notre pipeline. Dans le cas de ce modèle de classification, il s'agira de l'aire sous la courbe ROC pour laquelle nous devons déployer le modèle. Par exemple, si nous transmettons 0,95, cela signifie que notre pipeline ne doit déployer le modèle que si cette métrique est supérieure à 95%.

Notre composant d'évaluation renvoie une chaîne qui indique s'il faut ou non déployer le modèle. Ajoutez le code suivant dans une cellule de notebook pour créer ce composant personnalisé :

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Étape 2 : Ajoutez les composants Google Cloud prédéfinis

À cette étape, nous allons définir les autres composants de notre pipeline et découvrir comment ils s'agencent. Tout d'abord, définissez le nom à afficher de votre exécution de pipeline à l'aide d'un code temporel :

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Ensuite, copiez le code suivant dans une nouvelle cellule de notebook :

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Voyons ce qui se passe dans ce code:

  • Tout d'abord, comme dans notre pipeline précédent, nous définissons les paramètres d'entrée du pipeline. Nous devons les définir manuellement, car elles ne dépendent pas du résultat d'autres étapes du pipeline.
  • Le reste du pipeline utilise quelques composants prédéfinis pour interagir avec les services Vertex AI:
    • TabularDatasetCreateOp crée un ensemble de données tabulaire dans Vertex AI à partir d'une source d'ensemble de données dans Cloud Storage ou BigQuery. Dans ce pipeline, nous transmettons les données via une URL de table BigQuery.
    • AutoMLTabularTrainingJobRunOp lance un job d'entraînement AutoML pour un ensemble de données tabulaire. Nous transmettons quelques paramètres de configuration à ce composant, y compris le type de modèle (dans ce cas, classification), des données sur les colonnes, la durée d'exécution souhaitée pour l'entraînement, ainsi qu'un pointeur vers l'ensemble de données. Notez que pour transmettre l'ensemble de données à ce composant, nous fournissons la sortie du composant précédent via dataset_create_op.outputs["dataset"].
    • EndpointCreateOp crée un point de terminaison dans Vertex AI. Le point de terminaison créé à partir de cette étape sera transmis en entrée au composant suivant.
    • ModelDeployOp déploie un modèle donné vers un point de terminaison dans Vertex AI. Dans ce cas, nous utilisons le point de terminaison créé à l'étape précédente. D'autres options de configuration sont disponibles, mais nous fournissons ici le type et le modèle de machine du point de terminaison que nous souhaitons déployer. Pour transmettre le modèle, nous accédons aux sorties de l'étape d'entraînement de notre pipeline.
  • Ce pipeline utilise également la logique conditionnelle, une fonctionnalité de Vertex Pipelines qui vous permet de définir une condition ainsi que des branches différentes selon le résultat de cette condition. N'oubliez pas que lorsque nous avons défini notre pipeline, nous avons transmis un paramètre thresholds_dict_str. Il s'agit du seuil de justesse que nous utilisons pour déterminer si le modèle doit être déployé sur un point de terminaison. Pour l'implémenter, nous utilisons la classe Condition du SDK KFP. La condition que nous transmettons est la sortie du composant d'évaluation personnalisé que nous avons défini plus tôt dans cet atelier de programmation. Si cette condition est vraie, le pipeline continue d'exécuter le composant deploy_op. Si la justesse n'atteint pas notre seuil prédéfini, le pipeline s'arrête et ne déploie pas de modèle.

Étape 3 : Compilez et exécutez le pipeline de ML de bout en bout

Maintenant que notre pipeline est entièrement défini, il est temps de le compiler:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Ensuite, définissez la tâche:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Enfin, exécutez le job:

ml_pipeline_job.submit()

Accédez au lien indiqué dans les journaux après avoir exécuté la cellule ci-dessus pour afficher votre pipeline dans la console. L'exécution de ce pipeline prend un peu plus d'une heure. principalement consacrée à l'étape d'entraînement AutoML. Le pipeline terminé doit ressembler à l'exemple suivant :

Pipeline AutoML terminé

Si vous activez le bouton "Développer les artefacts" en haut, vous pourrez consulter les détails des différents artefacts créés à partir de votre pipeline. Par exemple, si vous cliquez sur l'artefact dataset, vous pourrez consulter les détails de l'ensemble de données Vertex AI créé. Vous pouvez cliquer sur le lien affiché pour accéder à la page de cet ensemble de données :

Ensemble de données du pipeline

De même, pour consulter les visualisations de métriques résultant de notre composant d'évaluation personnalisé, cliquez sur l'artefact nommé metricsc. Sur le côté droit du tableau de bord, vous pourrez consulter la matrice de confusion du modèle :

Visualisation des métriques

Pour voir le modèle et le point de terminaison créés à partir de cette exécution de pipeline, accédez à la section "Modèles", puis cliquez sur le modèle nommé automl-beans. Le modèle déployé sur un point de terminaison doit s'afficher :

Model-endpoint

Vous pouvez également accéder à cette page en cliquant sur l'artefact endpoint dans le graphique du pipeline.

En plus de consulter le graphique du pipeline dans la console, vous pouvez également utiliser Vertex Pipelines pour effectuer le suivi de la traçabilité. Le suivi de la traçabilité désigne le suivi des artefacts créés via votre pipeline. Cela peut nous aider à comprendre où les artefacts ont été créés et comment ils sont utilisés dans un workflow de ML. Par exemple, pour consulter le suivi de la traçabilité pour l'ensemble de données créé dans ce pipeline, cliquez sur l'artefact de l'ensemble de données, puis sur Afficher la traçabilité :

Afficher la traçabilité

Nous voyons ici tous les emplacements où cet artefact est utilisé:

Détails de la traçabilité

Étape 4 : Comparez les métriques de plusieurs exécutions de pipeline

Si vous exécutez ce pipeline plusieurs fois, vous pouvez chercher à comparer les métriques des différentes exécutions. Vous pouvez utiliser la méthode aiplatform.get_pipeline_df() pour accéder aux métadonnées des exécutions. Voici comment obtenir les métadonnées de toutes les exécutions du pipeline et les charger dans un DataFrame Pandas :

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

Vous avez maintenant terminé l'atelier.

🎉 Félicitations ! 🎉

Vous savez désormais utiliser Vertex AI pour :

  • Utiliser le SDK Kubeflow Pipelines pour créer des pipelines de bout en bout avec des composants personnalisés
  • Exécuter vos pipelines sur Vertex Pipelines et lancer des exécutions de pipeline avec le SDK
  • Afficher et analyser votre graphique Vertex Pipelines dans la console
  • Utiliser des composants de pipeline prédéfinis pour ajouter des services Vertex AI à votre pipeline
  • Planifier des jobs de pipeline récurrents

Pour en savoir plus sur les différents composants de Vertex, consultez la documentation.

7. Nettoyage

Pour éviter que des frais ne vous soient facturés, nous vous recommandons de supprimer les ressources créées tout au long de cet atelier.

Étape 1: Arrêtez ou supprimez votre instance Notebooks

Si vous souhaitez continuer à utiliser le notebook que vous avez créé dans cet atelier, nous vous recommandons de le désactiver quand vous ne vous en servez pas. À partir de l'interface utilisateur de Notebooks dans la console Cloud, sélectionnez le notebook et cliquez sur Arrêter. Si vous souhaitez supprimer définitivement l'instance, sélectionnez Delete (Supprimer).

Arrêter l&#39;instance

Étape 2: Supprimez votre point de terminaison

Pour supprimer le point de terminaison que vous avez déployé, accédez à la section Points de terminaison de votre console Vertex AI, puis cliquez sur l'icône de suppression:

Supprimer un point de terminaison

Cliquez ensuite sur Désinstaller dans l'invite suivante:

Annuler le déploiement du modèle

Enfin, accédez à la section Modèles de la console, recherchez le modèle, puis cliquez sur Supprimer le modèle dans le menu à trois points situé à droite:

Supprimer un modèle

Étape 3: Supprimez votre bucket Cloud Storage

Pour supprimer le bucket de stockage, utilisez le menu de navigation de la console Cloud pour accéder à Stockage, sélectionnez votre bucket puis cliquez sur "Supprimer" :

Supprimer l&#39;espace de stockage