Vertex Pipelines 소개

1. 개요

이 실습에서는 Vertex AI Pipelines를 사용하여 ML 파이프라인을 만들고 실행하는 방법을 배웁니다.

학습 내용

다음 작업을 수행하는 방법을 배우게 됩니다.

  • Kubeflow Pipelines SDK를 사용하여 확장 가능한 ML 파이프라인 빌드
  • 텍스트 입력을 받는 3단계 인트로 파이프라인 만들기 및 실행
  • AutoML 분류 모델을 학습, 평가, 배포하는 파이프라인 만들기 및 실행
  • google_cloud_pipeline_components 라이브러리를 통해 제공되는 사전 빌드된 구성요소를 사용하여 Vertex AI 서비스와 상호작용
  • Cloud Scheduler로 파이프라인 작업 예약

Google Cloud에서 이 실습을 진행하는 데 드는 총 비용은 약 $25입니다.

2. Vertex AI 소개

이 실습에서는 Google Cloud에서 제공되는 최신 AI 제품을 사용합니다. Vertex AI는 Google Cloud 전반의 ML 제품을 원활한 개발 환경으로 통합합니다. 예전에는 AutoML로 학습된 모델과 커스텀 모델은 별도의 서비스를 통해 액세스할 수 있었습니다. 새 서비스는 다른 새로운 제품과 함께 두 가지 모두를 단일 API로 결합합니다. 기존 프로젝트를 Vertex AI로 마이그레이션할 수도 있습니다.

모델 학습 및 배포 서비스 외에도 Vertex AI에는 Vertex AI Pipelines(이 실습의 핵심), Model Monitoring, Feature Store를 비롯한 다양한 MLOps 제품이 포함되어 있습니다. 아래 다이어그램에서 Vertex AI 제품을 모두 확인할 수 있습니다.

Vertex 제품 개요

의견이 있는 경우 지원 페이지를 참고하세요.

ML 파이프라인이 유용한 이유

자세히 알아보기 전에 먼저 파이프라인을 사용하려는 이유를 생각해 보겠습니다. 데이터 처리, 모델 학습, 초매개변수 조정, 평가, 모델 배포가 포함된 ML 워크플로를 빌드한다고 상상해 보세요. 각 단계에는 다양한 종속 항목이 있을 수 있어 전체 워크플로를 모놀리식으로 처리하면 관리가 힘들 수 있습니다. ML 프로세스를 확장하기 시작할 때 팀원이 워크플로를 실행하고 코드 개발에 참여하도록 팀원과 ML 워크플로를 공유하려는 경우를 생각해 보겠습니다. 신뢰할 수 있고 재현 가능한 프로세스가 없으면 이 작업이 어려워질 수 있습니다. 파이프라인을 사용하면 ML 프로세스의 각 단계는 자체 컨테이너가 됩니다. 이를 통해 단계를 독립적으로 개발하고 재현 가능한 방식으로 각 단계의 입력 및 출력을 추적할 수 있습니다. 새로운 학습 데이터가 제공될 때 파이프라인 실행을 시작하는 것과 같이 Cloud 환경의 다른 이벤트를 기반으로 파이프라인 실행을 예약하거나 트리거할 수도 있습니다.

요약: 파이프라인을 사용하면 ML 워크플로를 자동화하고 재현할 수 있습니다.

3. 클라우드 환경 설정

이 Codelab을 실행하려면 결제가 사용 설정된 Google Cloud Platform 프로젝트가 필요합니다. 프로젝트를 만들려면 여기의 안내를 따르세요.

1단계: Cloud Shell 시작

이 실습에서는 Google의 클라우드에서 실행되는 가상 머신이 호스팅하는 명령어 인터프리터인 Cloud Shell 세션에서 작업합니다. 이 섹션은 사용자의 컴퓨터에서 로컬로 쉽게 실행할 수도 있지만, Cloud Shell을 사용하면 모든 사람이 일관되고 재현 가능한 환경에 액세스할 수 있습니다. 실습을 마치고 로컬 컴퓨터에서 이 섹션을 다시 시도하셔도 됩니다.

Cloud Shell 승인

Cloud Shell 활성화

Cloud 콘솔의 오른쪽 상단에서 아래 버튼을 클릭하여 Cloud Shell을 활성화합니다.

Cloud Shell 활성화

이전에 Cloud Shell을 시작한 적이 없는 경우 기능을 설명하는 중간 화면 (스크롤해야 볼 수 있는 부분)이 표시됩니다. 이 경우 계속을 클릭합니다 (이후 다시 표시되지 않음). 이 일회성 화면은 다음과 같습니다.

Cloud Shell 설정

Cloud Shell을 프로비저닝하고 연결하는 데 몇 분 정도만 걸립니다.

Cloud Shell init

이 가상 머신에는 필요한 개발 도구가 모두 포함되어 로드됩니다. 영구적인 5GB 홈 디렉터리를 제공하고 Google Cloud에서 실행되므로 네트워크 성능과 인증이 크게 개선됩니다. 이 Codelab에서 대부분의 작업은 브라우저나 Chromebook만 사용하여 수행할 수 있습니다.

Cloud Shell에 연결되면 인증이 완료되었고 프로젝트가 해당 프로젝트 ID로 이미 설정된 것을 볼 수 있습니다.

Cloud Shell에서 다음 명령어를 실행하여 인증되었는지 확인합니다.

gcloud auth list

명령어 출력에 다음과 같은 내용이 표시됩니다.

Cloud Shell 출력

Cloud Shell에서 다음 명령어를 실행하여 gcloud 명령어가 프로젝트를 알고 있는지 확인합니다.

gcloud config list project

명령어 결과

[core]
project = <PROJECT_ID>

또는 다음 명령어로 설정할 수 있습니다.

gcloud config set project <PROJECT_ID>

명령어 결과

Updated property [core/project].

Cloud Shell에는 현재 Cloud 프로젝트의 이름이 포함된 GOOGLE_CLOUD_PROJECT를 비롯하여 몇 가지 환경 변수가 있습니다. 이는 이 실습에서 여러 차례 사용됩니다. 다음을 실행하여 환경 변수를 확인할 수 있습니다.

echo $GOOGLE_CLOUD_PROJECT

2단계: API 사용 설정

이러한 서비스가 필요한 경우와 그 이유는 이후 단계에서 설명하겠습니다. 지금 단계에는 다음 명령어를 실행하여 프로젝트에 Compute Engine, Container Registry, Vertex AI 서비스에 대한 액세스 권한을 부여해 보겠습니다.

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

그러면 다음과 비슷한 성공 메시지가 표시될 것입니다.

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

3단계: Cloud Storage 버킷 만들기

Vertex AI에서 학습 작업을 실행하려면 저장된 모델 애셋을 저장할 스토리지 버킷이 필요합니다. 버킷은 리전을 기반으로 해야 합니다. 여기서는 us-central 리전을 사용하고 있지만 다른 리전을 사용해도 됩니다. 단, 실습 전반에서 동일한 리전을 사용해야 합니다. 이미 버킷이 있는 경우 이 단계를 건너뛸 수 있습니다.

Cloud Shell 터미널에서 다음 명령어를 실행하여 버킷을 만듭니다.

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

다음으로 컴퓨팅 서비스 계정에 이 버킷에 대한 액세스 권한을 부여합니다. 이렇게 하면 이 버킷에 파일을 쓰는 데 필요한 권한이 Vertex Pipelines에 부여됩니다. 다음 명령어를 실행하여 이 권한을 추가합니다.

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

4단계: Vertex AI Workbench 인스턴스 만들기

Cloud 콘솔의 Vertex AI 섹션에서 'Workbench'를 클릭합니다.

Vertex AI 메뉴

여기에서 사용자 관리 노트북 내에서 새 노트북을 클릭합니다.

새 노트북 만들기

그런 다음 GPU가 없는TensorFlow Enterprise 2.3 (LTS 사용) 인스턴스 유형을 선택합니다.

TFE 인스턴스

기본 옵션을 사용하고 만들기를 클릭합니다.

5단계: 노트북 열기

인스턴스가 생성되면 JupyterLab 열기를 선택합니다.

노트북 열기

4. Vertex Pipelines 설정

Vertex Pipeline을 사용하려면 몇 가지 추가 라이브러리를 설치해야 합니다.

  • Kubeflow Pipelines: 파이프라인을 빌드하는 데 사용할 SDK입니다. Vertex Pipelines는 Kubeflow Pipelines 또는 TFX를 사용하여 빌드된 파이프라인 실행을 지원합니다.
  • Google Cloud 파이프라인 구성요소: 이 라이브러리는 파이프라인 단계에서 Vertex AI 서비스와 더 쉽게 상호작용할 수 있도록 사전 빌드된 구성요소를 제공합니다.

1단계: Python 노트북 만들기 및 라이브러리 설치

먼저 노트북 인스턴스의 런처 메뉴에서 Python 3을 선택하여 노트북을 만듭니다.

Python3 노트북 만들기

노트북 인스턴스의 왼쪽 상단에 있는 + 기호를 클릭하여 런처 메뉴에 액세스할 수 있습니다.

이 실습에서 사용할 두 서비스를 모두 설치하려면 먼저 노트북 셀에 사용자 플래그를 설정한 다음

USER_FLAG = "--user"

노트북에서 다음을 실행합니다.

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

다음 패키지를 설치한 후 커널을 다시 시작해야 합니다.

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

마지막으로 패키지를 올바르게 설치했는지 확인하세요. KFP SDK 버전은 1.8 이상이어야 합니다.

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

2단계: 프로젝트 ID 및 버킷 설정하기

이 실습 전체에서 Cloud 프로젝트 ID와 이전에 만든 버킷을 참조합니다. 그런 다음 Cloud 프로젝트 ID와 이전에 만든 버킷마다 변수를 만듭니다.

프로젝트 ID를 모르는 경우 다음을 실행하여 프로젝트 ID를 확인할 수 있습니다.

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

그렇지 않은 경우 여기에서 설정하세요.

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

그런 다음 버킷 이름을 저장할 변수를 만듭니다. 이 실습에서 변수를 만든 경우 다음이 작동합니다. 그렇지 않으면 수동으로 이를 설정해야 합니다.

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

3단계: 라이브러리 가져오기

다음을 추가하여 이 Codelab에서 사용할 라이브러리를 가져옵니다.

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

4단계: 상수 정의

파이프라인을 빌드하기 전 마지막으로 해야 할 일은 상수 변수를 정의하는 것입니다. PIPELINE_ROOT는 파이프라인에서 생성된 아티팩트가 작성될 Cloud Storage 경로입니다. 여기서는 us-central1을 리전으로 사용하고 있지만, 버킷을 만들 때 다른 리전을 사용했다면 아래 코드에서 REGION 변수를 업데이트하세요.

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

위의 코드를 실행한 후에는 파이프라인의 루트 디렉터리가 표시되어야 합니다. 이는 파이프라인의 아티팩트가 작성될 Cloud Storage 위치이며 gs://YOUR-BUCKET-NAME/pipeline_root/ 형식입니다.

5. 첫 번째 파이프라인 만들기

Vertex Pipeline의 작동 방식에 익숙해지기 위해 먼저 KFP SDK를 사용하여 짧은 파이프라인을 만들어 보겠습니다. 이 파이프라인은 ML과 관련된 어떤 작업도 하지 않지만(곧 ML 관련 작업도 하므로 걱정하지 마세요) 배우기 위해 사용합니다.

  • KFP SDK에서 커스텀 구성요소를 만드는 방법
  • Vertex Pipelines에서 파이프라인을 실행하고 모니터링하는 방법

제품 이름과 그림 이모티콘 설명의 두 가지 출력을 사용하여 문장을 출력하는 파이프라인을 만들어 보겠습니다. 이 파이프라인은 다음 세 가지 구성요소로 구성됩니다.

  • product_name: 이 구성요소는 제품 이름 (또는 원하는 모든 명사)을 입력으로 사용하고 해당 문자열을 출력으로 반환합니다.
  • emoji: 이 구성요소는 그림 이모티콘의 텍스트 설명을 가져와 그림 이모티콘으로 변환합니다. 예를 들어 ✨의 텍스트 코드는 'sparkles'입니다. 이 구성요소는 이모티콘 라이브러리를 사용하여 파이프라인에서 외부 종속 항목을 관리하는 방법을 보여줍니다.
  • build_sentence: 이 마지막 구성요소는 이전 두 개의 출력을 사용하여 그림 이모티콘을 사용하는 문장을 빌드합니다. 예를 들어 'Vertex Pipelines는 ✨'과 같이 출력됩니다.

코딩을 시작해 보겠습니다.

1단계: Python 함수 기반 구성요소 만들기

KFP SDK를 사용하여 Python 함수를 기반으로 구성요소를 만들 수 있습니다. 이를 첫 번째 파이프라인의 3가지 구성요소에 사용해 보겠습니다. 먼저 product_name 구성요소를 빌드합니다. 이 구성요소는 단순히 문자열을 입력으로 사용하고 해당 문자열을 반환합니다. 노트북에 다음을 추가하세요.

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

여기에서 구문을 자세히 살펴보겠습니다.

  • @component 데코레이터는 파이프라인이 실행될 때 이 함수를 구성요소로 컴파일합니다. 커스텀 구성요소를 작성할 때마다 이 함수를 사용합니다.
  • base_image 매개변수는 이 구성요소가 사용할 컨테이너 이미지를 지정합니다.
  • output_component_file 매개변수는 선택사항이며 컴파일된 구성요소를 작성할 yaml 파일을 지정합니다. 셀을 실행한 후에는 해당 파일이 노트북 인스턴스에 작성된 것이 표시되어야 합니다. 이 구성요소를 다른 사용자와 공유하려면 생성된 yaml 파일을 다른 사용자에게 전송하고 다음을 사용하여 해당 파일을 로드하도록 합니다.
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • 함수 정의 뒤의 -> str는 이 구성요소의 출력 유형을 지정합니다.

2단계: 2개의 추가 구성요소 만들기

파이프라인을 완료하기 위해 추가 구성요소를 2개 만들어 보겠습니다. 정의할 첫 번째 구성요소는 문자열을 입력으로 사용하고, 이 문자열에 해당하는 이모티콘이 있으면 문자열을 해당 이모티콘으로 변환합니다. 전달된 입력 텍스트와 결과 이모티콘이 있는 튜플을 반환합니다.

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

이 구성요소는 이전 구성요소보다 조금 더 복잡합니다. 새로운 내용을 자세히 살펴보겠습니다.

  • packages_to_install 매개변수는 이 컨테이너의 외부 라이브러리 종속 항목을 구성요소에 알립니다. 이 예에서는 emoji라는 라이브러리를 사용합니다.
  • 이 구성요소는 Outputs라는 NamedTuple를 반환합니다. 이 튜플의 각 문자열에는 emoji_textemoji 키가 있습니다. 다음 구성요소에서 이러한 키를 사용하여 출력에 액세스합니다.

이 파이프라인의 최종 구성요소는 처음 2개의 출력을 조합하여 하나의 문자열을 반환합니다.

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

이 구성요소가 사용자가 정의한 이전 단계의 출력을 어떻게 사용하는지 궁금하시죠? 좋은 질문입니다. 다음 단계에서 총정리해보겠습니다.

3단계: 구성요소를 모두 파이프라인에 입력하기

위에서 정의한 구성요소 정의에서 파이프라인 단계를 만들기 위해 파이프라인 정의에 사용할 수 있는 팩토리 함수를 만들었습니다. 파이프라인을 설정하려면 @pipeline 데코레이터를 사용하여 파이프라인의 이름과 설명을 지정하고 파이프라인의 아티팩트가 작성될 루트 경로를 제공하세요. 아티팩트란 파이프라인에서 생성된 모든 출력 파일을 의미합니다. 이 인트로 파이프라인에서는 아무 파일도 생성되지 않지만, 다음 파이프라인에서는 파일이 생성됩니다.

다음 코드 블록에서는 intro_pipeline 함수를 정의합니다. 여기에서 초기 파이프라인 단계에 대한 입력 및 단계가 서로 연결되는 방식을 지정합니다.

  • product_task는 제품 이름을 입력으로 사용합니다. 여기서는 'Vertex Pipelines'를 전달하지만 이는 원하는 대로 변경할 수 있습니다.
  • emoji_task는 그림 이모티콘의 텍스트 코드를 입력으로 사용합니다. 이것도 원하는 대로 변경할 수 있습니다. 예를 들어 'party_face'는 🥳 이모티콘을 나타냅니다. 이 구성요소와 product_task 구성요소에는 입력을 제공하는 단계가 모두 없기 때문에 파이프라인 정의 시 이러한 항목에 대한 입력을 수동으로 지정합니다.
  • 파이프라인의 마지막 단계인 consumer_task에는 3가지 입력 매개변수가 있습니다.
    • product_task의 출력 이 단계에서는 하나의 출력만 생성되므로 product_task.output를 통해 참조할 수 있습니다.
    • emoji_task 단계의 emoji 출력입니다. 위에서 출력 매개변수에 이름을 지정한 emoji 구성요소를 참고하세요.
    • 마찬가지로 emoji 구성요소의 emoji_text라는 이름의 출력입니다. 파이프라인이 이모티콘과 일치하지 않는 텍스트를 전달할 경우 이 텍스트를 사용하여 문장을 구성합니다.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

4단계: 파이프라인 컴파일 및 실행

파이프라인이 정의되었으므로 파이프라인을 컴파일할 준비가 되었습니다. 다음과 같이 파이프라인을 실행하는 데 사용할 JSON 파일을 생성합니다.

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

다음으로 TIMESTAMP 변수를 만듭니다. 작업 ID에 다음을 사용합니다.

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

그런 다음 파이프라인 작업을 정의합니다.

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

마지막으로 작업을 실행하여 새 파이프라인 실행을 만듭니다.

job.submit()

이 셀을 실행하면 콘솔에서 파이프라인 실행을 볼 수 있는 링크가 포함된 로그가 표시됩니다.

파이프라인 작업 로그

해당 링크로 이동합니다. 완료되면 파이프라인이 다음과 같이 표시됩니다.

완료된 인트로 파이프라인

이 파이프라인은 실행하는 데 5~6분이 소요됩니다. 완료되면 build-sentence 구성요소를 클릭하여 최종 출력을 확인할 수 있습니다.

인트로 파이프라인 출력

이제 KFP SDK 및 Vertex Pipelines의 작동 방식을 숙지했으므로 다른 Vertex AI 서비스를 사용하여 ML 모델을 만들고 배포하는 파이프라인을 빌드할 수 있습니다. 시작해 볼까요?

6. 엔드 투 엔드 ML 파이프라인 만들기

이제 첫 번째 ML 파이프라인을 빌드할 시간입니다. 이 파이프라인에서는 KOKLU, M. and OZKAN, I.A.의 UCI 머신러닝 Dry beans dataset을 사용합니다. (2020), '컴퓨터 비전 및 머신러닝 기법을 사용한 건조 콩의 다중 클래스 분류'. 농업용 컴퓨터 및 전자, 174, 105507. DOI.

이것은 테이블 형식 데이터 세트입니다. 이 파이프라인에서는 데이터 세트를 사용하여 콩을 특성에 따라 7가지 유형 중 하나로 분류하는 AutoML 모델을 학습, 평가, 배포합니다.

이 파이프라인은 다음을 실행합니다.

  • 에서 데이터 세트를 만듭니다.
  • AutoML로 테이블 형식 분류 모델 학습
  • 이 모델의 평가 측정항목 가져오기
  • 평가 측정항목을 기반으로 Vertex Pipelines에서 조건부 로직을 사용하여 모델을 배포할지 결정합니다.
  • Vertex Prediction을 사용하여 모델을 엔드포인트에 배포

설명된 각 단계는 구성요소가 됩니다. 대부분의 파이프라인 단계에서는 이 Codelab에서 앞서 가져온 google_cloud_pipeline_components 라이브러리를 통해 Vertex AI 서비스용 사전 빌드된 구성요소를 사용합니다. 이 섹션에서는 먼저 맞춤 구성요소 하나를 정의합니다. 그런 다음 사전 빌드된 구성요소를 사용하여 나머지 파이프라인 단계를 정의합니다. 사전 빌드된 구성요소를 사용하면 모델 학습 및 배포와 같은 Vertex AI 서비스에 더 쉽게 액세스할 수 있습니다.

1단계: 모델 평가를 위한 맞춤 구성요소

정의할 커스텀 구성요소는 모델 학습이 완료된 후 파이프라인 끝부분에서 사용됩니다. 이 구성요소는 다음 작업을 실행합니다.

  • 학습된 AutoML 분류 모델에서 평가 측정항목 가져오기
  • 측정항목을 파싱하고 Vertex Pipelines UI에서 렌더링
  • 측정항목을 기준점과 비교하여 모델을 배포해야 하는지 결정

구성요소를 정의하기 전에 입력 및 출력 매개변수를 이해해 보겠습니다. 이 파이프라인은 Cloud 프로젝트의 일부 메타데이터, 학습된 결과 모델 (이 구성요소는 나중에 정의), 모델의 평가 측정항목, thresholds_dict_str를 입력으로 사용합니다. thresholds_dict_str는 파이프라인을 실행할 때 정의합니다. 이 분류 모델의 경우 모델을 배포해야 하는 ROC 곡선 값 아래의 영역이 됩니다. 예를 들어 0.95를 전달하면 이 측정항목이 95%를 초과하는 경우에만 파이프라인에서 모델을 배포하도록 지정할 수 있습니다.

평가 구성요소는 모델을 배포할지 여부를 나타내는 문자열을 반환합니다. 노트북 셀에 다음을 추가하여 이 맞춤 구성요소를 만듭니다.

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

2단계: Google Cloud 사전 빌드된 구성요소 추가

이 단계에서는 나머지 파이프라인 구성요소를 정의하고 서로 어떻게 연결되는지 확인합니다. 먼저 타임스탬프를 사용하여 파이프라인 실행의 표시 이름을 정의합니다.

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

그런 다음 다음을 새 노트북 셀에 복사합니다.

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

이 코드에서 어떤 일이 일어나는지 살펴보겠습니다.

  • 먼저 이전 파이프라인에서와 마찬가지로 이 파이프라인이 사용하는 입력 매개변수를 정의합니다. 파이프라인의 다른 단계의 출력에 종속되지 않으므로 이러한 값은 수동으로 설정해야 합니다.
  • 나머지 파이프라인은 Vertex AI 서비스와 상호작용하기 위해 몇 가지 사전 빌드된 구성요소를 사용합니다.
    • TabularDatasetCreateOp는 Cloud Storage 또는 BigQuery의 데이터 세트 소스를 바탕으로 Vertex AI에서 테이블 형식의 데이터 세트를 만듭니다. 이 파이프라인에서는 BigQuery 테이블 URL을 통해 데이터를 전달합니다.
    • AutoMLTabularTrainingJobRunOp가 테이블 형식 데이터 세트에 대한 AutoML 학습 작업을 시작합니다. 모델 유형 (여기서는 분류), 열의 일부 데이터, 학습 실행 기간, 데이터 세트에 대한 포인터를 비롯한 몇 가지 구성 매개변수를 이 구성요소에 전달합니다. 이 구성요소에 데이터 세트를 전달하기 위해 dataset_create_op.outputs["dataset"]를 통해 이전 구성요소의 출력을 제공합니다.
    • EndpointCreateOp는 Vertex AI에 엔드포인트를 만듭니다. 이 단계에서 생성된 엔드포인트는 다음 구성요소에 입력으로 전달됩니다.
    • ModelDeployOp는 지정된 모델을 Vertex AI의 엔드포인트에 배포합니다. 이 경우 이전 단계에서 만든 엔드포인트를 사용합니다. 사용 가능한 추가 구성 옵션이 있지만 여기서는 배포할 엔드포인트 머신 유형과 모델을 제공합니다. 파이프라인의 학습 단계 출력에 액세스하여 모델을 전달합니다.
  • 이 파이프라인은 조건을 정의하고 해당 조건의 결과에 따라 여러 브랜치를 사용할 수 있는 Vertex Pipelines의 기능인 조건부 로직도 사용합니다. 파이프라인을 정의할 때 thresholds_dict_str 매개변수를 전달했습니다. 이는 모델을 엔드포인트에 배포할지 여부를 결정하는 데 사용되는 정확성 기준점입니다. 이를 구현하려면 KFP SDK의 Condition 클래스를 사용합니다. 전달하는 조건은 이 Codelab의 앞부분에서 정의한 맞춤 eval 구성요소의 출력입니다. 이 조건이 true이면 파이프라인은 deploy_op 구성요소를 계속 실행합니다. 정확성이 사전 정의된 기준점을 충족하지 않으면 파이프라인이 여기에서 중지되고 모델이 배포되지 않습니다.

3단계: 엔드 투 엔드 ML 파이프라인 컴파일 및 실행

전체 파이프라인이 정의되었으므로 이제 컴파일할 차례입니다.

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

다음으로 작업을 정의합니다.

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

마지막으로 작업을 실행합니다.

ml_pipeline_job.submit()

위의 셀을 실행한 후 로그에 표시된 링크로 이동하여 콘솔에서 파이프라인을 확인합니다. 이 파이프라인을 실행하는 데 1시간 조금 넘게 걸립니다. 대부분의 시간이 AutoML 학습 단계에서 소요됩니다. 완성된 파이프라인은 다음과 같습니다.

AutoML 파이프라인 완료

상단의 '아티팩트 펼치기' 버튼을 전환하면 파이프라인에서 생성된 다양한 아티팩트의 세부정보를 확인할 수 있습니다. 예를 들어 dataset 아티팩트를 클릭하면 생성된 Vertex AI 데이터 세트에 관한 세부정보가 표시됩니다. 여기의 링크를 클릭하면 해당 데이터 세트의 페이지로 이동할 수 있습니다.

파이프라인 데이터 세트

마찬가지로 커스텀 평가 구성요소의 결과로 표시되는 측정항목 시각화를 보려면 metricsc라는 아티팩트를 클릭합니다. 대시보드 오른쪽에 이 모델의 혼동 행렬이 표시됩니다.

측정항목 시각화

이 파이프라인 실행에서 생성된 모델과 엔드포인트를 보려면 모델 섹션으로 이동하여 automl-beans라는 모델을 클릭합니다. 엔드포인트에 배포된 모델이 표시됩니다.

Model-endpoint

파이프라인 그래프에서 엔드포인트 아티팩트를 클릭하여 이 페이지에 액세스할 수도 있습니다.

콘솔에서 파이프라인 그래프를 확인하는 것 외에도 Vertex Pipelines를 사용하여 계보 추적을 수행할 수 있습니다. 계보 추적이란 파이프라인 전반에서 생성된 아티팩트를 추적하는 것을 의미합니다. 이를 통해 아티팩트가 생성된 위치와 ML 워크플로 전반에서 아티팩트가 사용되는 방식을 파악할 수 있습니다. 예를 들어 이 파이프라인에서 생성된 데이터 세트의 계보 추적을 보려면 데이터 세트 아티팩트를 클릭한 다음 계보 보기를 클릭합니다.

계보 보기

이 아티팩트가 사용되는 모든 위치가 표시됩니다.

계보 세부정보

4단계: 파이프라인 실행의 측정항목 비교

이 파이프라인을 여러 번 실행하는 경우 실행 간에 측정항목을 비교하는 것이 좋습니다. aiplatform.get_pipeline_df() 메서드를 사용하여 실행 메타데이터에 액세스할 수 있습니다. 여기서는 이 파이프라인의 모든 실행에 대한 메타데이터를 가져와 Pandas DataFrame에 로드합니다.

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

이제 실습을 완료했습니다.

🎉 수고하셨습니다. 🎉

Vertex AI를 사용하여 다음을 수행하는 방법을 배웠습니다.

  • Kubeflow Pipelines SDK를 사용하여 맞춤 구성요소로 엔드 투 엔드 파이프라인 빌드
  • Vertex Pipelines에서 파이프라인을 실행하고 SDK로 파이프라인 실행 시작
  • 콘솔에서 Vertex Pipelines 그래프 보기 및 분석
  • 사전 빌드된 파이프라인 구성요소를 사용하여 파이프라인에 Vertex AI 서비스 추가
  • 반복되는 파이프라인 작업 예약

Vertex의 다른 부분에 대해 자세히 알아보려면 문서를 확인하세요.

7. 삭제

요금이 청구되지 않도록 하려면 이 실습에서 만든 리소스를 삭제하는 것이 좋습니다.

1단계: Notebooks 인스턴스 중지 또는 삭제하기

이 실습에서 만든 노트북을 계속 사용하려면 사용하지 않을 때 노트북을 끄는 것이 좋습니다. Cloud 콘솔의 Notebooks UI에서 노트북을 선택한 다음 중지를 선택합니다. 인스턴스를 완전히 삭제하려면 삭제를 선택합니다.

인스턴스 중지

2단계: 엔드포인트 삭제하기

배포한 엔드포인트를 삭제하려면 Vertex AI 콘솔의 엔드포인트 섹션으로 이동하여 삭제 아이콘을 클릭합니다.

엔드포인트 삭제

그런 다음 다음 메시지에서 Undeploy(배포 취소)를 클릭합니다.

모델 배포 취소

마지막으로 콘솔의 모델 섹션으로 이동하여 해당 모델을 찾은 후 오른쪽의 점 3개로 된 메뉴에서 모델 삭제를 클릭합니다.

모델 삭제

3단계: Cloud Storage 버킷 삭제

스토리지 버킷을 삭제하려면 Cloud 콘솔의 탐색 메뉴를 사용하여 스토리지로 이동하고 버킷을 선택하고 '삭제'를 클릭합니다.

스토리지 삭제