Wprowadzenie do Vertex Pipelines

1. Omówienie

W tym module dowiesz się, jak tworzyć i uruchamiać potoki ML za pomocą Vertex Pipelines.

Czego się nauczysz

Poznasz takie zagadnienia jak:

  • Tworzenie skalowalnych potoków ML za pomocą pakietu Kubeflow Pipelines SDK
  • Tworzenie i uruchamianie 3-etapowego wstępnego potoku, który przyjmuje dane wejściowe tekstowe
  • Tworzenie i uruchamianie potoku, który trenuje, ocenia i wdraża model klasyfikacji AutoML
  • Korzystanie z gotowych komponentów do interakcji z usługami Vertex AI, które są dostępne w bibliotece google_cloud_pipeline_components.
  • Planowanie zadania potoku za pomocą Cloud Scheduler

Łączny koszt przeprowadzenia tego laboratorium w Google Cloud wynosi około 25 USD.

2. Wprowadzenie do Vertex AI

W tym module wykorzystano najnowszą ofertę usług AI dostępną w Google Cloud. Vertex AI integruje rozwiązania ML w Google Cloud, zapewniając bezproblemowe środowisko programistyczne. Wcześniej modele wytrenowane za pomocą AutoML i modele niestandardowe były dostępne za pomocą oddzielnych usług. Nowa oferta łączy oba te interfejsy API z innymi nowymi usługami. Możesz też przenieść istniejące projekty do Vertex AI.

Oprócz usług trenowania i wdrażania modeli Vertex AI obejmuje też wiele usług MLOps, takich jak Vertex Pipelines (omawiana w tym module), Model Monitoring, Feature Store i inne. Wszystkie usługi Vertex AI znajdziesz na diagramie poniżej.

Omówienie usługi Vertex

Jeśli chcesz podzielić się opinią, odwiedź stronę pomocy.

Dlaczego potoki ML są przydatne?

Zanim przejdziemy do szczegółów, wyjaśnijmy, dlaczego warto używać potoku. Wyobraź sobie, że tworzysz przepływ pracy ML, który obejmuje przetwarzanie danych, trenowanie modelu, dostrajanie hiperparametrów, ocenę i wdrażanie modelu. Każdy z tych kroków może mieć różne zależności, które mogą stać się nieporęczne, jeśli cały przepływ pracy jest traktowany jak monolit. Gdy zaczniesz wdrażać proces uczenia maszynowego na większą skalę, możesz udostępnić przepływ pracy związanej z uczeniem maszynowym innym członkom zespołu, aby mogli go uruchomić i wnosić poprawki do kodu. Bez niezawodnego, powtarzalnego procesu może to być trudne. W przypadku potoków każdy krok procesu ML jest oddzielnym kontenerem. Dzięki temu możesz opracowywać kroki niezależnie od siebie i śledzić dane wejściowe i wyjściowe z każdego kroku w powtarzalny sposób. Możesz też zaplanować lub uruchomić uruchomienie potoku na podstawie innych zdarzeń w środowisku Cloud, np. uruchomić uruchomienie potoku, gdy dostępne są nowe dane szkoleniowe.

Podsumowanie: potoki pomagają automatyzować i odtwarzać przepływ pracy związany z systemami uczącymi się.

3. Konfigurowanie środowiska w chmurze

Aby uruchomić to ćwiczenie, musisz mieć projekt Google Cloud Platform z włączonym rozliczeniem. Aby utworzyć projekt, postępuj zgodnie z tymi instrukcjami.

Krok 1. Uruchom Cloud Shell

W tym module będziesz pracować w sesji Cloud Shell, czyli w interpreterze poleceń hostowanym przez maszynę wirtualną działającą w chmurze Google. Możesz też uruchomić tę sekcję lokalnie na swoim komputerze, ale korzystanie z Cloud Shell daje wszystkim dostęp do powtarzalnego środowiska w spójnym środowisku. Po zakończeniu ćwiczeń możesz ponownie wykonać tę sekcję na swoim komputerze.

Autoryzuj Cloud Shell

Aktywowanie Cloud Shell

W prawym górnym rogu konsoli Cloud Console kliknij przycisk poniżej, aby aktywować Cloud Shell:

Aktywowanie Cloud Shell

Jeśli nigdy wcześniej nie korzystałeś(-aś) z Cloud Shell, zobaczysz ekran pośredni (poniżej zgięcia), który wyjaśnia, czym jest to środowisko. W takim przypadku kliknij Dalej (nie zobaczysz go już więcej). Oto jak wygląda ekran jednorazowy:

Konfiguracja Cloud Shell

Uproszczenie i połączenie z Cloud Shell powinno zająć tylko kilka chwil.

Inicjowanie Cloud Shell

Ta maszyna wirtualna ma wszystkie potrzebne narzędzia dla programistów. Zawiera stały katalog domowy o pojemności 5 GB i działa w Google Cloud, co znacznie poprawia wydajność sieci i uwierzytelnianie. Większość, jeśli nie wszystkie, czynności w tym CodeLab możesz wykonać w przeglądarce lub na Chromebooku.

Po połączeniu z Cloud Shell powinieneś zobaczyć, że jesteś już uwierzytelniony i że projekt jest już ustawiony na identyfikator Twojego projektu.

Aby potwierdzić uwierzytelnianie, uruchom w Cloud Shell to polecenie:

gcloud auth list

W wyjściu polecenia powinna pojawić się informacja podobna do tej:

Dane wyjściowe Cloud Shell

Uruchom to polecenie w Cloud Shell, aby sprawdzić, czy polecenie gcloud zna Twój projekt:

gcloud config list project

Wynik polecenia

[core]
project = <PROJECT_ID>

Jeśli nie, możesz ustawić je za pomocą tego polecenia:

gcloud config set project <PROJECT_ID>

Wynik polecenia

Updated property [core/project].

Cloud Shell ma kilka zmiennych środowiskowych, w tym GOOGLE_CLOUD_PROJECT, która zawiera nazwę naszego bieżącego projektu Cloud. Będziemy go używać w różnych miejscach w tym module. Aby go wyświetlić, wykonaj jedną z tych czynności:

echo $GOOGLE_CLOUD_PROJECT

Krok 2. Włącz interfejsy API

W późniejszych krokach zobaczysz, gdzie te usługi są potrzebne (i dlaczego). Na razie uruchom to polecenie, aby przyznać projektowi dostęp do usług Compute Engine, Container Registry i Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Powinien wyświetlić się komunikat o udanym przeprowadzeniu operacji podobny do tego:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Krok 3. Utwórz zasobnik Cloud Storage

Aby uruchomić zadanie trenowania w Vertex AI, potrzebujemy zasobnika Cloud Storage do przechowywania zapisanych komponentów modelu. Zasobnik musi być regionalny. Tutaj używamy regionu us-central, ale możesz użyć innego (wystarczy go zastąpić w tym laboratorium). Jeśli masz już taki folder, możesz pominąć ten krok.

Aby utworzyć zasobnik, uruchom w terminalu Cloud Shell te polecenia:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Następnie przyznamy temu kontu usługi obliczeniowej dostęp do tego zasobnika. Dzięki temu usługa Vertex Pipelines będzie mieć wymagane uprawnienia do zapisywania plików w tym zasobniku. Aby dodać to uprawnienie, uruchom to polecenie:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Krok 4. Utwórz instancję Vertex AI Workbench

W konsoli Cloud w sekcji Vertex AI kliknij Workbench:

Menu Vertex AI

Następnie na karcie Notatniki zarządzane przez użytkownika kliknij Nowy notatnik:

Tworzenie nowego notatnika

Następnie wybierz typ instancji TensorFlow Enterprise 2.3 (z LTS) bez procesorów graficznych:

Instancja TFE

Użyj domyślnych opcji i kliknij Utwórz.

Krok 5. Otwórz notatnik

Po utworzeniu instancji kliknij Otwórz JupyterLab:

Otwórz notatnik

4. Konfiguracja Vertex Pipelines

Istnieje kilka dodatkowych bibliotek, które musimy zainstalować, aby używać Vertex Pipelines:

  • Kubeflow Pipelines: to pakiet SDK, którego użyjemy do tworzenia potoku. Vertex Pipelines obsługuje uruchamianie ścieżek utworzonych za pomocą Kubeflow Pipelines lub TFX.
  • Komponenty potoku Google Cloud: ta biblioteka zawiera gotowe komponenty, które ułatwiają interakcję z usługami Vertex AI z poziomu kroków potoku.

Krok 1. Utwórz notatnik w Pythonie i zainstaluj biblioteki

Najpierw w menu Menu z aplikacjami w instancji notatnika utwórz notatnik, wybierając Python 3:

Tworzenie notatnika w Pythonie 3

Aby otworzyć menu menu, kliknij znak + w lewym górnym rogu instancji notebooka.

Aby zainstalować obie usługi, których użyjemy w tym laboratorium, najpierw ustaw flagę użytkownika w komórce notebooka:

USER_FLAG = "--user"

Następnie w notatniku wykonaj te czynności:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Po zainstalowaniu tych pakietów musisz ponownie uruchomić jądro:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Na koniec sprawdź, czy pakiety zostały poprawnie zainstalowane. Wersja pakietu SDK KFP powinna być większa niż 1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Krok 2. Ustaw identyfikator projektu i zasobnik

W tym laboratorium będziesz używać identyfikatora projektu Cloud i utworzonego wcześniej zasobnika. Następnie utworzymy zmienne dla każdego z nich.

Jeśli nie znasz identyfikatora projektu, możesz go uzyskać, wykonując te czynności:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

W przeciwnym razie ustaw je tutaj:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Następnie utwórz zmienną do przechowywania nazwy zasobnika. Jeśli utworzysz go w tym module, wykonaj te czynności. W przeciwnym razie musisz ustawić je ręcznie:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Krok 3. Zaimportuj biblioteki

Aby zaimportować biblioteki, których będziemy używać w tym ćwiczeniu, dodaj:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Krok 4. Zdefiniuj stałe

Ostatnią rzeczą, którą musimy zrobić przed utworzeniem potoku, jest zdefiniowanie stałych zmiennych. PIPELINE_ROOT to ścieżka do Cloud Storage, na której będą zapisywane artefakty utworzone przez nasz potok. Tutaj używamy regionu us-central1, ale jeśli podczas tworzenia zasobnika został użyty inny region, zaktualizuj zmienną REGION w poniższym kodzie:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Po uruchomieniu powyższego kodu powinien zostać wydrukowany katalog główny potoku. To jest lokalizacja w Cloud Storage, w której będą zapisywane artefakty z Twojego potoku. Numer ten ma format gs://YOUR-BUCKET-NAME/pipeline_root/.

5. Tworzenie pierwszego potoku

Aby poznać działanie Vertex Pipelines, najpierw utworzymy krótki potok za pomocą pakietu SDK KFP. Ten potok nie robi niczego związanego z systemami uczącymi się (bez obaw – dotrzemy do tego celu!), ale używamy go, aby nauczyć Cię:

  • Jak tworzyć komponenty niestandardowe w pakiecie SDK KFP
  • Jak uruchamiać i monitorować potok w Vertex Pipelines

Utworzymy przepływ danych, który wypisuje zdanie z 2 wyjściami: nazwą produktu i opisem w emoji. Potok składa się z 3 komponentów:

  • product_name: ten komponent przyjmuje jako dane wejściowe nazwę produktu (lub dowolny rzeczownik) i zwraca ten ciąg znaków jako dane wyjściowe.
  • emoji: ten komponent przekształci opis tekstowy w emotikon. Na przykład kod tekstowy dla ✨ to „sparkles”. Ten komponent wykorzystuje bibliotekę emotikonów, aby pokazać, jak zarządzać zależnościami zewnętrznymi w potoku
  • build_sentence: Ten ostatni komponent wykorzysta dane wyjściowe z poprzednich 2 komponentów, aby utworzyć zdanie z użyciem emotikonu. Na przykład dane wyjściowe mogą wyglądać tak: „Vertex Pipelines to ✨”.

Zacznijmy kodować.

Krok 1. Utwórz komponent oparty na funkcji Pythona

Za pomocą pakietu SDK KFP możemy tworzyć komponenty na podstawie funkcji Pythona. Użyjemy go w przypadku 3 komponentów w pierwszym łańcuchu. Najpierw utworzymy komponent product_name, który po prostu przyjmuje ciąg znaków jako dane wejściowe i zwraca ten ciąg. Dodaj do notatnika te elementy:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Przyjrzyjmy się bliżej tej składni:

  • Dekorator @component kompiluje tę funkcję do komponentu podczas uruchamiania potoku. Używasz go za każdym razem, gdy piszesz niestandardowy komponent.
  • Parametr base_image określa obraz kontenera, którego będzie używać ten komponent.
  • Parametr output_component_file jest opcjonalny i określa plik YAML, do którego ma zostać zapisany skompilowany komponent. Po uruchomieniu komórki powinien być widoczny plik zapisany w instancji notatnika. Jeśli chcesz udostępnić ten komponent innej osobie, możesz wysłać wygenerowany plik yaml i poprosić o jego załadowanie:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • Wartość -> str po definicji funkcji określa typ danych wyjściowych tego komponentu.

Krok 2. Utwórz 2 dodatkowe komponenty

Aby ukończyć ścieżkę, utworzymy jeszcze 2 komponenty. Pierwsza zdefiniowana przez nas funkcja przyjmuje jako argument ciąg znaków i konwertuje go na odpowiedni emotikon, jeśli taki istnieje. Zwraca ona tuplę z przekazanym tekstem wejściowym i wynikiem emotikonu:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Ten komponent jest nieco bardziej skomplikowany niż poprzedni. Oto nowości:

  • Parametr packages_to_install informuje komponent o zewnętrznych bibliotekach, których potrzebuje ten kontener. W tym przypadku używamy biblioteki o nazwie emoji.
  • Ten komponent zwraca obiekt NamedTuple o nazwie Outputs. Zwróć uwagę, że każdy z ciągów w tej tupla ma klucze: emoji_textemoji. Użyjemy ich w następnym komponencie, aby uzyskać dostęp do danych wyjściowych.

Ostatni komponent tego potoku pobierze dane wyjściowe z 2 pierwszych i połączy je, aby zwrócić ciąg znaków:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

Być może zastanawiasz się, skąd ten komponent wie, że należy użyć danych wyjściowych uzyskanych w poprzednich krokach, które określiłeś. Świetne pytanie! W następnym kroku powiążemy wszystkie dane.

Krok 3. Łączenie komponentów w proces

Zdefiniowane powyżej definicje komponentów utworzyły funkcje fabryczne, których można używać w definicji potoku do tworzenia kroków. Aby skonfigurować potok, użyj dekoratora @pipeline, nadaj potokowi nazwę i opis, a także podaj ścieżkę główną, w której powinny być zapisywane artefakty potoku. Przez „artefakty” rozumiemy wszystkie pliki wyjściowe wygenerowane przez Twój potok. Ten wstępny potok nie generuje żadnych, ale nasz następny już tak.

W następnym bloku kodu definiujemy funkcję intro_pipeline. Tutaj określamy dane wejściowe do początkowych kroków potoku i sposób ich wzajemnego łączenia:

  • product_task pobiera nazwę produktu jako dane wejściowe. Tutaj przekazujemy „Vertex Pipelines”, ale możesz zmienić tę nazwę na dowolną inną.
  • emoji_task pobiera kod tekstowy emotikona jako dane wejściowe. Możesz też zmienić tę wartość na dowolną inną. Na przykład „party_face” odnosi się do emotikonu 🥳. Pamiętaj, że zarówno ten, jak i komponent product_task nie mają żadnych kroków, które dostarczają im danych wejściowych, więc podczas definiowania potoku ręcznie podajemy dane wejściowe dla tych komponentów.
  • Ostatni krok naszego potoku – consumer_task ma 3 parametry wejściowe:
    • Wynik funkcji product_task. Ponieważ ten krok generuje tylko 1 wyjście, możemy się do niego odwoływać za pomocą product_task.output.
    • emoji z kroku emoji_task. Zobacz komponent emoji zdefiniowany powyżej, w którym nazwaliśmy parametry wyjściowe.
    • Podobnie wyjście o nazwie emoji_text z komponentu emoji. Jeśli do naszego potoku zostanie przekazany tekst, który nie odpowiada emotikonowi, zostanie on użyty do zbudowania zdania.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Krok 4. Kompilowanie i uruchamianie potoku

Po zdefiniowaniu potoku możesz go skompilować. Poniższe polecenie wygeneruje plik JSON, którego użyjesz do uruchomienia potoku:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Następnie utwórz zmienną TIMESTAMP. Użyjemy go w identyfikatorze zadania:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Następnie zdefiniuj zadanie potoku:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Na koniec uruchom zadanie, aby utworzyć nowe wykonanie potoku:

job.submit()

Po uruchomieniu tej komórki powinny wyświetlić się logi z linkiem umożliwiającym wyświetlenie uruchomionego potoku w konsoli:

Logi zadań potoku

Otwórz odpowiedni link. Po zakończeniu potok powinien wyglądać tak:

Ukończono potok wprowadzenia

Uruchomienie tego potoku potrwa 5–6 minut. Po zakończeniu możesz kliknąć komponent build-sentence, aby zobaczyć wynik końcowy:

Wyjście z poziomu wprowadzenia

Znasz już teraz, jak działają KFP SDK i Vertex Pipelines, więc możesz utworzyć potok, który utworzy i wdroży model ML przy użyciu innych usług Vertex AI. Zaczynamy.

6. Tworzenie kompleksowego potoku ML

Czas stworzyć pierwszy potok uczenia maszynowego. W tym potoku użyjemy zbioru danych „Dry beans” z platformy UCI Machine Learning, który pochodzi z artykułu KOKLU, M. i OZKAN, I.A., (2020), „Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques” [Wieloklasowa klasyfikacja fasoli przy użyciu technik rozpoznawania obrazów i uczenia maszynowego], „In Computers and Electronics in Agriculture”, 174, 105507. DOI.

Jest to zbiór danych tabelarycznych, którego użyjemy w naszej ścieżce do wytrenowania, oceny i wdrażania modelu AutoML, który na podstawie cech klasyfikuje ziarna do jednej z 7 grup.

Ten potok:

  • Utwórz zbiór danych
  • Trenowanie tabelarycznego modelu klasyfikacji przy użyciu AutoML
  • Pobieranie danych oceny tego modelu
  • Na podstawie wskaźników oceny zdecyduj, czy wdrożyć model za pomocą logiki warunkowej w Vertex Pipelines
  • Wdrożenie modelu w punkcie końcowym za pomocą Vertex Prediction

Każdy z wymienionych kroków będzie elementem. Większość kroków w pipeline’u będzie używać gotowych komponentów usług Vertex AI za pomocą biblioteki google_cloud_pipeline_components zaimportowanej wcześniej w tym laboratorium kodu. W tej sekcji najpierw zdefiniujemy jeden komponent niestandardowy. Następnie zdefiniujemy pozostałe kroki potoku za pomocą gotowych komponentów. Gotowe komponenty ułatwiają dostęp do usług Vertex AI, takich jak trenowanie i wdrażanie modeli.

Krok 1. Komponent niestandardowy do oceny modelu

Zdefiniowany przez nas komponent niestandardowy będzie używany pod koniec potoku po zakończeniu trenowania modelu. Ten komponent będzie wykonywać kilka czynności:

  • Pobieranie danych oceny z trenowanego modelu klasyfikacji AutoML
  • Przeanalizuj wskaźniki i wyrenderuj je w interfejsie Vertex Pipelines
  • Porównaj dane z wartością progową, aby określić, czy model należy wdrożyć.

Zanim zdefiniujemy komponent, poznaj jego parametry wejściowe i wyjściowe. Na potrzeby tego potoku używamy metadanych projektu Cloud, wytrenowanego modelu (zdefiniujemy go później), wskaźników oceny modelu i elementu thresholds_dict_str. thresholds_dict_str to coś, co definiujemy podczas uruchamiania potoku. W przypadku tego modelu klasyfikacji jest to obszar pod wartością krzywej ROC, dla którego należy wdrożyć model. Jeśli na przykład przekażesz wartość 0,95, oznacza to, że nasz potok powinien wdrażać model tylko wtedy, gdy wartość tego parametru przekracza 95%.

Nasz komponent oceny zwraca ciąg znaków wskazujący, czy należy wdrożyć model. Aby utworzyć ten komponent niestandardowy, dodaj w komórce notatnika ten kod:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Krok 2. Dodaj gotowe komponenty Google Cloud

W tym kroku zdefiniujemy pozostałe komponenty potoku i sprawdzimy, jak się ze sobą łączą. Najpierw określ wyświetlaną nazwę uruchomienia potoku za pomocą sygnatury czasowej:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Następnie skopiuj ten kod do nowej komórki notatnika:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Zobaczmy, co się dzieje w tym kodzie:

  • Najpierw, podobnie jak w poprzednim strumieniu, definiujemy parametry wejściowe tego potoku. Musimy je ustawić ręcznie, ponieważ nie zależą od wyników innych kroków w pipeline.
  • Pozostała część potoku korzysta z kilku wstępnie utworzonych komponentów do interakcji z usługami Vertex AI:
    • TabularDatasetCreateOp tworzy tabelaryczny zbiór danych w Vertex AI ze źródłem zbioru danych w Cloud Storage lub BigQuery. W tym potoku dane są przekazywane za pomocą adresu URL tabeli BigQuery.
    • AutoMLTabularTrainingJobRunOp uruchamia zadanie trenowania AutoML dla tabelarycznego zbioru danych. Do tego komponentu przekazujemy kilka parametrów konfiguracji, w tym typ modelu (w tym przypadku klasyfikacja), niektóre dane dotyczące kolumn, czas trwania trenowania oraz wskaźnik do zbioru danych. Uwaga: aby przekazać zbiór danych do tego komponentu, udostępniamy dane wyjściowe poprzedniego komponentu w narzędziu dataset_create_op.outputs["dataset"].
    • EndpointCreateOp tworzy punkt końcowy w Vertex AI. Punkt końcowy utworzony w tym kroku zostanie przekazany jako dane wejściowe do następnego komponentu
    • ModelDeployOp wdraża dany model w punkcie końcowym w Vertex AI. W tym przypadku użyjemy punktu końcowego utworzonego w poprzednim kroku. Dostępne są dodatkowe opcje konfiguracji, ale podajemy tutaj typ maszyny i model punktu końcowego, które chcemy wdrożyć. Model jest przekazywany przez dostęp do danych wyjściowych kroku trenowania w potoku.
  • Ten potok korzysta również z logiki warunkowej – funkcji Vertex Pipelines, która umożliwia zdefiniowanie warunku oraz różnych gałęzi na podstawie jego wyniku. Pamiętaj, że podczas definiowania potoku przekazaliśmy parametr thresholds_dict_str. Jest to próg dokładności, którego używamy do określania, czy wdrażać model w punkcie końcowym. Aby wdrożyć to rozwiązanie, używamy klasy Condition z pakietu KFP SDK. Warunek, który przekazujemy, to dane wyjściowe niestandardowego komponentu oceny zdefiniowanego wcześniej w tym samouczku. Jeśli ten warunek jest spełniony, potok będzie nadal wykonywać komponent deploy_op. Jeśli dokładność nie osiągnie wstępnie zdefiniowanego progu, potok zatrzyma się w tym miejscu i nie wdroży modelu.

Krok 3. Zbuduj i uruchom kompleksowy potok ML

Po zdefiniowaniu całego potoku możesz go skompilować:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Następnie określ zadanie:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Na koniec uruchom zadanie:

ml_pipeline_job.submit()

Aby wyświetlić potok w konsoli, kliknij link widoczny w logach po uruchomieniu komórki powyżej. Uruchomienie tego potoku zajmie nieco ponad godzinę. Większość czasu zajmuje etap trenowania AutoML. Ukończony potok będzie wyglądać mniej więcej tak:

Zakończony potok AutoML

Jeśli przełączysz przycisk „Rozwiń artefakty” u góry, zobaczysz szczegóły różnych artefaktów utworzonych w Twoim procesie. Jeśli na przykład klikniesz element dataset, zobaczysz szczegóły utworzonego zbioru danych Vertex AI. Możesz kliknąć ten link, aby przejść do strony tego zbioru danych:

Zbiór danych potoku

Aby zobaczyć wizualizacje danych z naszego niestandardowego komponentu oceny, kliknij element o nazwie metricsc. Po prawej stronie panelu znajdziesz matrycę błędów tego modelu:

Wizualizacja danych

Aby wyświetlić model i punkt końcowy utworzone na podstawie tego przebiegu potoku danych, otwórz sekcję Modele i kliknij model o nazwie automl-beans. Powinieneś zobaczyć ten model wdrożony w punkcie końcowym:

Model – punkt końcowy

Dostęp do tej strony możesz też uzyskać, klikając artefakt punktu końcowego na wykresie potoku.

Oprócz przeglądania wykresu potoku w konsoli możesz też używać Vertex Pipelines do śledzenia pochodzenia. Śledzenie pochodzenia oznacza śledzenie artefaktów utworzonych w trakcie przetwarzania. Może to pomóc nam zrozumieć, gdzie zostały utworzone artefakty i jak są one używane w przepływie pracy związanym z systemami uczącymi się. Aby na przykład wyświetlić śledzenie pochodzenia zbioru danych utworzonego w tym potoku, kliknij element zbioru danych, a potem Wyświetl pochodzenie:

Wyświetl historię danych

Widzimy tu wszystkie miejsca, w których jest używany dany artefakt:

Szczegóły historii danych

Krok 4. Porównywanie danych w ramach uruchomień potoku

Jeśli uruchamiasz ten potok wiele razy, warto porównać wskaźniki między uruchomieniami. Aby uzyskać dostęp do metadanych przebiegu, możesz użyć metody aiplatform.get_pipeline_df(). Tutaj pobieramy metadane wszystkich uruchomień tego potoku i wczytujemy je do struktury DataFrame biblioteki Pandas:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

To już koniec tego modułu.

🎉 Gratulacje! 🎉

Wiesz już, jak używać Vertex AI do:

  • Korzystanie z pakietu Kubeflow Pipelines SDK do tworzenia kompleksowych ścieżek z komponentami niestandardowymi
  • Uruchamianie potoków w usłudze Vertex Pipelines i uruchamianie ich uruchomień za pomocą pakietu SDK
  • Wyświetlanie i analizowanie wykresu Vertex Pipelines w konsoli
  • Dodawanie usług Vertex AI do potoku za pomocą gotowych komponentów
  • Planowanie cyklicznych zadań potoku

Więcej informacji o różnych częściach Vertex znajdziesz w dokumentacji.

7. Czyszczenie

Aby uniknąć opłat, zalecamy usunięcie zasobów utworzonych w tym module.

Krok 1. Zatrzymaj lub usuń instancję Notebooks

Jeśli chcesz nadal korzystać z notatnika utworzonego w tym module, zalecamy wyłączenie go, gdy nie jest używany. W interfejsie notatników w konsoli Cloud wybierz notatnik, a następnie kliknij Zatrzymaj. Jeśli chcesz całkowicie usunąć instancję, kliknij Usuń:

Zatrzymaj instancję

Krok 2. Usuń punkt końcowy

Aby usunąć wdrożony punkt końcowy, otwórz sekcję Punkty końcowe w konsoli Vertex AI i kliknij ikonę usuwania:

Usuń punkt końcowy

Następnie w oknie wyświetlonym po kliknięciu Undeploy (Cofnij wdrożenie) kliknij Undeploy (Cofnij wdrożenie):

Wycofaj wdrożenie modelu

Na koniec przejdź w konsoli do sekcji Modele, znajdź ten model i w menu z 3 kropkami po prawej stronie kliknij Usuń model:

Usuń model

Krok 3. Usuń zasobnik Cloud Storage

Aby usunąć zasobnik na dane, w menu nawigacyjnym w konsoli Cloud przejdź do Cloud Storage, wybierz swój zasobnik i kliknij Usuń:

Usuń miejsce na dane