Wprowadzenie do Vertex Pipelines

1. Omówienie

W tym module dowiesz się, jak tworzyć i uruchamiać potoki ML przy użyciu Vertex Pipelines.

Czego się nauczysz

Poznasz takie zagadnienia jak:

  • Tworzenie skalowalnych potoków ML za pomocą pakietu Kubeflow Pipelines SDK
  • Utwórz i uruchom 3-etapowy potok wprowadzający, który wymaga wprowadzania tekstu
  • Tworzenie i uruchamianie potoku, który trenuje, ocenia i wdraża model klasyfikacji AutoML
  • Używanie gotowych komponentów do interakcji z usługami Vertex AI dostępnych w bibliotece google_cloud_pipeline_components
  • Zaplanuj zadanie potoku za pomocą usługi Cloud Scheduler

Całkowity koszt uruchomienia tego modułu w Google Cloud wynosi około 25 USD.

2. Wprowadzenie do Vertex AI

W tym module wykorzystano najnowszą ofertę usług AI dostępną w Google Cloud. Vertex AI integruje ofertę systemów uczących się z całego Google Cloud, tworząc bezproblemowe środowisko programistyczne. Wcześniej modele wytrenowane z użyciem AutoML i modele niestandardowe były dostępne w oddzielnych usługach. Nowa oferta jest łączona w 1 interfejs API wraz z innymi nowymi usługami. Możesz też przenieść istniejące projekty do Vertex AI.

Oprócz usług trenowania i wdrażania modeli Vertex AI oferuje również różne usługi MLOps, w tym Vertex Pipelines (które omawiamy w tym module), monitorowanie modeli i Feature Store. Wszystkie oferty usług Vertex AI możesz zobaczyć na schemacie poniżej.

Omówienie usługi Vertex

Jeśli masz jakieś uwagi, odwiedź stronę pomocy.

Dlaczego potoki systemów uczących się są przydatne?

Zanim przejdziemy do szczegółów, zastanówmy się, dlaczego warto używać potoku. Załóżmy, że tworzysz przepływ pracy ML, który obejmuje przetwarzanie danych, trenowanie modelu, dostrajanie hiperparametrów, ocenę i wdrażanie modelu. Każdy z tych kroków może mieć różne zależności, które mogą stać się nieporęczne, jeśli cały przepływ pracy jest traktowany jak monolit. Gdy zaczniesz skalować proces ML, warto udostępnić ten przepływ pracy innym członkom Twojego zespołu, aby mogli go uruchomić i współtworzyć kod. Bez niezawodnego, powtarzalnego procesu może to być trudne. W przypadku potoków każdy krok procesu ML jest oddzielnym kontenerem. Dzięki temu możesz rozwijać kroki niezależnie oraz śledzić w sposób powtarzalny dane wejściowe i wyjściowe z każdego z nich. Możesz też planować lub aktywować uruchomienia potoku na podstawie innych zdarzeń w środowisku Cloud, takich jak uruchamianie uruchomienia potoku po udostępnieniu nowych danych do trenowania.

The tl;dr: potoki pomagają automatyzować i odtwarzać przepływ pracy ML.

3. Konfiguracja środowiska Cloud

Aby uruchomić to ćwiczenia z programowania, musisz mieć projekt Google Cloud Platform z włączonymi płatnościami. Aby utworzyć projekt, postępuj zgodnie z tymi instrukcjami.

Krok 1. Uruchom Cloud Shell

W tym module będziesz pracować w sesji Cloud Shell, która jest tłumaczem poleceń hostowanym przez maszynę wirtualną działającą w chmurze Google. Możesz równie łatwo uruchomić tę sekcję lokalnie na swoim komputerze, ale użycie Cloud Shell zapewni wszystkim dostęp do powtarzalnego środowiska w spójnym środowisku. Po ukończeniu modułu możesz powtórzyć tę sekcję na swoim komputerze.

Autoryzuj Cloud Shell

Aktywowanie Cloud Shell

W prawym górnym rogu konsoli Cloud kliknij przycisk poniżej, aby aktywować Cloud Shell:

Aktywowanie Cloud Shell

Jeśli dopiero zaczynasz korzystać z Cloud Shell, wyświetli się ekran pośredni (w części strony widocznej po przewinięciu) z opisem tej funkcji. W takim przypadku kliknij Dalej (nie zobaczysz go więcej). Tak wygląda ten jednorazowy ekran:

Konfiguracja Cloud Shell

Uzyskanie dostępu do Cloud Shell i połączenie się z nim powinno zająć tylko kilka chwil.

Inicjowanie Cloud Shell

Ta maszyna wirtualna ma wszystkie potrzebne narzędzia dla programistów. Zawiera stały katalog domowy o pojemności 5 GB i działa w Google Cloud, co znacznie zwiększa wydajność sieci i uwierzytelnianie. Większość czynności z tego ćwiczenia z programowania można wykonać w przeglądarce lub na Chromebooku.

Po nawiązaniu połączenia z Cloud Shell powinno pojawić się informacja, że użytkownik jest już uwierzytelniony i że projekt jest już ustawiony na identyfikator Twojego projektu.

Uruchom to polecenie w Cloud Shell, aby potwierdzić, że jesteś uwierzytelniony:

gcloud auth list

W danych wyjściowych polecenia powinno pojawić się coś takiego:

Dane wyjściowe Cloud Shell

Uruchom to polecenie w Cloud Shell, aby sprawdzić, czy polecenie gcloud zna Twój projekt:

gcloud config list project

Dane wyjściowe polecenia

[core]
project = <PROJECT_ID>

Jeśli tak nie jest, możesz go ustawić za pomocą tego polecenia:

gcloud config set project <PROJECT_ID>

Dane wyjściowe polecenia

Updated property [core/project].

Cloud Shell ma kilka zmiennych środowiskowych, w tym GOOGLE_CLOUD_PROJECT, która zawiera nazwę naszego bieżącego projektu Cloud. Wykorzystamy to w różnych miejscach tego modułu. Możesz go sprawdzić, uruchamiając polecenie:

echo $GOOGLE_CLOUD_PROJECT

Krok 2. Włącz interfejsy API

W późniejszych krokach zobaczysz, gdzie te usługi są potrzebne (i dlaczego). Na razie uruchom to polecenie, aby przyznać projektowi dostęp do usług Compute Engine, Container Registry i Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Powinien wyświetlić się komunikat o powodzeniu podobny do tego:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Krok 3. Utwórz zasobnik Cloud Storage

Aby uruchomić zadanie trenowania w Vertex AI, potrzebujesz zasobnika na dane, w którym będą przechowywane zapisane zasoby modelu. Zasobnik musi być regionalny. Używamy tutaj regionu us-central, ale możesz użyć innego regionu (po prostu zastąp go w tym module). Jeśli masz już zasobnik, możesz pominąć ten krok.

Aby utworzyć zasobnik, uruchom te polecenia w terminalu Cloud Shell:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Następnie damy kontu usługi Compute dostęp do tego zasobnika. Dzięki temu Vertex Pipelines będzie mieć wymagane uprawnienia do zapisywania plików w tym zasobniku. Aby dodać to uprawnienie, uruchom to polecenie:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Krok 4. Utwórz instancję Vertex AI Workbench

W sekcji Vertex AI w konsoli Cloud kliknij Workbench:

Menu Vertex AI

W sekcji Notatniki zarządzane przez użytkownika kliknij Nowy notatnik:

Utwórz nowy notatnik

Następnie wybierz typ instancji TensorFlow Enterprise 2.3 (z LTS) bez GPU:

Instancja TFE

Użyj opcji domyślnych i kliknij Utwórz.

Krok 5. Otwórz notatnik

Po utworzeniu instancji wybierz Otwórz JupyterLab:

Otwórz notatnik

4. Konfiguracja Vertex Pipelines

Istnieje kilka dodatkowych bibliotek, które musimy zainstalować, aby używać Vertex Pipelines:

  • Kubeflow Pipelines: to pakiet SDK, którego użyjemy do utworzenia naszego potoku. Vertex Pipelines obsługuje uruchomione potoki utworzone w Kubeflow Pipelines i TFX.
  • Komponenty Google Cloud Pipeline: ta biblioteka zawiera gotowe komponenty, które ułatwiają interakcję z usługami Vertex AI na etapach potoku.

Krok 1. Utwórz notatnik w Pythonie i zainstaluj biblioteki

Najpierw w menu Menu z aplikacjami w instancji notatnika utwórz notatnik, wybierając Pythona 3:

Utwórz notatnik w Pythonie3

Aby otworzyć menu Menu z aplikacjami, kliknij znak + w lewym górnym rogu instancji notatnika.

Aby zainstalować obie usługi, których będziemy używać w tym module, najpierw ustaw flagę użytkownika w komórce notatnika:

USER_FLAG = "--user"

Następnie uruchom w notatniku to polecenie:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Po zainstalowaniu tych pakietów musisz ponownie uruchomić jądro:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Na koniec sprawdź, czy pakiety zostały poprawnie zainstalowane. Wersja pakietu SDK KFP powinna być większa niż 1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Krok 2. Ustaw identyfikator projektu i zasobnik

W trakcie tego modułu będziesz się odwoływać do identyfikatora projektu Cloud i utworzonego wcześniej zasobnika. Następnie utworzymy zmienne dla każdego z nich.

Jeśli nie znasz identyfikatora projektu, możesz go uzyskać, uruchamiając to polecenie:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

W przeciwnym razie ustaw go tutaj:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Następnie utwórz zmienną do przechowywania nazwy zasobnika. Jeśli ten moduł został utworzony w tym module, podane niżej rozwiązania będą skuteczne. W przeciwnym razie musisz go ustawić ręcznie:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Krok 3. Zaimportuj biblioteki

Dodaj ten kod, aby zaimportować biblioteki, których będziemy używać w tym ćwiczeniu z programowania:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Krok 4. Zdefiniuj stałe

Ostatnią rzeczą, jaką musimy zrobić przed utworzeniem potoku, jest zdefiniowanie zmiennych stałych. PIPELINE_ROOT to ścieżka w Cloud Storage, w której będą zapisywane artefakty utworzone przez nasz potok. Używamy tu regionu us-central1, ale jeśli podczas tworzenia zasobnika używasz innego regionu, zaktualizuj zmienną REGION w tym kodzie:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Po uruchomieniu powyższego kodu powinien wyświetlić się katalog główny potoku. To jest lokalizacja w Cloud Storage, w której będą zapisywane artefakty z Twojego potoku. Będzie w formacie gs://YOUR-BUCKET-NAME/pipeline_root/

5. Tworzę pierwszy potok

Aby poznać działanie Vertex Pipelines, najpierw utworzymy krótki potok za pomocą pakietu SDK KFP. Ten potok nie robi niczego związanego z systemami uczącymi się (bez obaw – dotrzemy do tego celu!), ale używamy go, aby nauczyć Cię:

  • Tworzenie komponentów niestandardowych w pakiecie SDK KFP
  • Jak uruchamiać i monitorować potok w Vertex Pipelines

Utworzymy potok, który wyświetla zdanie przy użyciu 2 danych wyjściowych: nazwy produktu i opisu emotikona. Potok składa się z 3 komponentów:

  • product_name: ten komponent pobiera nazwę produktu (lub dowolny rzeczownik, którego potrzebujesz) jako dane wejściowe i zwraca ten ciąg znaków jako dane wyjściowe.
  • emoji: ten komponent przekształci opis tekstowy w emotikon. Na przykład kod tekstowy funkcji ✨ to „iskierki”. Ten komponent wykorzystuje bibliotekę emotikonów, aby pokazać, jak zarządzać zależnościami zewnętrznymi w potoku
  • build_sentence: ten ostatni komponent użyje danych wyjściowych z poprzednich 2 elementów, aby utworzyć zdanie, w którym pojawia się emotikon. Dane wyjściowe mogą na przykład wyglądać tak: „Potoki Vertex to ✨”.

Zacznijmy kodować!

Krok 1. Utwórz komponent oparty na funkcji Pythona

Za pomocą pakietu SDK KFP możemy tworzyć komponenty oparte na funkcjach w Pythonie. Wykorzystamy to w 3 komponentach w naszym pierwszym potoku. Najpierw utworzymy komponent product_name, który jako dane wejściowe pobiera ciąg znaków i go zwraca. Dodaj do notatnika te elementy:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Przyjrzyjmy się bliżej składni:

  • Dekorator @component kompiluje tę funkcję do komponentu podczas uruchamiania potoku. Będziesz go używać za każdym razem, gdy będziesz pisać komponent niestandardowy.
  • Parametr base_image określa obraz kontenera, którego będzie używać ten komponent.
  • Parametr output_component_file jest opcjonalny i określa plik yaml, w którym ma zostać zapisany skompilowany komponent. Po uruchomieniu komórki powinien być widoczny plik zapisany w instancji notatnika. Jeśli chcesz komuś udostępnić ten komponent, możesz wysłać tej osobie wygenerowany plik yaml i poprosić o wczytanie go z tym kodem:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • -> str po definicji funkcji określa typ wyjściowy dla tego komponentu.

Krok 2. Utwórz 2 dodatkowe komponenty

Aby dokończyć potok, utworzymy jeszcze 2 komponenty. Pierwszy, który zdefiniujemy, przyjmuje jako dane wejściowe ciąg znaków i przekształca ten ciąg na odpowiadający mu emotikon, jeśli taki istnieje. Zwraca krotkę z przekazanym tekstem wejściowym i emotikonem wynikowym:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Ten komponent jest nieco bardziej złożony niż poprzedni. Zobaczmy, co się zmieniło:

  • Parametr packages_to_install informuje komponent o zależnościach od bibliotek zewnętrznych związanych z kontenerem. W tym przypadku używamy biblioteki o nazwie emoji.
  • Ten komponent zwraca wartość NamedTuple o nazwie Outputs. Zwróć uwagę, że każdy z ciągów w tej krotce ma klucze: emoji_text i emoji. Wykorzystamy je w następnym komponencie, aby uzyskać dostęp do danych wyjściowych.

Ostatni komponent tego potoku pobierze dane wyjściowe z 2 pierwszych i połączy je, aby zwrócić ciąg znaków:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

Być może zastanawiasz się, skąd ten komponent wie, że należy użyć danych wyjściowych uzyskanych w poprzednich krokach, które określiłeś. Świetne pytanie! W następnym kroku powiążemy wszystkie dane.

Krok 3. Łączenie komponentów w potok

Zdefiniowane powyżej definicje komponentów tworzyły funkcje fabryczne, których można użyć w definicji potoku do tworzenia kroków. Aby skonfigurować potok, użyj dekoratora @pipeline, nadaj potokowi nazwę i opis, a także podaj ścieżkę główną, w której powinny być zapisywane artefakty potoku. Artefakty oznaczają wszystkie pliki wyjściowe wygenerowane przez Twój potok. Ten potok wprowadzający nie wygeneruje żadnych, ale podczas następnego potoku zostaną wygenerowane.

W następnym bloku kodu definiujemy funkcję intro_pipeline. W tym miejscu określamy dane wejściowe do początkowych kroków potoku i w jaki sposób łączą się one ze sobą:

  • product_task pobiera nazwę produktu jako dane wejściowe. W tym celu ale możesz zmienić to, co chcesz.
  • emoji_task pobiera kod tekstowy emotikona jako dane wejściowe. Wartość tę można dowolnie zmienić. Na przykład „party_face” dotyczy emotikonu 🥳. Ponieważ zarówno ten, jak i komponent product_task nie zawierają żadnych kroków, które dostarczają do nich danych wejściowych, podczas definiowania naszego potoku ręcznie określamy dane wejściowe dla tych komponentów.
  • Ostatni krok naszego potoku – consumer_task ma 3 parametry wejściowe:
    • Dane wyjściowe funkcji product_task. Ponieważ ten krok generuje tylko 1 wynik, możemy się do niego odwołać za pomocą funkcji product_task.output.
    • Wynik emoji z kroku emoji_task. Patrz zdefiniowany powyżej komponent emoji, w którym nazwaliśmy parametry wyjściowe.
    • Podobnie jest nazwane emoji_text z komponentu emoji. Jeśli nasz potok zostanie przekazany do tekstu, który nie odpowiada emotikonowi, użyje go do utworzenia zdania.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Krok 4. Skompiluj i uruchom potok

Po zdefiniowaniu potoku możesz go skompilować. W ten sposób wygenerujesz plik JSON, którego użyjesz do uruchomienia potoku:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Następnie utwórz zmienną TIMESTAMP. Wykorzystamy to w identyfikatorze zadania:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Następnie zdefiniuj zadanie potoku:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Na koniec uruchom zadanie, aby utworzyć nowe wykonanie potoku:

job.submit()

Po uruchomieniu tej komórki powinny wyświetlić się logi z linkiem umożliwiającym wyświetlenie uruchomionego potoku w konsoli:

Logi zadań potoku

Otwórz odpowiedni link. Po zakończeniu potok powinien wyglądać tak:

Ukończono potok wprowadzenia

Uruchomienie tego potoku zajmie 5–6 minut. Gdy skończysz, możesz kliknąć komponent build-sentence, aby zobaczyć efekt końcowy:

Dane wyjściowe potoku wprowadzającego

Znasz już teraz, jak działają KFP SDK i Vertex Pipelines, więc możesz utworzyć potok, który utworzy i wdroży model ML przy użyciu innych usług Vertex AI. Zaczynamy!

6. Tworzenie pełnego potoku ML

Czas utworzyć pierwszy potok ML. W tym potoku użyjemy zbioru danych fasadowych z uczenia maszynowego UCI pochodzącego od: KOKLU, M., OZKAN, I.A., (2020), „Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques” (Wieloklasowa klasyfikacja fasoli w praktyce przy użyciu technik rozpoznawania i uczenia maszynowego). 174 r., 105507, s. DOI

Jest to tabelaryczny zbiór danych, a w naszym potoku użyjemy go do trenowania, oceny i wdrożenia modelu AutoML, który klasyfikuje ziarno na 1 z 7 typów na podstawie ich cech.

Ten potok:

  • Utwórz zbiór danych w
  • Trenowanie tabelarycznego modelu klasyfikacji przy użyciu AutoML
  • Pobierz wskaźniki oceny tego modelu
  • Na podstawie wskaźników oceny zdecyduj, czy chcesz wdrożyć model za pomocą logiki warunkowej w Vertex Pipelines
  • Wdróż model w punkcie końcowym za pomocą prognozy Vertex

Każdy z omówionych kroków będzie częścią składową. Większość kroków potoku będzie wykorzystywać gotowe komponenty usług Vertex AI za pomocą biblioteki google_cloud_pipeline_components zaimportowanej wcześniej w ramach tego ćwiczenia w Codelabs. W tej sekcji najpierw zdefiniujemy jeden komponent niestandardowy. Następnie zdefiniujemy pozostałe kroki potoku za pomocą gotowych komponentów. Gotowe komponenty ułatwiają dostęp do usług Vertex AI, takich jak trenowanie i wdrażanie modeli.

Krok 1. Komponent niestandardowy do oceny modelu

Zdefiniowany przez nas komponent niestandardowy będzie używany pod koniec potoku po zakończeniu trenowania modelu. Ten komponent wykona kilka czynności:

  • Pobieranie wskaźników oceny z wytrenowanego modelu klasyfikacji AutoML
  • Przeanalizuj wskaźniki i wyrenderuj je w interfejsie Vertex Pipelines
  • Porównaj wskaźniki z progiem, aby określić, czy model powinien zostać wdrożony

Zanim zdefiniujemy komponent, poznaj jego parametry wejściowe i wyjściowe. Jako dane wejściowe ten potok pobiera metadane dotyczące naszego projektu Cloud, wytrenowany model (zostanie zdefiniowany później), wskaźniki oceny modelu oraz thresholds_dict_str. thresholds_dict_str to coś, co definiujemy podczas uruchamiania potoku. W przypadku tego modelu klasyfikacji jest to obszar pod wartością krzywej ROC, dla którego należy wdrożyć model. Jeśli na przykład przekażemy 0,95, oznacza to, że chcemy, aby nasz potok wdrożył model tylko wtedy, gdy wartość ta przekroczy 95%.

Nasz komponent oceny zwraca ciąg znaków wskazujący, czy należy wdrożyć model. Aby utworzyć ten komponent niestandardowy, dodaj w komórce notatnika ten kod:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Krok 2. Dodaj gotowe komponenty Google Cloud

W tym kroku zdefiniujemy pozostałe komponenty potoku i zobaczymy, jak je ze sobą współgrają. Najpierw określ wyświetlaną nazwę uruchomienia potoku za pomocą sygnatury czasowej:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Następnie skopiuj te dane do nowej komórki notatnika:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Zobaczmy, co się dzieje w tym kodzie:

  • Najpierw, tak jak w poprzednim potoku, definiujemy parametry wejściowe tego potoku. Musimy ustawić je ręcznie, ponieważ nie zależą one od danych wyjściowych innych kroków w potoku.
  • Pozostała część potoku używa kilku gotowych komponentów do interakcji z usługami Vertex AI:
    • TabularDatasetCreateOp tworzy tabelaryczny zbiór danych w Vertex AI ze źródłem zbioru danych w Cloud Storage lub BigQuery. W tym potoku przekazujemy dane przez adres URL tabeli BigQuery
    • AutoMLTabularTrainingJobRunOp uruchamia zadanie trenowania AutoML dla tabelarycznego zbioru danych. Przekazujemy do tego komponentu kilka parametrów konfiguracji, w tym typ modelu (w tym przypadku jest to klasyfikacja), dane w kolumnach, czas, przez jaki mamy trenować, oraz wskaźnik do zbioru danych. Uwaga: aby przekazać zbiór danych do tego komponentu, udostępniamy dane wyjściowe poprzedniego komponentu w narzędziu dataset_create_op.outputs["dataset"].
    • EndpointCreateOp tworzy punkt końcowy w Vertex AI. Punkt końcowy utworzony w tym kroku zostanie przekazany jako dane wejściowe do następnego komponentu
    • ModelDeployOp wdraża dany model w punkcie końcowym w Vertex AI. W tym przypadku używamy punktu końcowego utworzonego w poprzednim kroku. Dostępne są dodatkowe opcje konfiguracji, ale podajemy tutaj typ maszyny i model punktu końcowego, które chcemy wdrożyć. Przekazujemy model, uzyskując dostęp do danych wyjściowych z etapu trenowania w naszym potoku
  • Ten potok korzysta również z logiki warunkowej – funkcji Vertex Pipelines, która umożliwia zdefiniowanie warunku oraz różnych gałęzi na podstawie jego wyniku. Pamiętaj, że podczas określania potoku przekazujemy parametr thresholds_dict_str. Jest to próg dokładności, którego używamy do określenia, czy wdrożyć nasz model w punkcie końcowym. Aby wdrożyć to rozwiązanie, używamy klasy Condition z pakietu KFP SDK. Przekazywany warunek to dane wyjściowe komponentu niestandardowego oceny, który został zdefiniowany wcześniej w tym ćwiczeniu w Codelabs. Jeśli ten warunek jest spełniony, potok będzie nadal uruchamiać komponent deploy_op. Jeśli dokładność nie osiągnie wstępnie zdefiniowanego progu, potok zatrzyma się w tym miejscu i nie wdroży modelu.

Krok 3. Skompiluj i uruchom kompleksowy potok ML

Po zdefiniowaniu pełnego potoku musisz go skompilować:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Następnie zdefiniuj zadanie:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Na koniec uruchom zadanie:

ml_pipeline_job.submit()

Po uruchomieniu powyższej komórki przejdź do linku widocznego w logach, aby wyświetlić potok w konsoli. Uruchomienie tego potoku zajmie trochę ponad godzinę. Większość czasu jest poświęcana na etapie trenowania AutoML. Ukończony potok będzie wyglądał mniej więcej tak:

Ukończony potok AutoML

Jeśli przełączysz opcję „Rozwiń artefakty” u góry strony wyświetlają się szczegóły różnych artefaktów utworzonych z potoku. Jeśli na przykład klikniesz artefakt dataset, zobaczysz szczegóły utworzonego zbioru danych Vertex AI. Możesz kliknąć ten link, aby przejść do strony tego zbioru danych:

Zbiór danych potoku

Podobnie, aby zobaczyć wizualizacje danych wygenerowane przez komponent oceny niestandardowej, kliknij artefakt o nazwie metricsc. Po prawej stronie panelu znajduje się tabela pomyłek dla tego modelu:

Wizualizacja danych

Aby zobaczyć model i punkt końcowy utworzony na podstawie tego uruchomienia potoku, przejdź do sekcji modeli i kliknij model o nazwie automl-beans. W punkcie końcowym powinien być wdrożony ten model:

Punkt końcowy modelu

Dostęp do tej strony możesz też uzyskać, klikając artefakt punktu końcowego na wykresie potoku.

Oprócz przeglądania wykresu potoku w konsoli możesz też używać Vertex Pipelines do śledzenia historii. Śledzenie historii danych oznacza śledzenie artefaktów utworzonych w całym potoku. Może to pomóc nam zrozumieć, gdzie zostały utworzone artefakty i jak są one używane w przepływie pracy związanym z systemami uczącymi się. Aby na przykład wyświetlić śledzenie historii w zbiorze danych utworzonym w tym potoku, kliknij artefakt zbioru danych, a następnie Wyświetl historię danych:

Wyświetl historię danych

Wyświetlą się wszystkie miejsca, w których jest używany ten artefakt:

Szczegóły historii danych

Krok 4. Porównaj wskaźniki w uruchomieniach potoku

Jeśli uruchamiasz ten potok wiele razy, warto porównywać wskaźniki między uruchomieniami. Aby uzyskać dostęp do metadanych uruchomienia, możesz użyć metody aiplatform.get_pipeline_df(). Pozyskamy tu metadane wszystkich uruchomień tego potoku i wczytamy je do ramki DataFrame usługi Pandas:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

Moduł ukończony.

🎉 Gratulacje! 🎉

Wiesz już, jak używać Vertex AI do:

  • Użyj pakietu Kubeflow Pipelines SDK do tworzenia kompleksowych potoków z komponentami niestandardowymi
  • Uruchamiaj potoki w Vertex Pipelines i uruchamiaj je za pomocą pakietu SDK
  • Wyświetlanie i analizowanie wykresu Vertex Pipelines w konsoli
  • Dodaj do potoku usługi Vertex AI z użyciem gotowych komponentów potoku
  • Zaplanuj cykliczne zadania potoku

Więcej informacji o różnych częściach Vertex znajdziesz w dokumentacji.

7. Czyszczenie

Aby uniknąć opłat, zalecamy usunięcie zasobów utworzonych w tym module.

Krok 1. Zatrzymaj lub usuń instancję Notatników

Jeśli chcesz nadal korzystać z notatnika utworzonego w tym module, zalecamy wyłączenie go, gdy nie jest używany. W interfejsie notatników w konsoli Cloud wybierz notatnik, a następnie kliknij Zatrzymaj. Jeśli chcesz całkowicie usunąć instancję, wybierz Usuń:

Zatrzymaj instancję

Krok 2. Usuń punkt końcowy

Aby usunąć wdrożony punkt końcowy, przejdź do sekcji Punkty końcowe w konsoli Vertex AI i kliknij ikonę usuwania:

Usuń punkt końcowy

Następnie kliknij Wycofaj wdrożenie z poziomu tego promptu:

Wycofaj wdrożenie modelu

Na koniec przejdź do sekcji Modele w konsoli, znajdź odpowiedni model i w menu z 3 kropkami po prawej stronie kliknij Usuń model:

Usuń model

Krok 3. Usuń zasobnik Cloud Storage

Aby usunąć zasobnik na dane, w menu nawigacyjnym w konsoli Cloud przejdź do Cloud Storage, wybierz swój zasobnik i kliknij Usuń:

Usuń miejsce na dane