Wprowadzenie do Vertex Pipelines

1. Przegląd

Z tego modułu dowiesz się, jak tworzyć i uruchamiać potoki ML za pomocą Vertex Pipelines.

Czego się dowiesz

Poznasz takie zagadnienia jak:

  • Tworzenie skalowalnych potoków ML za pomocą pakietu Kubeflow Pipelines SDK
  • Utwórz i uruchom 3-etapowy potok wprowadzający, który przyjmuje dane wejściowe w postaci tekstu.
  • Tworzenie i uruchamianie potoku, który trenuje, ocenia i wdraża model klasyfikacji AutoML
  • Używaj gotowych komponentów do interakcji z usługami Vertex AI, które są dostępne w bibliotece google_cloud_pipeline_components.
  • Planowanie zadania potoku za pomocą usługi Cloud Scheduler

Całkowity koszt ukończenia tego laboratorium w Google Cloud wynosi około 25 USD.

2. Wprowadzenie do Vertex AI

W tym module wykorzystujemy najnowszą ofertę produktów AI dostępną w Google Cloud. Vertex AI integruje oferty ML w Google Cloud, zapewniając płynne środowisko programistyczne. Wcześniej modele wytrenowane za pomocą AutoML i modele niestandardowe były dostępne w ramach osobnych usług. Nowa oferta łączy je w jeden interfejs API wraz z innymi nowymi usługami. Możesz też przeprowadzić migrację istniejących projektów do Vertex AI.

Oprócz usług trenowania i wdrażania modeli Vertex AI obejmuje też różne usługi MLOps, w tym Vertex Pipelines (które są głównym tematem tego laboratorium), monitorowanie modeli, Feature Store i inne. Wszystkie usługi Vertex AI znajdziesz na diagramie poniżej.

Omówienie usługi Vertex

Jeśli masz jakieś uwagi, odwiedź stronę pomocy.

Dlaczego potoki ML są przydatne?

Zanim przejdziemy do szczegółów, dowiedzmy się, dlaczego warto używać potoku. Wyobraź sobie, że tworzysz przepływ pracy uczenia maszynowego, który obejmuje przetwarzanie danych, trenowanie modelu, dostrajanie hiperparametrów, ocenę i wdrażanie modelu. Każdy z tych kroków może mieć różne zależności, co może być trudne do zarządzania, jeśli cały przepływ pracy jest traktowany jako monolit. Gdy zaczniesz skalować proces uczenia maszynowego, możesz udostępnić swój przepływ pracy innym członkom zespołu, aby mogli go uruchamiać i dodawać kod. Bez niezawodnego, powtarzalnego procesu może to być trudne. W przypadku potoków każdy krok procesu uczenia maszynowego jest osobnym kontenerem. Dzięki temu możesz opracowywać poszczególne kroki niezależnie od siebie oraz śledzić dane wejściowe i wyjściowe każdego z nich w powtarzalny sposób. Możesz też zaplanować lub wywołać uruchomienie potoku na podstawie innych zdarzeń w środowisku Cloud, np. uruchomić potok, gdy dostępne są nowe dane treningowe.

W skrócie: potoki pomagają automatyzować i odtwarzać przepływ pracy związany z systemami uczącymi się.

3. Konfigurowanie środowiska w chmurze

Aby wykonać to ćwiczenie, musisz mieć projekt w Google Cloud Platform z włączonymi płatnościami. Aby utworzyć projekt, postępuj zgodnie z instrukcjami.

Krok 1. Uruchom Cloud Shell

W tym module będziesz pracować w sesji Cloud Shell, czyli w interpreterze poleceń hostowanym przez maszynę wirtualną działającą w chmurze Google. Możesz równie łatwo uruchomić tę sekcję lokalnie na własnym komputerze, ale korzystanie z Cloud Shell zapewnia wszystkim możliwość odtworzenia działania w spójnym środowisku. Po zakończeniu modułu możesz spróbować ponownie wykonać tę sekcję na własnym komputerze.

Autoryzowanie Cloud Shell

Aktywowanie Cloud Shell

W prawym górnym rogu konsoli Cloud kliknij przycisk poniżej, aby aktywować Cloud Shell:

Aktywowanie Cloud Shell

Jeśli uruchamiasz Cloud Shell po raz pierwszy, zobaczysz ekran pośredni (część strony widoczna po przewinięciu) z opisem tego środowiska. W takim przypadku kliknij Dalej, a ten ekran nie będzie się już wyświetlać. Ten wyświetlany jednorazowo ekran wygląda tak:

Konfigurowanie Cloud Shell

Uzyskanie dostępu do środowiska Cloud Shell i połączenie się z nim powinno zająć tylko kilka chwil.

Inicjowanie Cloud Shell

Ta maszyna wirtualna zawiera wszystkie potrzebne narzędzia dla programistów. Zawiera również stały katalog domowy o pojemności 5 GB i działa w Google Cloud, co znacznie zwiększa wydajność sieci i usprawnia proces uwierzytelniania. Większość zadań w tym module, a być może wszystkie, możesz wykonać w przeglądarce lub na Chromebooku.

Po połączeniu z Cloud Shell zobaczysz, że uwierzytelnianie zostało już przeprowadzone, a projekt jest już ustawiony na Twój identyfikator projektu.

Aby potwierdzić, że uwierzytelnianie zostało przeprowadzone, uruchom w Cloud Shell to polecenie:

gcloud auth list

W wyniku polecenia powinny pojawić się informacje podobne do tych:

Dane wyjściowe Cloud Shell

Aby potwierdzić, że polecenie gcloud zna Twój projekt, uruchom w Cloud Shell to polecenie:

gcloud config list project

Wynik polecenia

[core]
project = <PROJECT_ID>

Jeśli nie, możesz go ustawić za pomocą tego polecenia:

gcloud config set project <PROJECT_ID>

Wynik polecenia

Updated property [core/project].

Cloud Shell ma kilka zmiennych środowiskowych, w tym GOOGLE_CLOUD_PROJECT, która zawiera nazwę naszego bieżącego projektu w chmurze. Będziemy go używać w różnych miejscach w tym laboratorium. Możesz to sprawdzić, uruchamiając to polecenie:

echo $GOOGLE_CLOUD_PROJECT

Krok 2. Włącz interfejsy API

W dalszych krokach dowiesz się, gdzie i dlaczego te usługi są potrzebne. Na razie uruchom to polecenie, aby przyznać projektowi dostęp do usług Compute Engine, Container Registry i Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Powinien wyświetlić się komunikat o powodzeniu podobny do tego:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Krok 3. Utwórz zasobnik Cloud Storage

Aby uruchomić zadanie trenowania w Vertex AI, potrzebujemy zasobnika pamięci masowej do przechowywania zapisanych zasobów modelu. Zasobnik musi być regionalny. Używamy tutaj regionu us-central, ale możesz użyć innego regionu (wystarczy go zastąpić w całym tym laboratorium). Jeśli masz już zasobnik, możesz pominąć ten krok.

Aby utworzyć zasobnik, uruchom w terminalu Cloud Shell te polecenia:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Następnie przyznamy kontu usługi obliczeniowej dostęp do tego zasobnika. Dzięki temu Vertex Pipelines będzie mieć uprawnienia do zapisywania plików w tym zasobniku. Aby dodać to uprawnienie, uruchom to polecenie:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Krok 4. Tworzenie instancji Vertex AI Workbench

W sekcji Vertex AI w konsoli Cloud kliknij Workbench:

Menu Vertex AI

Następnie w sekcji Notatniki zarządzane przez użytkownika kliknij Nowy notatnik:

Utwórz nowy notatnik

Następnie wybierz typ instancji TensorFlow Enterprise 2.3 (z LTS) bez procesorów graficznych:

Instancja TFE

Użyj opcji domyślnych, a potem kliknij Utwórz.

Krok 5. Otwórz notatnik

Po utworzeniu instancji wybierz Otwórz JupyterLab:

Otwórz notatnik

4. Konfigurowanie Vertex Pipelines

Aby korzystać z Vertex Pipelines, musimy zainstalować kilka dodatkowych bibliotek:

  • Kubeflow Pipelines: to pakiet SDK, którego użyjemy do utworzenia potoku. Vertex Pipelines obsługuje uruchamianie potoków utworzonych za pomocą Kubeflow Pipelines lub TFX.
  • Komponenty potoku Google Cloud: ta biblioteka zawiera gotowe komponenty, które ułatwiają interakcję z usługami Vertex AI z poziomu kroków potoku.

Krok 1. Utwórz notatnik w Pythonie i zainstaluj biblioteki

Najpierw w menu Launchera w instancji notatnika utwórz notatnik, wybierając Python 3:

Tworzenie notatnika w Pythonie 3

Aby otworzyć menu Launchera, kliknij znak + w lewym górnym rogu instancji notatnika.

Aby zainstalować obie usługi, których będziemy używać w tym module, najpierw ustaw flagę użytkownika w komórce notatnika:

USER_FLAG = "--user"

Następnie uruchom w notatniku to polecenie:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Po zainstalowaniu tych pakietów musisz ponownie uruchomić jądro:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Na koniec sprawdź, czy pakiety zostały prawidłowo zainstalowane. Wersja pakietu KFP SDK powinna być >=1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Krok 2. Ustaw identyfikator projektu i zasobnik

W ramach tego laboratorium będziesz odwoływać się do identyfikatora projektu w chmurze i utworzonego wcześniej zasobnika. Następnie utworzymy zmienne dla każdego z nich.

Jeśli nie znasz identyfikatora projektu, możesz go uzyskać, uruchamiając to polecenie:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

W przeciwnym razie ustaw go tutaj:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Następnie utwórz zmienną, w której będzie przechowywana nazwa zasobnika. Jeśli został utworzony w tym laboratorium, poniższe polecenie zadziała. W przeciwnym razie musisz ustawić to ręcznie:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Krok 3. Zaimportuj biblioteki

Aby zaimportować biblioteki, których będziemy używać w tym ćwiczeniu, dodaj ten kod:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Krok 4. Określ stałe

Ostatnią rzeczą, którą musimy zrobić przed utworzeniem potoku, jest zdefiniowanie niektórych stałych zmiennych. PIPELINE_ROOT to ścieżka w Cloud Storage, w której będą zapisywane artefakty utworzone przez nasz potok. Jako regionu używamy tutaj us-central1, ale jeśli podczas tworzenia zasobnika użyto innego regionu, zaktualizuj zmienną REGION w poniższym kodzie:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Po uruchomieniu powyższego kodu powinna zostać wyświetlona ścieżka do katalogu głównego potoku. Jest to lokalizacja w Cloud Storage, w której będą zapisywane artefakty z potoku. Będzie mieć format gs://YOUR-BUCKET-NAME/pipeline_root/

5. Tworzenie pierwszego potoku

Aby zapoznać się z działaniem Vertex Pipelines, najpierw utworzymy krótki potok za pomocą pakietu KFP SDK. Ten potok nie jest związany z uczeniem maszynowym (nie martw się, do tego też dojdziemy). Używamy go, aby Cię nauczyć:

  • Jak tworzyć komponenty niestandardowe w pakiecie KFP SDK
  • Jak uruchamiać i monitorować potok w Vertex Pipelines

Utworzymy potok, który wyświetli zdanie z użyciem 2 elementów wyjściowych: nazwy produktu i opisu emoji. Ten potok będzie się składać z 3 komponentów:

  • product_name: ten komponent przyjmuje jako dane wejściowe nazwę produktu (lub dowolny rzeczownik) i zwraca ten ciąg znaków jako dane wyjściowe.
  • emoji: ten komponent pobiera tekstowy opis emotikona i konwertuje go na emotikon. Na przykład kod tekstowy dla ✨ to „sparkles”. Ten komponent korzysta z biblioteki emoji, aby pokazać, jak zarządzać zależnościami zewnętrznymi w potoku.
  • build_sentence: ten ostatni komponent wykorzysta dane wyjściowe z dwóch poprzednich, aby utworzyć zdanie z emotikonem. Wynik może na przykład wyglądać tak: „Vertex Pipelines is ✨”.

Zaczynamy kodować!

Krok 1. Utwórz komponent oparty na funkcji Pythona

Za pomocą pakietu KFP SDK możemy tworzyć komponenty na podstawie funkcji Pythona. Użyjemy go w 3 komponentach pierwszego potoku. Najpierw utworzymy komponent product_name, który przyjmuje ciąg znaków jako dane wejściowe i zwraca ten ciąg. Dodaj do notatnika te informacje:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Przyjrzyjmy się bliżej składni:

  • Dekorator @component kompiluje tę funkcję do komponentu podczas uruchamiania potoku. Będziesz go używać za każdym razem, gdy będziesz pisać komponent niestandardowy.
  • Parametr base_image określa obraz kontenera, którego będzie używać ten komponent.
  • Parametr output_component_file jest opcjonalny i określa plik YAML, do którego ma zostać zapisany skompilowany komponent. Po uruchomieniu komórki zobaczysz, że plik został zapisany w instancji notatnika. Jeśli chcesz udostępnić ten komponent komuś innemu, możesz wysłać mu wygenerowany plik YAML i poprosić go o załadowanie go w ten sposób:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • Symbol -> str po definicji funkcji określa typ danych wyjściowych tego komponentu.

Krok 2. Utwórz 2 dodatkowe komponenty

Aby dokończyć potok, utworzymy jeszcze 2 komponenty. Pierwsza z nich przyjmuje ciąg znaków jako dane wejściowe i konwertuje go na odpowiednie emoji, jeśli takie istnieje. Zwraca krotkę z przekazanym tekstem wejściowym i wynikowym emotikonem:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Ten komponent jest nieco bardziej złożony niż poprzedni. Przyjrzyjmy się nowościom:

  • Parametr packages_to_install informuje komponent o wszelkich zewnętrznych zależnościach bibliotecznych tego kontenera. W tym przypadku używamy biblioteki o nazwie emoji.
  • Ten komponent zwraca obiekt NamedTuple o nazwie Outputs. Zwróć uwagę, że każdy ciąg znaków w tej krotce ma klucze: emoji_textemoji. Użyjemy ich w następnym komponencie, aby uzyskać dostęp do danych wyjściowych.

Ostatni komponent tego potoku wykorzysta dane wyjściowe z pierwszych dwóch komponentów i połączy je, aby zwrócić ciąg znaków:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

Możesz się zastanawiać, skąd ten komponent wie, że ma użyć danych wyjściowych z poprzednich zdefiniowanych przez Ciebie kroków. Świetne pytanie! Wszystko to połączymy w następnym kroku.

Krok 3. Łączenie komponentów w potok

Zdefiniowane powyżej definicje komponentów utworzyły funkcje fabryczne, których można używać w definicji potoku do tworzenia kroków. Aby skonfigurować potok, użyj dekoratora @pipeline, nadaj potokowi nazwę i opis oraz podaj ścieżkę główną, w której mają być zapisywane artefakty potoku. Artefakty to wszystkie pliki wyjściowe wygenerowane przez potok. Ten potok wprowadzający nie generuje żadnych danych, ale nasz następny potok będzie to robić.

W następnym bloku kodu definiujemy funkcję intro_pipeline. W tym miejscu określamy dane wejściowe do początkowych kroków potoku i sposób, w jaki są one ze sobą połączone:

  • product_task przyjmuje jako dane wejściowe nazwę produktu. W tym przypadku przekazujemy wartość „Vertex Pipelines”, ale możesz ją zmienić na dowolną inną.
  • emoji_task przyjmuje jako dane wejściowe kod tekstowy emotikona. Możesz też zmienić tę wartość na dowolną inną. Na przykład „party_face” odnosi się do emoji 🥳. Pamiętaj, że ponieważ ten komponent i komponent product_task nie mają żadnych kroków, które dostarczają do nich dane wejściowe, podczas definiowania potoku ręcznie określamy dane wejściowe dla tych komponentów.
  • Ostatni krok w naszym potoku – consumer_task – ma 3 parametry wejściowe:
    • Dane wyjściowe product_task. Ten krok generuje tylko 1 wynik, więc możemy się do niego odwołać za pomocą product_task.output.
    • Dane wyjściowe emoji z kroku emoji_task. Zobacz zdefiniowany powyżej komponent emoji, w którym nadaliśmy nazwy parametrom wyjściowym.
    • Podobnie emoji_text nazwane dane wyjściowe z komponentu emoji. Jeśli nasz potok otrzyma tekst, który nie odpowiada emoji, użyje go do utworzenia zdania.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Krok 4. Kompilowanie i uruchamianie potoku

Po zdefiniowaniu potoku możesz go skompilować. Spowoduje to wygenerowanie pliku JSON, którego użyjesz do uruchomienia potoku:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Następnie utwórz zmienną TIMESTAMP. Użyjemy go w identyfikatorze zadania:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Następnie zdefiniuj zadanie potoku:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Na koniec uruchom zadanie, aby utworzyć nowe wykonanie potoku:

job.submit()

Po uruchomieniu tej komórki w konsoli powinny pojawić się logi z linkiem do wyświetlenia uruchomienia potoku:

Logi zadań potoku

Przejdź do tego linku. Po zakończeniu potok powinien wyglądać tak:

Ukończony potok wprowadzający

Wykonanie tego potoku zajmie 5–6 minut. Po zakończeniu możesz kliknąć komponent build-sentence, aby zobaczyć ostateczne dane wyjściowe:

Dane wyjściowe potoku wprowadzającego

Teraz, gdy wiesz już, jak działają pakiet KFP SDK i Vertex Pipelines, możesz utworzyć potok, który tworzy i wdraża model uczenia maszynowego przy użyciu innych usług Vertex AI. Zaczynajmy!

6. Tworzenie kompleksowego potoku ML

Czas utworzyć pierwszy potok ML. W tym potoku użyjemy zbioru danych UCI Machine Learning Dry beans dataset, pochodzącego z: KOKLU, M. and OZKAN, I.A., (2020), „Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques” (Wieloklasowa klasyfikacja suchych fasoli przy użyciu technik rozpoznawania obrazów i uczenia maszynowego). W Computers and Electronics in Agriculture, 174, 105507. DOI

Jest to zbiór danych tabelarycznych, którego użyjemy w naszym potoku do trenowania, oceniania i wdrażania modelu AutoML, który klasyfikuje ziarna na 7 rodzajów na podstawie ich cech.

Ten potok:

  • Utwórz zbiór danych w .
  • Trenowanie tabelarycznego modelu klasyfikacji za pomocą AutoML
  • Pobieranie wskaźników oceny tego modelu
  • Na podstawie wskaźników oceny zdecyduj, czy wdrożyć model za pomocą logiki warunkowej w Vertex Pipelines.
  • Wdrożenie modelu w punkcie końcowym za pomocą Vertex Prediction

Każdy z tych kroków będzie komponentem. Większość kroków potoku będzie korzystać z gotowych komponentów usług Vertex AI za pomocą biblioteki google_cloud_pipeline_components, którą zaimportowaliśmy wcześniej w tym samouczku. W tej sekcji najpierw zdefiniujemy 1 komponent niestandardowy. Następnie zdefiniujemy pozostałe kroki potoku za pomocą gotowych komponentów. Gotowe komponenty ułatwiają dostęp do usług Vertex AI, takich jak trenowanie i wdrażanie modeli.

Krok 1. Komponent niestandardowy do oceny modelu

Zdefiniowany przez nas komponent niestandardowy zostanie użyty pod koniec potoku po zakończeniu trenowania modelu. Ten komponent będzie wykonywać kilka czynności:

  • Pobieranie danych oceny z wytrenowanego modelu klasyfikacji AutoML
  • Analizowanie danych i wyświetlanie ich w interfejsie potoków Vertex AI
  • Porównaj dane z wartością progową, aby określić, czy model powinien zostać wdrożony.

Zanim zdefiniujemy komponent, przyjrzyjmy się jego parametrom wejściowym i wyjściowym. Ten potok przyjmuje jako dane wejściowe metadane naszego projektu w chmurze, wynikowy wytrenowany model (zdefiniujemy ten komponent później), wskaźniki oceny modelu i thresholds_dict_str. Wartość thresholds_dict_str określimy podczas uruchamiania potoku. W przypadku tego modelu klasyfikacji będzie to wartość obszaru pod krzywą charakterystyki operacyjnej odbiornika, dla której powinniśmy wdrożyć model. Jeśli na przykład podamy wartość 0,95, oznacza to, że chcemy, aby potok wdrażał model tylko wtedy, gdy ta wartość jest większa niż 95%.

Komponent oceny zwraca ciąg znaków wskazujący, czy model ma zostać wdrożony. Aby utworzyć ten komponent niestandardowy, dodaj w komórce notatnika ten kod:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Krok 2. Dodawanie gotowych komponentów Google Cloud

W tym kroku zdefiniujemy pozostałe komponenty potoku i zobaczymy, jak ze sobą współpracują. Najpierw zdefiniuj wyświetlaną nazwę uruchomienia potoku, używając sygnatury czasowej:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Następnie skopiuj poniższy kod do nowej komórki notatnika:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Zobaczmy, co się dzieje w tym kodzie:

  • Najpierw, podobnie jak w przypadku poprzedniego potoku, definiujemy parametry wejściowe, które przyjmuje ten potok. Musimy je ustawić ręcznie, ponieważ nie zależą od wyników innych kroków w potoku.
  • Pozostała część potoku korzysta z kilku gotowych komponentów do interakcji z usługami Vertex AI:
    • TabularDatasetCreateOp tworzy tabelaryczny zbiór danych w Vertex AI na podstawie źródła zbioru danych w Cloud Storage lub BigQuery. W tym potoku przekazujemy dane za pomocą adresu URL tabeli BigQuery.
    • AutoMLTabularTrainingJobRunOp rozpoczyna zadanie trenowania AutoML dla zbioru danych tabelarycznych. Do tego komponentu przekazujemy kilka parametrów konfiguracji, w tym typ modelu (w tym przypadku klasyfikacja), dane o kolumnach, czas trenowania i wskaźnik zbioru danych. Zwróć uwagę, że aby przekazać zbiór danych do tego komponentu, podajemy dane wyjściowe poprzedniego komponentu za pomocą dataset_create_op.outputs["dataset"].
    • EndpointCreateOp tworzy punkt końcowy w Vertex AI. Punkt końcowy utworzony w tym kroku zostanie przekazany jako dane wejściowe do następnego komponentu.
    • ModelDeployOp wdraża dany model w punkcie końcowym w Vertex AI. W tym przypadku używamy punktu końcowego utworzonego w poprzednim kroku. Dostępne są dodatkowe opcje konfiguracji, ale w tym przypadku podajemy typ maszyny i model punktu końcowego, który chcemy wdrożyć. Przekazujemy model, uzyskując dostęp do wyników kroku trenowania w naszym potoku.
  • Ten potok korzysta też z logiki warunkowej, czyli funkcji Vertex Pipelines, która umożliwia zdefiniowanie warunku oraz różnych gałęzi na podstawie wyniku tego warunku. Pamiętaj, że podczas definiowania potoku przekazaliśmy parametr thresholds_dict_str. Jest to próg dokładności, którego używamy do określania, czy wdrożyć model w punkcie końcowym. Aby to zrobić, użyjemy klasy Condition z pakietu KFP SDK. Przekazywany warunek to wynik komponentu niestandardowego obliczania, który został zdefiniowany wcześniej w tym samouczku. Jeśli ten warunek jest spełniony, potok będzie nadal wykonywać komponent deploy_op. Jeśli dokładność nie osiągnie wstępnie określonego progu, potok zatrzyma się na tym etapie i nie wdroży modelu.

Krok 3. Skompiluj i uruchom kompleksowy potok ML

Po zdefiniowaniu pełnego potoku czas go skompilować:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Następnie zdefiniuj zadanie:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Na koniec uruchom zadanie:

ml_pipeline_job.submit()

Kliknij link wyświetlany w logach po uruchomieniu komórki powyżej, aby zobaczyć potok w konsoli. Uruchomienie tego potoku zajmie nieco ponad godzinę. Większość czasu zajmuje krok trenowania AutoML. Ukończony potok będzie wyglądać mniej więcej tak:

Ukończony potok AutoML

Jeśli klikniesz przycisk „Rozwiń artefakty” u góry, zobaczysz szczegóły różnych artefaktów utworzonych na podstawie Twojego potoku. Jeśli na przykład klikniesz artefakt dataset, zobaczysz szczegóły utworzonego zbioru danych Vertex AI. Aby przejść do strony tego zbioru danych, kliknij ten link:

Zbiór danych potoku

Podobnie, aby zobaczyć wizualizacje danych uzyskanych z naszego niestandardowego komponentu oceny, kliknij artefakt o nazwie metricsc. Po prawej stronie panelu zobaczysz macierz pomyłek tego modelu:

Wizualizacja danych

Aby zobaczyć model i punkt końcowy utworzone na podstawie tego uruchomienia potoku, otwórz sekcję modeli i kliknij model o nazwie automl-beans. Powinien być tam widoczny model wdrożony w punkcie końcowym:

Punkt końcowy modelu

Możesz też otworzyć tę stronę, klikając artefakt punktu końcowego na wykresie potoku.

Oprócz sprawdzania wykresu potoku w konsoli możesz też używać Vertex Pipelines do śledzenia pochodzenia. Śledzenie pochodzenia oznacza śledzenie artefaktów utworzonych w całym potoku. Dzięki temu możemy dowiedzieć się, gdzie powstały artefakty i jak są wykorzystywane w przepływie pracy związanym z uczeniem maszynowym. Aby na przykład zobaczyć śledzenie pochodzenia zbioru danych utworzonego w tym potoku, kliknij artefakt zbioru danych, a potem Wyświetl pochodzenie:

Wyświetl historię danych

Wyświetli to wszystkie miejsca, w których jest używany ten artefakt:

Szczegóły historii danych

Krok 4. Porównywanie wskaźników w różnych uruchomieniach potoku

Jeśli uruchomisz ten potok wiele razy, możesz chcieć porównać dane z różnych uruchomień. Aby uzyskać dostęp do metadanych wykonania, możesz użyć metody aiplatform.get_pipeline_df(). Pobierzemy tu metadane wszystkich uruchomień tego potoku i załadujemy je do struktury DataFrame biblioteki pandas:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

To już koniec tego laboratorium!

🎉 Gratulacje! 🎉

Dowiedziałeś się, jak używać Vertex AI do:

  • Używanie pakietu Kubeflow Pipelines SDK do tworzenia kompleksowych potoków z komponentami niestandardowymi
  • Uruchamianie potoków w Vertex Pipelines i rozpoczynanie uruchomień potoków za pomocą pakietu SDK
  • Wyświetlanie i analizowanie wykresu Vertex Pipelines w konsoli
  • Używanie gotowych komponentów potoku do dodawania usług Vertex AI do potoku
  • Planowanie cyklicznych zadań potoku

Więcej informacji o różnych częściach Vertex znajdziesz w dokumentacji.

7. Czyszczenie

Aby uniknąć obciążenia konta opłatami, zalecamy usunięcie zasobów utworzonych w ramach tego modułu.

Krok 1. Zatrzymaj lub usuń instancję notatników

Jeśli chcesz nadal korzystać z notatnika utworzonego w tym module, zalecamy wyłączanie go, gdy nie jest używany. W interfejsie Notebooks w konsoli Cloud wybierz notebooka, a następnie kliknij Zatrzymaj. Jeśli chcesz całkowicie usunąć instancję, kliknij Usuń:

Zatrzymaj instancję

Krok 2. Usuń punkt końcowy

Aby usunąć wdrożony punkt końcowy, przejdź do sekcji Punkty końcowe w konsoli Vertex AI i kliknij ikonę usuwania:

Usuń punkt końcowy

Następnie w wyświetlonym oknie kliknij Wycofaj:

Wycofaj wdrożenie modelu

Na koniec przejdź do sekcji Modele w konsoli, znajdź ten model i w menu z 3 kropkami po prawej stronie kliknij Usuń model:

Usuń model

Krok 3. Usuń zasobnik Cloud Storage

Aby usunąć zasobnik Storage, w menu nawigacyjnym w konsoli Cloud otwórz Storage, wybierz zasobnik i kliknij Usuń:

Usuń miejsce na dane