Vertex 管道簡介

1. 總覽

在本實驗室中,您將瞭解如何使用 Vertex Pipelines 建立及執行機器學習管道。

課程內容

學習重點:

  • 使用 Kubeflow Pipelines SDK 建構可擴充的機器學習管道
  • 建立並執行可接受文字輸入的 3 步驟介紹管道
  • 建立及執行管道,訓練、評估及部署 AutoML 分類模型
  • 使用預先建構的元件與 Vertex AI 服務互動,這些元件可透過 google_cloud_pipeline_components 程式庫取得
  • 使用 Cloud Scheduler 排定管道工作

在 Google Cloud 中執行這個研究室的總費用約為 $25 美元。

2. Vertex AI 簡介

這個實驗室使用 Google Cloud 最新推出的 AI 產品服務。Vertex AI 整合了 Google Cloud 中的機器學習產品,提供流暢的開發體驗。先前,使用 AutoML 訓練的模型和自訂模型必須透過不同的服務存取。這項新產品會與其他新產品一起合併為一個 API。您也可以將現有專案遷移至 Vertex AI。

除了模型訓練和部署服務之外,Vertex AI 也提供各種機器學習運作產品,包括 Vertex Pipelines (本研究室的核心)、模型監控和特徵儲存庫等。您可以在下方圖表中查看所有 Vertex AI 產品。

Vertex 產品總覽

如有任何意見,請參閱支援頁面

為什麼機器學習管道很實用?

在深入瞭解前,我們先來瞭解為什麼要使用管道。假設您要建構的機器學習工作流程包含處理資料、訓練模型、超參數調整、評估和模型部署。這些步驟各自可能有不同的依附元件,如果您將整個工作流程視為單一應用程式,可能會變得難以管理。開始擴大機器學習程序時,您可能會想與團隊中的其他成員分享機器學習工作流程,讓他們執行並提供程式碼。如果沒有可靠且能重現的程序,可能會變得困難。透過管道,機器學習程序中的每個步驟都是各自的容器。這樣一來,您就能獨立開發各個步驟,並以可重現的方式追蹤每個步驟的輸入和輸出。您也可以根據 Cloud 環境中的其他事件,排定或觸發管道的執行作業,例如在有新訓練資料時啟動管道執行作業。

重點摘要:管道可協助您自動化重現機器學習工作流程。

3. Cloud 環境設定

您必須擁有啟用計費功能的 Google Cloud Platform 專案,才能執行這個程式碼研究室。如要建立專案,請按照這篇文章中的操作說明進行。

步驟 1:啟動 Cloud Shell

在本研究室中,您將在 Cloud Shell 工作階段中進行作業,這是由在 Google 雲端運作的虛擬機器代管的指令解譯器。您也可以在自己的電腦上執行本節內容,但使用 Cloud Shell 可讓所有人都能在一致的環境中重現體驗。完成研究室後,您可以使用自己的電腦再試一次。

授權 Cloud Shell

啟用 Cloud Shell

在 Cloud 控制台右上方,按一下下方的按鈕,即可啟用 Cloud Shell

啟用 Cloud Shell

如果您從未啟動 Cloud Shell,系統會顯示中間畫面 (在折疊下方),說明 Cloud Shell 的用途。如果是這種情況,請按一下「Continue」 (您之後就不會再看到這個畫面)。這裡是一次性畫面的外觀:

Cloud Shell 設定

佈建並連線至 Cloud Shell 的作業只需幾分鐘的時間。

Cloud Shell 啟動

這個虛擬機器會載入您需要的所有開發工具。提供永久的 5 GB 主目錄,而且在 Google Cloud 中運作,可大幅提升網路效能和驗證功能。在本程式碼研究室中,您的工作幾乎都可以透過瀏覽器或 Chromebook 完成。

連線至 Cloud Shell 後,您應會發現自己通過驗證,且專案已設為您的專案 ID。

在 Cloud Shell 中執行下列指令,確認您已通過驗證:

gcloud auth list

您應該會在指令輸出中看到類似以下的內容:

Cloud Shell 輸出內容

在 Cloud Shell 中執行下列指令,確認 gcloud 指令知道您的專案:

gcloud config list project

指令輸出

[core]
project = <PROJECT_ID>

如未設定,請輸入下列指令設定專案:

gcloud config set project <PROJECT_ID>

指令輸出

Updated property [core/project].

Cloud Shell 有幾個環境變數,包括 GOOGLE_CLOUD_PROJECT,其中包含目前 Cloud 專案的名稱。我們會在本實驗室的各個地方使用這個值。您可以執行以下命令來查看:

echo $GOOGLE_CLOUD_PROJECT

步驟 2:啟用 API

在後續步驟中,您會看到需要這些服務的位置和原因。目前請先執行下列指令,為專案授予 Compute Engine、Container Registry 和 Vertex AI 服務的存取權:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

這應該會產生類似以下的成功訊息:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

步驟 3:建立 Cloud Storage 值區

如要在 Vertex AI 上執行訓練工作,我們需要一個儲存值區來儲存已儲存的模型資產。值區必須是區域性值區,我們在這裡使用 us-central,但您也可以使用其他區域 (只要在本研究室中進行替換即可)。如果您已有資料夾,可以略過這個步驟。

在 Cloud Shell 終端機中執行下列指令,即可建立儲存桶:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

接下來,我們會將這個值區的存取權授予運算服務帳戶。這樣一來,Vertex Pipelines 就能具備寫入這個值區的必要權限。執行下列指令即可新增此權限:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

步驟 4:建立 Vertex AI Workbench 執行個體

在 Cloud 控制台的 Vertex AI 專區中,按一下「Workbench」:

Vertex AI 選單

接著在「使用者自行管理的筆記本」中,按一下「新增筆記本」

建立新的筆記本

接著選取「TensorFlow 企業版 2.3 (含 LTS)」執行個體類型,且不加入任何 GPU

TFE 執行個體

使用預設選項,然後按一下「建立」

步驟 5:開啟 Notebook

建立執行個體後,請選取「Open JupyterLab」

開啟筆記本

4. Vertex Pipelines 設定

必須安裝下列程式庫,才能使用 Vertex Pipelines:

  • Kubeflow Pipelines:這是我們用來建構管道的 SDK。Vertex Pipelines 支援使用 Kubeflow Pipelines 或 TFX 建構的管線執行作業。
  • Google Cloud Pipeline Components:這個程式庫提供預先建構的元件,可讓您更輕鬆地透過管道步驟與 Vertex AI 服務互動。

步驟 1:建立 Python 筆記本並安裝程式庫

首先,在 Notebook 執行個體的啟動器選單中,選取「Python 3」,建立筆記本:

建立 Python 3 筆記本

您可以按一下筆記本執行個體左上角的「+」符號,存取啟動器選單。

如要安裝本實驗室中會用到的兩項服務,請先在筆記本儲存格中設定使用者標記:

USER_FLAG = "--user"

然後在筆記本中執行下列指令:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

安裝這些套件後,您需要重新啟動核心:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

最後,檢查是否已正確安裝套件。KFP SDK 版本應為 1.8 以上:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

步驟 2:設定專案 ID 和值區

在本實驗室中,您將參照 Cloud 專案 ID 和先前建立的值區。接下來,我們要為每個項目建立變數。

如果您不知道專案 ID,可以執行下列指令來取得:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

否則,請在此設定:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

接著建立變數來儲存值區名稱。如果您是透過這個研究室建立叢集,就可以執行下列作業。否則,您就需要手動設定:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

步驟 3:匯入程式庫

新增以下程式碼,匯入我們在本程式碼研究室中會用到的程式庫:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

步驟 4:定義常數

在建構管道之前,我們需要定義一些常數變數。PIPELINE_ROOT 是要寫入管道建立的成果的 Cloud Storage 路徑。這裡使用 us-central1 做為區域,但如果您在建立值區時使用了其他區域,請更新下列程式碼中的 REGION 變數:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

執行上述程式碼後,您應該會看到管道根目錄的輸出內容。這是 Cloud Storage 位置,用來寫入管道中的構件。格式為 gs://YOUR-BUCKET-NAME/pipeline_root/

5. 建立第一個管道

為了熟悉 Vertex Pipelines 的運作方式,我們會先使用 KFP SDK 建立簡短的管道。這個管道不會執行任何機器學習相關作業 (別擔心,我們到了!),我們會使用它來教導您:

  • 如何在 KFP SDK 中建立自訂元件
  • 如何在 Vertex Pipelines 中執行及監控管道

我們將建立管道,使用兩個輸出內容 (產品名稱和表情符號說明) 列印句子。這個管道包含三個元件:

  • product_name:這個元件會將產品名稱 (或您想要的任何名詞) 做為輸入內容,並將該字串做為輸出內容
  • emoji:這個元件會將表情符號的文字說明轉換為表情符號。例如,✨ 的文字代碼是「sparkles」。這個元件使用表情符號程式庫,說明如何在管道中管理外部依附元件
  • build_sentence:這個最後的元件會使用前兩個元件的輸出內容,建立使用表情符號的句子。舉例來說,輸出結果可能是「Vertex Pipelines is ✨」。

開始編寫程式吧!

步驟 1:建立以 Python 函式為基礎的元件

使用 KFP SDK,我們就能根據 Python 函式建立元件。我們會將其用於第一個管道中的 3 個元件。我們會先建構 product_name 元件,該元件只會將字串做為輸入內容並傳回該字串。在筆記本中新增下列內容:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

讓我們進一步瞭解語法:

  • @component 裝飾器會在管道執行時,將這個函式編譯為元件。您在編寫自訂元件時,都會使用這個方法。
  • base_image 參數會指定這個元件要使用的容器映像檔。
  • output_component_file 參數為選用項目,可指定要將編譯元件寫入的 yaml 檔案。執行單元格後,您應該會看到該檔案已寫入 Notebook 執行個體。如果您想與他人分享此元件,可以傳送產生的 yaml 檔案,並請對方使用以下方式載入:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • 函式定義後方的 -> str 會指定這個元件的輸出類型。

步驟 2:建立兩個其他元件

為了完成管道,我們會再建立兩個元件。第一個我們會定義一個字串做為輸入內容,如有這個字串,則會將其轉換成對應的表情符號。它會傳回一個元組,其中包含傳入的文字和產生的表情符號:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

這個元件比先前的元件複雜一些。讓我們進一步瞭解新功能:

  • packages_to_install 參數會向元件告知此容器的任何外部程式庫依附元件。在本例中,我們使用的是名為 emoji 的程式庫。
  • 這個元件會傳回名為 OutputsNamedTuple。請注意,這個元組中的每個字串都有鍵:emoji_textemoji。我們會在下一個元件中使用這些功能來存取輸出內容。

這個管道的最終元件會使用前兩個的輸出內容,合併兩者以傳回字串:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

您可能會想知道:這個元件如何知道要使用您定義的上一個步驟輸出內容?好問題!我們會在下一個步驟將所有內容整合在一起。

步驟 3:將元件組合成管道

上述定義的元件定義會建立工廠函式,可用於管道定義中建立步驟。如要設定管道,請使用 @pipeline 修飾符,為管道提供名稱和說明,並提供管道的構件應寫入的根路徑。我們指的是管道產生的所有輸出檔案。這個簡介管道不會產生任何結果,但會談到下一個流程。

在下一個程式碼區塊中,我們定義 intro_pipeline 函式。請在這裡指定初始管道步驟的輸入內容,以及步驟彼此的連結方式:

  • product_task 會將產品名稱做為輸入內容。我們在這裡傳遞的是「Vertex Pipelines」,但您可以將其變更為任何想要的名稱。
  • emoji_task 會將表情符號的文字代碼做為輸入內容。您也可以將這項設定變更為任何您想要的值。例如「party_face」是指 🥳? 表情符號。請注意,由於此元件和 product_task 元件沒有任何饋送輸入步驟,因此我們在定義管道時手動指定這些設定的輸入值。
  • 管道中的最後一個步驟 consumer_task 有三個輸入參數:
    • product_task 的輸出內容。由於這個步驟只會產生一個輸出結果,因此我們可以透過 product_task.output 參照該結果。
    • emoji_task 步驟的 emoji 輸出內容。請參閱上方定義的 emoji 元件,其中我們已命名輸出參數。
    • 同樣地,emoji_text 具名輸出內容則是來自 emoji 元件。如果管道傳遞的文字與表情符號不符,則會使用這段文字來構建句子。
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

步驟 4:編譯及執行管道

定義管道後,您就可以編譯管道了。以下會產生 JSON 檔案,您可用來執行管道:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

接下來,建立 TIMESTAMP 變數。我們會在工作 ID 中使用這個值:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

接著定義管道工作:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

最後,執行工作以建立新的管道執行作業:

job.submit()

執行這個儲存格後,您應該會看到記錄檔,其中包含可在控制台中查看管道執行情況的連結:

pipeline 工作記錄檔

前往該連結。完成之後,管道應如下所示:

已完成的介紹管道

這個管道需要執行 5 到 6 分鐘。完成後,您可以按一下 build-sentence 元件,查看最終輸出內容:

簡介管道輸出內容

您現在已熟悉 KFP SDK 和 Vertex Pipelines 的運作方式,因此可以建構管道,使用其他 Vertex AI 服務建立及部署機器學習模型。那就繼續下一步吧!

6. 建立端對端機器學習管道

接下來,我們將建構第一個機器學習管道。在這個管道中,我們會使用 UCI Machine Learning 的Dry beans dataset,來源為:KOKLU, M. and OZKAN, I.A., (2020 年)《Multiclass Classification of ry Beans Using Computer Vision and Machine Learning Techniques》(使用電腦視覺和機器學習技術的多重分類分類)。《In Computers and Electronics in Agriculture》(電腦與電子產品分類),174, 105507。DOI

這是一個表格資料集,在管道中,我們會使用這份資料集訓練、評估及部署 AutoML 模型,讓模型根據豆子的特性將其分類為 7 種豆子類型之一。

這個管道會:

  • 在 中建立資料集
  • 使用 AutoML 訓練表格型分類模型
  • 取得此模型的評估指標
  • 依據評估指標,決定是否要在 Vertex Pipelines 中使用條件邏輯部署模型
  • 使用 Vertex Prediction 將模型部署至端點

列出的每個步驟都是一個元件,大部分的管道步驟都會透過本程式碼研究室先前匯入的 google_cloud_pipeline_components 程式庫,使用 Vertex AI 服務的預先建構元件。在本節中,我們會先定義一個自訂元件。接著,我們會使用預先建構的元件定義管道的其他步驟。預先建構的元件可讓您更輕鬆地存取 Vertex AI 服務,例如模型訓練和部署。

步驟 1:用於評估模型的自訂元件

模型訓練完成後,我們定義的自訂元件會用於管道結尾。這個元件會執行以下幾項操作:

  • 從已訓練的 AutoML 分類模型取得評估指標
  • 剖析指標並在 Vertex Pipelines UI 中顯示
  • 比較指標與門檻,判斷是否應部署模型

在定義元件之前,請先瞭解其輸入和輸出參數。這個管道會將 Cloud 專案中的部分中繼資料、訓練後的模型 (我們會稍後定義這個元件)、模型的評估指標,以及 thresholds_dict_str 做為輸入內容。我們會在執行管道時定義 thresholds_dict_str。就這項分類模型而言,這會是 ROC 曲線下面積的值,我們應部署模型。舉例來說,如果傳入 0.95,我們只想在指標高於 95% 時部署模型。

我們的評估元件會傳回字串,指出是否要部署模型。在筆記本儲存格中新增下列程式碼,建立這個自訂元件:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

步驟 2:新增 Google Cloud 預先建構的元件

在這個步驟中,我們會定義其餘的管線元件,並瞭解這些元件如何彼此搭配。首先,使用時間戳記定義管道執行作業的顯示名稱:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

然後將下列內容複製到新的筆記本儲存格:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

我們來看看這段程式碼發生了什麼事:

  • 首先,我們和先前的管道一樣,定義這個管道接收的輸入參數。由於這些值不依賴管道中其他步驟的輸出內容,因此我們需要手動設定。
  • 管道的其餘部分會使用一些預先建構的元件與 Vertex AI 服務互動:
    • TabularDatasetCreateOp 會在 Vertex AI 中,根據 Cloud Storage 或 BigQuery 中的資料集來源建立表格式資料集。在這個管道中,我們要透過 BigQuery 資料表網址傳遞資料。
    • AutoMLTabularTrainingJobRunOp 會啟動表格資料集的 AutoML 訓練工作。我們將一些設定參數傳送至這個元件,包括模型類型 (在此案例中為分類)、資料欄中的部分資料、訓練時間長度,以及指向該資料集的指標。請注意,如要將資料集傳入此元件,我們會透過 dataset_create_op.outputs["dataset"] 提供前一個元件的輸出內容
    • EndpointCreateOp 會在 Vertex AI 中建立端點。從這個步驟建立的端點會做為輸入內容傳遞至下一個元件
    • ModelDeployOp 會將指定模型部署到 Vertex AI 中的端點。在本例中,我們會使用先前步驟中建立的端點。還有其他設定選項可供使用,但我們在此提供要部署的端點機器類型和模型。我們會透過存取管道中訓練步驟的輸出內容,將模型傳入
  • 這個管道也使用條件邏輯,這是 Vertex Pipelines 的一項功能,可讓您定義條件,並根據該條件的結果建立不同的分支。請注意,在定義管道時,我們傳遞了 thresholds_dict_str 參數。這是我們用來決定是否將模型部署至端點的準確度門檻。為了實作這項功能,我們採用 KFP SDK 的 Condition 類別。我們傳入的條件是先前在本程式碼研究室中定義的自訂 eval 元件的輸出內容。如果這個條件為真,管道會繼續執行 deploy_op 元件。如果準確率未達到我們預先定義的門檻,管線就會停止,不會部署模型。

步驟 3:編譯並執行端對端機器學習管道

定義完整管道後,就可以編譯:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

接著,定義工作:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

最後,執行工作:

ml_pipeline_job.submit()

執行上述單元格後,請前往記錄中顯示的連結,在控制台中查看管道。這個管道的執行時間會超過一小時。大部分時間都花在 AutoML 訓練步驟上。已完成的管道如下所示:

已完成的 AutoML 管道

如果切換頂端的「展開成果」按鈕,您就能查看管道建立的不同成果詳細資料。舉例來說,如果您按一下 dataset 構件,就會看到已建立的 Vertex AI 資料集詳細資料。您可以點選這裡的連結,前往該資料集的頁面:

管道資料集

同樣地,如要查看自訂評估元件產生的指標視覺化資料,請按一下名為 metricsc 的構件。在資訊主頁右側,您會看到此模型的混淆矩陣:

指標視覺化

如要查看這個管道執行作業所建立的模型和端點,請前往「模型」區段,然後按一下名為 automl-beans 的模型。您應該會看到這個模型已部署至端點:

模型端點

您也可以按一下管道圖中的 endpoint 構件,即可存取這個頁面。

除了在控制台中查看管道圖表外,您也可以使用 Vertex Pipelines 進行Lineage Tracking。追蹤歷程,是指追蹤整個管道中建立的構件。這有助於我們瞭解產物是在何處建立,以及在整個機器學習工作流程中如何使用。舉例來說,如要查看在這個管道中建立的資料集追蹤階層,請按一下資料集成果,然後點選「View Lineage」

查看歷程

這會顯示這個構件用於所有位置:

系譜詳細資料

步驟 4:比較管道執行期間的指標

如果您多次執行這個管道,您可能會想比較不同執行作業的指標。您可以使用 aiplatform.get_pipeline_df() 方法存取執行中繼資料。我們會在此取得此管道的所有執行作業的中繼資料,並將其載入至 Pandas DataFrame:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

這樣就完成實驗室了!

🎉 恭喜!🎉

您已瞭解如何使用 Vertex AI 執行下列作業:

  • 使用 Kubeflow Pipelines SDK 建構包含自訂元件的端對端管道
  • 在 Vertex Pipelines 上執行管道,並使用 SDK 啟動管道執行作業
  • 在控制台查看及分析 Vertex AI Pipelines 圖表
  • 使用預先建構的管道元件,將 Vertex AI 服務新增至管道
  • 排定週期性管道工作

如要進一步瞭解 Vertex 的其他部分,請參閱說明文件

7. 清理

為避免產生費用,建議您刪除在本研究室中建立的資源。

步驟 1:停止或刪除 Notebooks 執行個體

如果想繼續使用您在這個研究室中建立的筆記本,建議在不使用時將其關閉。在 Cloud 控制台的「Notebooks」(筆記本) UI 中,依序選取筆記本和「Stop」(停止)。如要將執行個體完全刪除,請選取 [Delete] (刪除)

停止執行個體

步驟 2:刪除端點

如要刪除您部署的端點,請前往 Vertex AI 控制台的「端點」部分,然後按一下「刪除」圖示:

刪除端點

然後在下列提示中點選「取消部署」

取消部署模型

最後,請前往控制台的「Models」部分,找出該模型,然後在右側的三點圖示選單中,按一下「Delete model」

刪除模型

步驟 3:刪除 Cloud Storage 值區

如要刪除儲存體值區,請使用 Cloud 控制台的「導覽」選單,前往「儲存體」頁面,選取所需值區,然後按一下「刪除」:

刪除儲存空間