1. Présentation
Dans cet atelier, vous allez apprendre à créer et à exécuter des pipelines de ML avec Vertex Pipelines.
Objectifs
Vous allez apprendre à effectuer les opérations suivantes :
- Utiliser le SDK Kubeflow Pipelines pour créer des pipelines de ML évolutifs
- Créer et exécuter un pipeline d'introduction en trois étapes acceptant du texte en entrée
- Créer et exécuter un pipeline qui entraîne, évalue et déploie un modèle de classification AutoML
- Utiliser des composants prédéfinis pour interagir avec les services Vertex AI, fournis via la bibliothèque
google_cloud_pipeline_components
- Planifier un job de pipeline avec Cloud Scheduler
Le coût total d'exécution de cet atelier sur Google Cloud est d'environ 25$.
2. Présentation de Vertex AI
Cet atelier utilise la toute dernière offre de produits d'IA de Google Cloud. Vertex AI simplifie l'expérience de développement en intégrant toutes les offres de ML de Google Cloud. Auparavant, les modèles entraînés avec AutoML et les modèles personnalisés étaient accessibles depuis des services distincts. La nouvelle offre regroupe ces deux types de modèles mais aussi d'autres nouveaux produits en une seule API. Vous pouvez également migrer des projets existants vers Vertex AI.
Outre les services d'entraînement et de déploiement de modèles, Vertex AI inclut également divers produits MLOps comme Vertex Pipelines (sur lequel porte cet atelier), Model Monitoring, Feature Store, etc. Toutes les offres de produits de Vertex AI sont répertoriées dans le schéma ci-dessous.
Pour envoyer un commentaire, veuillez consulter la page d'assistance.
Pourquoi utiliser des pipelines de ML ?
Avant d'entrer dans le vif du sujet, vous devez comprendre les avantages d'un pipeline. Imaginez que vous créez un workflow de ML incluant le traitement de données, l'entraînement d'un modèle, le réglage d'hyperparamètres, l'évaluation et le déploiement d'un modèle. Chacune de ces étapes peut avoir des dépendances différentes, ce qui peut se révéler problématique si vous traitez l'ensemble du workflow comme un monolithe. Lorsque vous commencerez le scaling de votre processus de ML, vous souhaiterez peut-être partager votre workflow de ML avec le reste de votre équipe, afin qu'elle puisse l'exécuter et participer à l'écriture du code. Cela peut s'avérer difficile sans un processus fiable et reproductible. Avec les pipelines, chaque étape de votre processus de ML est son propre conteneur. Ainsi, vous pouvez développer des étapes de façon indépendante et suivre les entrées et sorties de chaque étape de manière reproductible. Vous pouvez également planifier ou déclencher des exécutions de votre pipeline en fonction d'autres événements dans votre environnement cloud. Par exemple, vous pouvez exécuter un pipeline lorsque de nouvelles données d'entraînement sont disponibles.
Résumé: les pipelines vous aident à automatiser et à reproduire votre workflow de ML.
3. Configurer l'environnement cloud
Pour suivre cet atelier de programmation, vous aurez besoin d'un projet Google Cloud Platform dans lequel la facturation est activée. Pour créer un projet, suivez ces instructions.
Étape 1: Démarrez Cloud Shell
Dans cet atelier, vous allez travailler dans une session Cloud Shell. Cet environnement est un interpréteur de commandes hébergé sur une machine virtuelle qui s'exécute dans le cloud de Google. Vous pourriez tout aussi facilement effectuer les tâches de cette section en local sur votre propre ordinateur, mais le fait d'utiliser Cloud Shell permet à chacun de bénéficier d'une expérience reproductible dans un environnement cohérent. Après l'atelier, libre à vous de reproduire cette section sur votre ordinateur.
Activer Cloud Shell
En haut à droite de la console Cloud, cliquez sur le bouton ci-dessous pour activer Cloud Shell:
Si vous n'avez jamais démarré Cloud Shell auparavant, un écran intermédiaire s'affiche en dessous de la ligne de flottaison, décrivant de quoi il s'agit. Si tel est le cas, cliquez sur Continuer. Cet écran ne s'affiche qu'une seule fois. Voici à quoi il ressemble :
Le provisionnement et la connexion à Cloud Shell ne devraient pas prendre plus de quelques minutes.
Cette machine virtuelle contient tous les outils de développement dont vous avez besoin. Elle comprend un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud, ce qui améliore nettement les performances du réseau et l'authentification. Vous pouvez réaliser une grande partie, voire la totalité, des activités de cet atelier dans un simple navigateur ou sur votre Chromebook.
Une fois connecté à Cloud Shell, vous êtes en principe authentifié et le projet est défini avec votre ID de projet.
Exécutez la commande suivante dans Cloud Shell pour vérifier que vous êtes authentifié :
gcloud auth list
Le résultat de la commande devrait ressembler à ceci:
Exécutez la commande suivante dans Cloud Shell pour vérifier que la commande gcloud connaît votre projet:
gcloud config list project
Résultat de la commande
[core] project = <PROJECT_ID>
Si vous obtenez un résultat différent, exécutez cette commande :
gcloud config set project <PROJECT_ID>
Résultat de la commande
Updated property [core/project].
Cloud Shell comporte quelques variables d'environnement, parmi lesquelles GOOGLE_CLOUD_PROJECT
, qui contient le nom de notre projet Cloud actuel. Nous l'utiliserons à différents endroits tout au long de cet atelier. Pour la voir, exécutez la commande suivante :
echo $GOOGLE_CLOUD_PROJECT
Étape 2 : Activez les API
Dans les étapes suivantes, vous verrez où ces services sont nécessaires (et pourquoi). Pour le moment, exécutez la commande suivante afin d'autoriser votre projet à accéder aux services Compute Engine, Container Registry et Vertex AI:
gcloud services enable compute.googleapis.com \
containerregistry.googleapis.com \
aiplatform.googleapis.com \
cloudbuild.googleapis.com \
cloudfunctions.googleapis.com
Un message semblable à celui qui suit s'affiche pour vous indiquer que l'opération s'est correctement déroulée :
Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.
Étape 3: Créez un bucket Cloud Storage
Pour exécuter un job d'entraînement sur Vertex AI, nous avons besoin d'un bucket de stockage afin de stocker les ressources de modèle enregistrées. Ce bucket doit être régional. Nous utilisons ici us-central
, mais vous pouvez utiliser une autre région (il vous suffit d'effectuer le remplacement tout au long de cet atelier). Si vous disposez déjà d'un bucket, vous pouvez ignorer cette étape.
Exécutez les commandes suivantes dans votre terminal Cloud Shell afin de créer un bucket :
BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME
Nous allons ensuite autoriser notre compte de service Compute à accéder à ce bucket. Ainsi, Vertex Pipelines disposera des autorisations nécessaires pour écrire des fichiers dans ce bucket. Exécutez la commande suivante pour ajouter cette autorisation :
gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin
Étape 4 : Créez une instance Vertex AI Workbench
Dans la section Vertex AI de Cloud Console, cliquez sur Workbench :
Dans Notebooks gérés par l'utilisateur, cliquez sur Nouveau notebook:
Sélectionnez ensuite le type d'instance TensorFlow Enterprise 2.3 (avec LTS) sans GPU:
Utilisez les options par défaut, puis cliquez sur Créer.
Étape 5: Ouvrez votre notebook
Une fois l'instance créée, sélectionnez Ouvrir JupyterLab:
4. Configuration de Vertex Pipelines
Pour utiliser Vertex Pipelines, il est nécessaire d'installer quelques bibliothèques supplémentaires.
- Kubeflow Pipelines: SDK que nous utiliserons pour créer notre pipeline. Vertex Pipelines est compatible avec l'exécution de pipelines créés avec Kubeflow Pipelines ou TFX.
- Google Cloud Pipeline Components : cette bibliothèque fournit des composants prédéfinis qui permettent d'interagir plus facilement avec les services Vertex AI à partir de vos étapes de pipeline.
Étape 1 : Créez un notebook Python et installez les bibliothèques
Tout d'abord, dans le menu de lancement de votre instance de notebook, créez un notebook en sélectionnant Python 3:
Pour accéder au menu de lancement, cliquez sur le signe + en haut à gauche de votre instance de notebook.
Pour installer les deux services que nous utiliserons dans cet atelier, vous devez d'abord définir l'option utilisateur dans une cellule du notebook :
USER_FLAG = "--user"
Exécutez ensuite les commandes suivantes depuis votre notebook :
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0
Une fois ces packages installés, vous devrez redémarrer le noyau :
import os
if not os.getenv("IS_TESTING"):
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
Pour finir, vérifiez que vous avez correctement installé les packages. Utilisez la version 1.8 du SDK KFP ou une version ultérieure:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"
Étape 2 : Définissez l'ID et le bucket de votre projet
Au cours de cet atelier, vous allez référencer votre ID de projet Cloud et le bucket que vous avez créé un peu plus tôt. Nous allons ensuite créer des variables pour chacun d'eux.
Si vous ne connaissez pas votre ID de projet, vous pouvez l'obtenir en exécutant la commande suivante :
import os
PROJECT_ID = ""
# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = shell_output[0]
print("Project ID: ", PROJECT_ID)
Sinon, définissez-le ici:
if PROJECT_ID == "" or PROJECT_ID is None:
PROJECT_ID = "your-project-id" # @param {type:"string"}
Ensuite, créez une variable pour stocker le nom de votre bucket. Si vous l'avez créée au cours de cet atelier, les étapes suivantes devraient fonctionner. Sinon, vous devrez la définir manuellement :
BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"
Étape 3 : Importez les bibliothèques
Ajoutez les éléments suivants pour importer les bibliothèques que nous utiliserons tout au long de cet atelier de programmation:
import kfp
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple
Étape 4 : Définissez les constantes
La dernière chose que nous devons faire avant de pouvoir créer notre pipeline est de définir des variables constantes. PIPELINE_ROOT
est le chemin d'accès Cloud Storage dans lequel les artefacts créés par notre pipeline seront écrits. Ici, nous utilisons us-central1
comme région, mais si vous avez choisi une autre région lorsque vous avez créé votre bucket, modifiez la variable REGION
dans le code ci-dessous:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT
Une fois le code ci-dessus exécuté, le répertoire racine de votre pipeline doit s'afficher. Il s'agit de l'emplacement Cloud Storage dans lequel les artefacts de votre pipeline seront écrits. Il se présentera au format gs://YOUR-BUCKET-NAME/pipeline_root/
.
5. Créer votre premier pipeline
Pour vous permettre de vous familiariser avec Vertex Pipelines, nous commençons par créer un pipeline court à l'aide du SDK KFP. Ce pipeline n'implique pas de ML (ne vous inquiétez pas, on y viendra), car cet atelier vise à vous apprendre à :
- créer des composants personnalisés dans le SDK KFP ;
- exécuter et surveiller un pipeline dans Vertex Pipelines.
Nous allons créer un pipeline qui affiche une phrase en utilisant deux sorties : un nom de produit et une description d'emoji. Ce pipeline comporte trois composants :
product_name
: ce composant accepte un nom de produit (ou n'importe quel nom de votre choix) en entrée et renvoie cette chaîne en sortie.emoji
: ce composant accepte la description textuelle d'un emoji et la convertit en emoji. Par exemple, le code textuel pour ✨ est "sparkles". Ce composant utilise une bibliothèque d'emoji pour vous montrer comment gérer des dépendances externes dans votre pipeline.build_sentence
: ce dernier composant utilise la sortie des deux précédents pour créer une phrase contenant cet emoji. Par exemple, le résultat peut être "Vertex Pipelines, c'est ✨".
Commençons à coder !
Étape 1: Créez un composant basé sur une fonction Python
Le SDK KFP nous permet de créer des composants basés sur des fonctions Python. Nous utiliserons cet outil pour les 3 composants de notre premier pipeline. Nous allons d'abord créer le composant product_name
, qui accepte simplement une chaîne en entrée et la renvoie. Ajoutez le code suivant à votre notebook :
@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
return text
Examinons de plus près cette syntaxe :
- Le décorateur
@component
compile cette fonction en un composant lorsque le pipeline est exécuté. Vous l'utiliserez à chaque fois que vous écrirez un composant personnalisé. - Le paramètre
base_image
spécifie l'image de conteneur que ce composant utilise. - Le paramètre
output_component_file
est facultatif et spécifie le fichier yaml dans lequel écrire le composant compilé. Une fois la cellule exécutée, ce fichier doit être écrit dans votre instance de notebook. Si vous souhaitez partager ce composant avec une autre personne, vous pouvez lui envoyer le fichier yaml qui a été généré afin qu'elle le charge à l'aide de la commande suivante :
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
- Le code
-> str
après la définition de la fonction spécifie le type de sortie pour ce composant.
Étape 2 : Créez deux composants supplémentaires
Pour terminer notre pipeline, nous créerons deux autres composants. Le premier accepte une chaîne en entrée et la convertit en l'emoji correspondant, le cas échéant. Il renvoie un tuple avec le texte d'entrée transmis, ainsi que l'emoji correspondant :
@component(packages_to_install=["emoji"])
def emoji(
text: str,
) -> NamedTuple(
"Outputs",
[
("emoji_text", str), # Return parameters
("emoji", str),
],
):
import emoji
emoji_text = text
emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
return (emoji_text, emoji_str)
Ce composant est un peu plus complexe que le précédent. Voici les nouveautés :
- Le paramètre
packages_to_install
indique au composant les éventuelles dépendances de bibliothèques externes pour ce conteneur. Dans ce cas, nous utilisons une bibliothèque appelée emoji. - Ce composant renvoie un
NamedTuple
appeléOutputs
. Notez que chacune des chaînes de ce tuple dispose de clés:emoji_text
etemoji
. Dans le prochain composant, nous les utiliserons pour accéder à la sortie.
Le dernier composant de ce pipeline utilise la sortie des deux premiers et les combine pour renvoyer une chaîne :
@component
def build_sentence(
product: str,
emoji: str,
emojitext: str
) -> str:
print("We completed the pipeline, hooray!")
end_str = product + " is "
if len(emoji) > 0:
end_str += emoji
else:
end_str += emojitext
return(end_str)
Vous vous demanderez peut-être comment ce composant sait qu'il doit utiliser la sortie des étapes précédentes. Bonne question. Vous le découvrirez à l'étape suivante.
Étape 3 : Assemblez les composants en un pipeline
Les définitions des composants ci-dessus ont permis de créer des fonctions de fabrique pouvant être utilisées dans la définition d'un pipeline afin de créer des étapes. Pour configurer un pipeline, utilisez le décorateur @pipeline
, fournissez le nom et la description du pipeline, et indiquez le chemin d'accès racine dans lequel les artefacts de votre pipeline doivent être écrits. Le terme "artefacts" désigne tous les fichiers de sortie générés par votre pipeline. Ce pipeline d'introduction n'en génère pas, mais ce sera le cas du prochain.
Dans le bloc de code suivant, nous définissons une fonction intro_pipeline
. C'est là que nous spécifions les entrées de nos étapes de pipeline initiales et les liens entre les différentes étapes :
product_task
accepte un nom de produit en entrée. Nous transmettons ici "Vertex Pipelines", mais vous pouvez le remplacer par le nom de votre choix.emoji_task
accepte le code textuel d'un emoji en entrée. Vous pouvez également modifier cette information. Par exemple, "party_face" correspond à l'emoji 🥳. Étant donné que ce composant et le composantproduct_task
ne reçoivent pas d'entrées d'autres étapes, nous devons indiquer manuellement les entrées lorsque nous définissons notre pipeline.- La dernière étape de notre pipeline,
consumer_task
, compte trois paramètres d'entrée:- Résultat de
product_task
. Cette étape ne produisant qu'une seule sortie, nous pouvons la référencer à l'aide deproduct_task.output
. - La sortie
emoji
de notre étapeemoji_task
. Reportez-vous au composantemoji
défini ci-dessus, pour lequel nous avons nommé les paramètres de sortie. - De même, la sortie nommée
emoji_text
du composantemoji
. Si notre pipeline reçoit du texte qui ne correspond pas à un emoji, il utilisera ce texte pour construire une phrase.
- Résultat de
@pipeline(
name="hello-world",
description="An intro pipeline",
pipeline_root=PIPELINE_ROOT,
)
# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
product_task = product_name(text)
emoji_task = emoji(emoji_str)
consumer_task = build_sentence(
product_task.output,
emoji_task.outputs["emoji"],
emoji_task.outputs["emoji_text"],
)
Étape 4 : Compilez et exécutez le pipeline
Une fois que vous avez défini votre pipeline, vous pouvez le compiler. Le code suivant génère un fichier JSON qui vous permettra d'exécuter le pipeline :
compiler.Compiler().compile(
pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)
Ensuite, créez une variable TIMESTAMP
. Nous l'utiliserons dans notre ID de tâche:
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
Définissez ensuite votre job de pipeline:
job = aiplatform.PipelineJob(
display_name="hello-world-pipeline",
template_path="intro_pipeline_job.json",
job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
enable_caching=True
)
Enfin, exécutez la tâche pour créer une exécution de pipeline:
job.submit()
Après avoir exécuté cette cellule, vous devriez voir des journaux avec un lien permettant d'afficher l'exécution du pipeline dans votre console:
Accédez à ce lien. Une fois l'exécution terminée, votre pipeline doit ressembler à ceci:
L'exécution de ce pipeline prend cinq à six minutes. Une fois l'opération terminée, vous pouvez cliquer sur le composant build-sentence
pour afficher le résultat final:
Maintenant que vous maîtrisez le fonctionnement du SDK KFP et de Vertex Pipelines, vous êtes prêt à construire un pipeline qui crée et déploie un modèle de ML à l'aide d'autres services Vertex AI. C'est parti !
6. Créer un pipeline de ML de bout en bout
Il est temps de créer votre premier pipeline de ML. Dans ce pipeline, nous utiliserons l'ensemble de données UCI Machine Learning Dry beans (Fèves sèches) de KOKLU, M. et OZKAN, I.A., (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques." publié dans "Computers and Electronics in Agriculture", 174, 105507. DOI.
Il s'agit d'un ensemble de données tabulaire. Dans notre pipeline, nous l'utiliserons pour entraîner, évaluer et déployer un modèle AutoML qui classe les haricots en sept catégories en fonction de leurs caractéristiques.
Ce pipeline va :
- Créer un ensemble de données dans
- Entraîner un modèle de classification tabulaire avec AutoML
- obtenir des métriques d'évaluation sur ce modèle ;
- décider, à partir des métriques d'évaluation, s'il faut déployer le modèle à l'aide d'une logique conditionnelle dans Vertex Pipelines ;
- Déployer le modèle sur un point de terminaison à l'aide de Vertex Prediction
Chacune des étapes énoncées correspond à un composant. La plupart des étapes du pipeline utilisent des composants prédéfinis pour les services Vertex AI via la bibliothèque google_cloud_pipeline_components
que vous avez importée plus tôt au cours de cet atelier de programmation. Dans cette section, nous allons commencer par définir un composant personnalisé. Ensuite, nous définirons les autres étapes du pipeline à l'aide de composants prédéfinis. Ces composants facilitent l'accès aux services Vertex AI, comme l'entraînement et le déploiement de modèles.
Étape 1 : Composant personnalisé pour l'évaluation du modèle
Le composant personnalisé que nous allons définir servira vers la fin de notre pipeline, une fois l'entraînement du modèle terminé. Ce composant a plusieurs fonctions :
- Obtenir les métriques d'évaluation du modèle de classification AutoML entraîné
- Analyser les métriques et en générer un rendu dans l'interface Vertex Pipelines
- Comparer les métriques à un seuil pour déterminer si le modèle doit être déployé
Avant de définir le composant, nous devons comprendre ses paramètres d'entrée et de sortie. Ce pipeline accepte en entrée des métadonnées de notre projet Cloud, le modèle entraîné obtenu (nous définirons ce composant plus tard), les métriques d'évaluation du modèle et un thresholds_dict_str
. Nous définirons thresholds_dict_str
lorsque nous exécuterons notre pipeline. Dans le cas de ce modèle de classification, il s'agira de l'aire sous la courbe ROC pour laquelle nous devons déployer le modèle. Par exemple, si nous transmettons 0,95, cela signifie que notre pipeline ne doit déployer le modèle que si cette métrique est supérieure à 95%.
Notre composant d'évaluation renvoie une chaîne qui indique s'il faut ou non déployer le modèle. Ajoutez le code suivant dans une cellule de notebook pour créer ce composant personnalisé :
@component(
base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
output_component_file="tabular_eval_component.yaml",
packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
project: str,
location: str, # "us-central1",
api_endpoint: str, # "us-central1-aiplatform.googleapis.com",
thresholds_dict_str: str,
model: Input[Artifact],
metrics: Output[Metrics],
metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]): # Return parameter.
import json
import logging
from google.cloud import aiplatform as aip
# Fetch model eval info
def get_eval_info(client, model_name):
from google.protobuf.json_format import MessageToDict
response = client.list_model_evaluations(parent=model_name)
metrics_list = []
metrics_string_list = []
for evaluation in response:
print("model_evaluation")
print(" name:", evaluation.name)
print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
metrics = MessageToDict(evaluation._pb.metrics)
for metric in metrics.keys():
logging.info("metric: %s, value: %s", metric, metrics[metric])
metrics_str = json.dumps(metrics)
metrics_list.append(metrics)
metrics_string_list.append(metrics_str)
return (
evaluation.name,
metrics_list,
metrics_string_list,
)
# Use the given metrics threshold(s) to determine whether the model is
# accurate enough to deploy.
def classification_thresholds_check(metrics_dict, thresholds_dict):
for k, v in thresholds_dict.items():
logging.info("k {}, v {}".format(k, v))
if k in ["auRoc", "auPrc"]: # higher is better
if metrics_dict[k] < v: # if under threshold, don't deploy
logging.info("{} < {}; returning False".format(metrics_dict[k], v))
return False
logging.info("threshold checks passed.")
return True
def log_metrics(metrics_list, metricsc):
test_confusion_matrix = metrics_list[0]["confusionMatrix"]
logging.info("rows: %s", test_confusion_matrix["rows"])
# log the ROC curve
fpr = []
tpr = []
thresholds = []
for item in metrics_list[0]["confidenceMetrics"]:
fpr.append(item.get("falsePositiveRate", 0.0))
tpr.append(item.get("recall", 0.0))
thresholds.append(item.get("confidenceThreshold", 0.0))
print(f"fpr: {fpr}")
print(f"tpr: {tpr}")
print(f"thresholds: {thresholds}")
metricsc.log_roc_curve(fpr, tpr, thresholds)
# log the confusion matrix
annotations = []
for item in test_confusion_matrix["annotationSpecs"]:
annotations.append(item["displayName"])
logging.info("confusion matrix annotations: %s", annotations)
metricsc.log_confusion_matrix(
annotations,
test_confusion_matrix["rows"],
)
# log textual metrics info as well
for metric in metrics_list[0].keys():
if metric != "confidenceMetrics":
val_string = json.dumps(metrics_list[0][metric])
metrics.log_metric(metric, val_string)
# metrics.metadata["model_type"] = "AutoML Tabular classification"
logging.getLogger().setLevel(logging.INFO)
aip.init(project=project)
# extract the model resource name from the input Model Artifact
model_resource_path = model.metadata["resourceName"]
logging.info("model path: %s", model_resource_path)
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
client = aip.gapic.ModelServiceClient(client_options=client_options)
eval_name, metrics_list, metrics_str_list = get_eval_info(
client, model_resource_path
)
logging.info("got evaluation name: %s", eval_name)
logging.info("got metrics list: %s", metrics_list)
log_metrics(metrics_list, metricsc)
thresholds_dict = json.loads(thresholds_dict_str)
deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
if deploy:
dep_decision = "true"
else:
dep_decision = "false"
logging.info("deployment decision is %s", dep_decision)
return (dep_decision,)
Étape 2 : Ajoutez les composants Google Cloud prédéfinis
À cette étape, nous allons définir les autres composants de notre pipeline et découvrir comment ils s'agencent. Tout d'abord, définissez le nom à afficher de votre exécution de pipeline à l'aide d'un code temporel :
import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)
Ensuite, copiez le code suivant dans une nouvelle cellule de notebook :
@pipeline(name="automl-tab-beans-training-v2",
pipeline_root=PIPELINE_ROOT)
def pipeline(
bq_source: str = "bq://aju-dev-demos.beans.beans1",
display_name: str = DISPLAY_NAME,
project: str = PROJECT_ID,
gcp_region: str = "us-central1",
api_endpoint: str = "us-central1-aiplatform.googleapis.com",
thresholds_dict_str: str = '{"auRoc": 0.95}',
):
dataset_create_op = gcc_aip.TabularDatasetCreateOp(
project=project, display_name=display_name, bq_source=bq_source
)
training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
project=project,
display_name=display_name,
optimization_prediction_type="classification",
budget_milli_node_hours=1000,
column_transformations=[
{"numeric": {"column_name": "Area"}},
{"numeric": {"column_name": "Perimeter"}},
{"numeric": {"column_name": "MajorAxisLength"}},
{"numeric": {"column_name": "MinorAxisLength"}},
{"numeric": {"column_name": "AspectRation"}},
{"numeric": {"column_name": "Eccentricity"}},
{"numeric": {"column_name": "ConvexArea"}},
{"numeric": {"column_name": "EquivDiameter"}},
{"numeric": {"column_name": "Extent"}},
{"numeric": {"column_name": "Solidity"}},
{"numeric": {"column_name": "roundness"}},
{"numeric": {"column_name": "Compactness"}},
{"numeric": {"column_name": "ShapeFactor1"}},
{"numeric": {"column_name": "ShapeFactor2"}},
{"numeric": {"column_name": "ShapeFactor3"}},
{"numeric": {"column_name": "ShapeFactor4"}},
{"categorical": {"column_name": "Class"}},
],
dataset=dataset_create_op.outputs["dataset"],
target_column="Class",
)
model_eval_task = classification_model_eval_metrics(
project,
gcp_region,
api_endpoint,
thresholds_dict_str,
training_op.outputs["model"],
)
with dsl.Condition(
model_eval_task.outputs["dep_decision"] == "true",
name="deploy_decision",
):
endpoint_op = gcc_aip.EndpointCreateOp(
project=project,
location=gcp_region,
display_name="train-automl-beans",
)
gcc_aip.ModelDeployOp(
model=training_op.outputs["model"],
endpoint=endpoint_op.outputs["endpoint"],
dedicated_resources_min_replica_count=1,
dedicated_resources_max_replica_count=1,
dedicated_resources_machine_type="n1-standard-4",
)
Voyons ce qui se passe dans ce code:
- Tout d'abord, comme dans notre pipeline précédent, nous définissons les paramètres d'entrée du pipeline. Nous devons les définir manuellement, car elles ne dépendent pas du résultat d'autres étapes du pipeline.
- Le reste du pipeline utilise quelques composants prédéfinis pour interagir avec les services Vertex AI:
TabularDatasetCreateOp
crée un ensemble de données tabulaire dans Vertex AI à partir d'une source d'ensemble de données dans Cloud Storage ou BigQuery. Dans ce pipeline, nous transmettons les données via une URL de table BigQuery.AutoMLTabularTrainingJobRunOp
lance un job d'entraînement AutoML pour un ensemble de données tabulaire. Nous transmettons quelques paramètres de configuration à ce composant, y compris le type de modèle (dans ce cas, classification), des données sur les colonnes, la durée d'exécution souhaitée pour l'entraînement, ainsi qu'un pointeur vers l'ensemble de données. Notez que pour transmettre l'ensemble de données à ce composant, nous fournissons la sortie du composant précédent viadataset_create_op.outputs["dataset"]
.EndpointCreateOp
crée un point de terminaison dans Vertex AI. Le point de terminaison créé à partir de cette étape sera transmis en entrée au composant suivant.ModelDeployOp
déploie un modèle donné vers un point de terminaison dans Vertex AI. Dans ce cas, nous utilisons le point de terminaison créé à l'étape précédente. D'autres options de configuration sont disponibles, mais nous fournissons ici le type et le modèle de machine du point de terminaison que nous souhaitons déployer. Pour transmettre le modèle, nous accédons aux sorties de l'étape d'entraînement de notre pipeline.
- Ce pipeline utilise également la logique conditionnelle, une fonctionnalité de Vertex Pipelines qui vous permet de définir une condition ainsi que des branches différentes selon le résultat de cette condition. N'oubliez pas que lorsque nous avons défini notre pipeline, nous avons transmis un paramètre
thresholds_dict_str
. Il s'agit du seuil de justesse que nous utilisons pour déterminer si le modèle doit être déployé sur un point de terminaison. Pour l'implémenter, nous utilisons la classeCondition
du SDK KFP. La condition que nous transmettons est la sortie du composant d'évaluation personnalisé que nous avons défini plus tôt dans cet atelier de programmation. Si cette condition est vraie, le pipeline continue d'exécuter le composantdeploy_op
. Si la justesse n'atteint pas notre seuil prédéfini, le pipeline s'arrête et ne déploie pas de modèle.
Étape 3 : Compilez et exécutez le pipeline de ML de bout en bout
Maintenant que notre pipeline est entièrement défini, il est temps de le compiler:
compiler.Compiler().compile(
pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)
Ensuite, définissez la tâche:
ml_pipeline_job = aiplatform.PipelineJob(
display_name="automl-tab-beans-training",
template_path="tab_classif_pipeline.json",
pipeline_root=PIPELINE_ROOT,
parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
enable_caching=True
)
Enfin, exécutez le job:
ml_pipeline_job.submit()
Accédez au lien indiqué dans les journaux après avoir exécuté la cellule ci-dessus pour afficher votre pipeline dans la console. L'exécution de ce pipeline prend un peu plus d'une heure. principalement consacrée à l'étape d'entraînement AutoML. Le pipeline terminé doit ressembler à l'exemple suivant :
Si vous activez le bouton "Développer les artefacts" en haut, vous pourrez consulter les détails des différents artefacts créés à partir de votre pipeline. Par exemple, si vous cliquez sur l'artefact dataset
, vous pourrez consulter les détails de l'ensemble de données Vertex AI créé. Vous pouvez cliquer sur le lien affiché pour accéder à la page de cet ensemble de données :
De même, pour consulter les visualisations de métriques résultant de notre composant d'évaluation personnalisé, cliquez sur l'artefact nommé metricsc. Sur le côté droit du tableau de bord, vous pourrez consulter la matrice de confusion du modèle :
Pour voir le modèle et le point de terminaison créés à partir de cette exécution de pipeline, accédez à la section "Modèles", puis cliquez sur le modèle nommé automl-beans
. Le modèle déployé sur un point de terminaison doit s'afficher :
Vous pouvez également accéder à cette page en cliquant sur l'artefact endpoint dans le graphique du pipeline.
En plus de consulter le graphique du pipeline dans la console, vous pouvez également utiliser Vertex Pipelines pour effectuer le suivi de la traçabilité. Le suivi de la traçabilité désigne le suivi des artefacts créés via votre pipeline. Cela peut nous aider à comprendre où les artefacts ont été créés et comment ils sont utilisés dans un workflow de ML. Par exemple, pour consulter le suivi de la traçabilité pour l'ensemble de données créé dans ce pipeline, cliquez sur l'artefact de l'ensemble de données, puis sur Afficher la traçabilité :
Nous voyons ici tous les emplacements où cet artefact est utilisé:
Étape 4 : Comparez les métriques de plusieurs exécutions de pipeline
Si vous exécutez ce pipeline plusieurs fois, vous pouvez chercher à comparer les métriques des différentes exécutions. Vous pouvez utiliser la méthode aiplatform.get_pipeline_df()
pour accéder aux métadonnées des exécutions. Voici comment obtenir les métadonnées de toutes les exécutions du pipeline et les charger dans un DataFrame Pandas :
pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df
Vous avez maintenant terminé l'atelier.
🎉 Félicitations ! 🎉
Vous savez désormais utiliser Vertex AI pour :
- Utiliser le SDK Kubeflow Pipelines pour créer des pipelines de bout en bout avec des composants personnalisés
- Exécuter vos pipelines sur Vertex Pipelines et lancer des exécutions de pipeline avec le SDK
- Afficher et analyser votre graphique Vertex Pipelines dans la console
- Utiliser des composants de pipeline prédéfinis pour ajouter des services Vertex AI à votre pipeline
- Planifier des jobs de pipeline récurrents
Pour en savoir plus sur les différents composants de Vertex, consultez la documentation.
7. Nettoyage
Pour éviter que des frais ne vous soient facturés, nous vous recommandons de supprimer les ressources créées tout au long de cet atelier.
Étape 1: Arrêtez ou supprimez votre instance Notebooks
Si vous souhaitez continuer à utiliser le notebook que vous avez créé dans cet atelier, nous vous recommandons de le désactiver quand vous ne vous en servez pas. À partir de l'interface utilisateur de Notebooks dans la console Cloud, sélectionnez le notebook et cliquez sur Arrêter. Si vous souhaitez supprimer définitivement l'instance, sélectionnez Delete (Supprimer).
Étape 2: Supprimez votre point de terminaison
Pour supprimer le point de terminaison que vous avez déployé, accédez à la section Points de terminaison de votre console Vertex AI, puis cliquez sur l'icône de suppression:
Cliquez ensuite sur Désinstaller dans l'invite suivante:
Enfin, accédez à la section Modèles de la console, recherchez le modèle, puis cliquez sur Supprimer le modèle dans le menu à trois points situé à droite:
Étape 3: Supprimez votre bucket Cloud Storage
Pour supprimer le bucket de stockage, utilisez le menu de navigation de la console Cloud pour accéder à Stockage, sélectionnez votre bucket puis cliquez sur "Supprimer" :