Introdução ao Vertex Pipelines

1. Visão geral

Neste laboratório, você vai aprender a criar e executar pipelines de ML com o Vertex Pipelines.

Conteúdo do laboratório

Você vai aprender a:

  • usar o SDK do Kubeflow Pipelines para criar pipelines de ML escalonáveis;
  • criar e executar um pipeline introdutório de três etapas que recebe uma entrada de texto;
  • criar e executar um pipeline que treina, avalia e implanta um modelo de classificação do AutoML;
  • Usar componentes pré-criados para interagir com serviços da Vertex AI, usando a biblioteca google_cloud_pipeline_components
  • agendar um job de pipeline com o Cloud Scheduler.

O custo total da execução deste laboratório no Google Cloud é de aproximadamente US$25.

2. Introdução à Vertex AI

Este laboratório usa a mais nova oferta de produtos de IA disponível no Google Cloud. A Vertex AI integra as ofertas de ML do Google Cloud em uma experiência de desenvolvimento intuitiva. Anteriormente, modelos treinados com o AutoML e modelos personalizados eram acessíveis por serviços separados. A nova oferta combina ambos em uma única API, com outros novos produtos. Você também pode migrar projetos existentes para a Vertex AI.

Além de oferecer treinamento de modelos e serviços de implantação, a Vertex AI também inclui uma variedade de produtos de MLOps, como o Vertex Pipelines (o assunto desse laboratório), Monitoramento de modelos, Feature Store e outros. Confira todos as ofertas de produtos da Vertex AI no diagrama abaixo.

Visão geral do produto Vertex

Se você tiver algum feedback, acesse a página de suporte.

Por que pipelines de ML são úteis?

Antes de começar, primeiro entenda em quais situações você usaria um pipeline. Imagine que você está criando um fluxo de trabalho de ML que inclui processamento de dados, treinamento de modelos, ajuste de hiperparâmetros, avaliação e implantação de modelos. Cada uma dessas etapas pode ter dependências diferentes, o que pode ser difícil de administrar se você tratar o fluxo inteiro como algo monolítico. Ao começar a escalonar seu processo de ML, você pode querer compartilhar seu fluxo de trabalho de ML com outras pessoas da sua equipe para que elas possam executar e colaborar com código. Sem um processo confiável e possível de ser reproduzido, isso pode ser difícil. Com pipelines, cada etapa do seu processo de ML é um contêiner próprio, o que permite que você desenvolva as etapas independentemente e rastreie a entrada e a saída de cada etapa de maneira possível de ser reproduzida. Você também pode agendar ou acionar execuções do seu pipeline com base em outros eventos do seu ambiente do Cloud, como iniciar uma execução de pipeline quando novos dados de treinamento estiverem disponíveis.

Resumo: os pipelines ajudam a automatizar e reproduzir seu fluxo de trabalho de ML.

3. configure o ambiente do Cloud

Para executar este codelab, você vai precisar de um projeto do Google Cloud Platform com o faturamento ativado. Para criar um projeto, siga estas instruções.

Etapa 1: iniciar o Cloud Shell

Neste laboratório, você vai trabalhar em uma sessão do Cloud Shell, que é um interpretador de comandos hospedado por uma máquina virtual em execução na nuvem do Google. A sessão também pode ser executada localmente no seu computador, mas se você usar o Cloud Shell, todas as pessoas vão ter acesso a uma experiência reproduzível em um ambiente consistente. Após concluir o laboratório, é uma boa ideia testar a sessão no seu computador.

Autorizar o Cloud Shell

Ativar o Cloud Shell

No canto superior direito do console do Cloud, clique no botão abaixo para Ativar o Cloud Shell:

Ativar o Cloud Shell

Se você nunca iniciou o Cloud Shell antes, uma tela intermediária (abaixo da dobra) será exibida com a descrição dele. Se esse for o caso, clique em Continuar (e você não verá mais esse aviso). Esta é a aparência dessa tela única:

Configuração do Cloud Shell

Leva apenas alguns instantes para provisionar e se conectar ao Cloud Shell.

Inicial do Cloud Shell

Essa máquina virtual tem todas as ferramentas de desenvolvimento necessárias. Ela oferece um diretório principal persistente de 5 GB, além de ser executada no Google Cloud. Isso aprimora o desempenho e a autenticação da rede. Praticamente todo o seu trabalho neste codelab pode ser feito em um navegador ou no seu Chromebook.

Depois de se conectar ao Cloud Shell, você já estará autenticado e o projeto já estará configurado com seu ID do projeto.

Execute o seguinte comando no Cloud Shell para confirmar se a conta está autenticada:

gcloud auth list

A resposta ao comando deve ser parecida com esta:

Saída do Cloud Shell

Execute o seguinte comando no Cloud Shell para confirmar que o comando gcloud sabe sobre seu projeto:

gcloud config list project

Resposta ao comando

[core]
project = <PROJECT_ID>

Se o projeto não estiver configurado, configure-o usando este comando:

gcloud config set project <PROJECT_ID>

Resposta ao comando

Updated property [core/project].

O Cloud Shell tem algumas variáveis de ambiente, incluindo GOOGLE_CLOUD_PROJECT, que contém o nome do nosso projeto atual do Cloud. Vamos usar isso em várias partes deste laboratório. É possível ver essa variável ao executar:

echo $GOOGLE_CLOUD_PROJECT

Etapa 2: ativar as APIs

Nas próximas etapas, você vai saber onde e por que esses serviços são necessários. Por enquanto, execute este comando para dar ao seu projeto acesso aos serviços do Compute Engine, Container Registry e Vertex AI:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

Uma mensagem semelhante a esta vai aparecer:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Etapa 3: criar um bucket do Cloud Storage

Para executar um job de treinamento na Vertex AI, vamos precisar de um bucket de armazenamento para armazenar nossos recursos de modelo salvos. O bucket precisa ser regional. Estamos usando a us-central aqui, mas você pode usar outra região, basta fazer a substituição ao longo do laboratório. Se você já tem um bucket, pule esta etapa.

Execute os comandos a seguir no terminal do Cloud Shell para criar um bucket:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Depois, vamos conceder à conta de serviço do Compute acesso a esse bucket. Isso garante que o Vertex Pipelines tenha as permissões necessárias para gravar arquivos no bucket. Execute o comando a seguir para adicionar essa permissão:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Etapa 4: criar uma instância do Vertex AI Workbench

Na seção Vertex AI do Console do Cloud, clique em "Workbench":

Menu da Vertex AI

Em Notebooks gerenciados pelo usuário, clique em Novo notebook:

Criar novo notebook

Em seguida, selecione o tipo de instância TensorFlow Enterprise 2.3 (com LTS) sem GPUs:

Instância TFE

Use as opções padrão e clique em Criar.

Etapa 5: abrir seu notebook

Depois que a instância for criada, selecione Abrir JupyterLab:

Abrir Notebook

4. Configuração do Vertex Pipelines

É preciso instalar algumas bibliotecas adicionais para usar o Vertex Pipelines:

  • Kubeflow Pipelines: o SDK que vamos usar para criar o pipeline. O Vertex Pipelines suporta pipelines em execução criados com o Kubeflow Pipelines ou o TFX.
  • Componentes de pipeline do Google Cloud: essa biblioteca oferece componentes pré-criados que facilitam a interação com os serviços da Vertex AI nas etapas do seu pipeline.

Etapa 1: criar o notebook Python e instalar as bibliotecas

Primeiro, no menu inicial da sua instância do Notebook, crie um notebook selecionando Python 3:

Criar notebook Python3

Para acessar o menu inicial, clique no sinal de + no canto superior esquerdo da sua instância do notebook.

Para instalar os serviços que vamos usar neste laboratório, primeiro configure a sinalização do usuário em uma célula do notebook:

USER_FLAG = "--user"

Em seguida, execute o seguinte no seu notebook:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

Depois de instalar esses pacotes, será necessário reiniciar o kernel:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Por fim, verifique se os pacotes foram instalados corretamente. A versão do SDK do KFP deve ser a 1.8 ou mais recente:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Etapa 2: configurar o ID do projeto e o bucket

Neste laboratório, você vai fazer referência ao ID do projeto do Cloud e ao bucket que você criou anteriormente. A seguir, vamos criar variáveis para cada um deles.

Se não souber o ID do projeto, você pode consegui-lo executando o seguinte:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Caso contrário, defina-o aqui:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Depois, crie uma variável para armazenar o nome do seu bucket. Se você criou neste laboratório, as próximas etapas vão funcionar. Caso contrário, configure manualmente o que é mostrado a seguir:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Etapa 3: importar as bibliotecas

Adicione o código abaixo para importar as bibliotecas que vamos usar neste codelab:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

Etapa 4: definir as constantes

Por último, precisamos definir algumas variáveis constantes antes de criar o pipeline. PIPELINE_ROOT é o caminho do Cloud Storage em que os artefatos criados pelo pipeline serão gravados. Estamos usando us-central1 como a região aqui, mas se você usou uma região diferente quando criou o bucket, atualize a variável REGION no código abaixo:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

Depois de executar o código acima, você verá o diretório raiz do seu pipeline. Esse é o local do Cloud Storage em que os artefatos do seu pipeline serão gravados. Ele estará no formato gs://YOUR-BUCKET-NAME/pipeline_root/

5. Como criar seu primeiro pipeline

Para se familiarizar com o funcionamento do Vertex Pipelines, primeiro vamos criar um pipeline curto usando o SDK do KFP. Esse pipeline não faz nada relacionado a ML (não se preocupe, vamos chegar lá!). Este exercício ensina a você o seguinte:

  • Como criar componentes personalizados no SDK do KFP
  • Como executar e monitorar um pipeline no Vertex Pipelines

Vamos criar um pipeline que mostra uma sentença usando duas saídas: um nome de produto e uma descrição de emoji. O pipeline vai consistir de três componentes:

  • product_name: este componente usa um nome de produto (ou qualquer substantivo que você queira) como entrada e retorna essa string como a saída
  • emoji: este componente usa a descrição em texto de um emoji e a converte no emoji. Por exemplo, o código de texto de ✨ é "sparkles". Este componente usa uma biblioteca de emojis para mostrar como gerenciar dependências externas no seu pipeline
  • build_sentence: esse último componente consome a saída dos dois anteriores para criar uma frase que usa o emoji. Por exemplo, a saída resultante pode ser "Vertex Pipelines is ✨".

Vamos começar a programar!

Etapa 1: criar um componente baseado em função do Python

Usando o SDK do KFP, podemos criar componentes baseados em funções do Python. Vamos usar isso para os três componentes do nosso primeiro pipeline. Primeiro, vamos criar o componente product_name, que simplesmente usa uma string como entrada e retorna essa string. Adicione o seguinte ao seu notebook:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Vamos ver mais detalhes da sintaxe:

  • O decorador @component compila essa função em um componente quando o pipeline é executado. Você vai usar esse decorador sempre que criar um componente personalizado.
  • O parâmetro base_image especifica a imagem de contêiner que esse componente vai usar.
  • O parâmetro output_component_file é opcional e especifica o arquivo yaml em que o componente compilado vai ser criado. Depois de executar a célula, você deve ver o arquivo gravado na sua instância do notebook. Se você quiser compartilhar esse componente com alguém, pode enviar o arquivo yaml gerado e pedir para a pessoa carregar com o seguinte:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • O -> str após a definição da função especifica o tipo de saída desse componente.

Etapa 2: criar dois componentes adicionais

Para concluir nosso pipeline, vamos criar mais dois componentes. O primeiro que vamos definir usa uma string como entrada e a converte no emoji correspondente, se houver um. O componente retorna uma tupla com o texto de entrada transmitido e o emoji resultante:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

Este componente é um pouco mais complexo que o anterior. Veja os detalhes dele:

  • O parâmetro packages_to_install informa ao componente sobre quaisquer dependências de bibliotecas externas do contêiner. Neste caso, estamos usando uma biblioteca chamada emoji.
  • Esse componente retorna um NamedTuple com o nome Outputs. Observe que cada uma das strings dessa tupla tem chaves: emoji_text e emoji. Vamos usá-las no próximo componente para acessar a saída.

O último componente desse pipeline consome a saída dos dois primeiros e as combina para retornar uma string:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

Você pode se perguntar como esse componente sabe que é preciso usar a saída das etapas anteriores que você definiu. Essa é uma boa pergunta. Vamos unir tudo na próxima etapa.

Etapa 3: unir os componentes em um pipeline

As configurações do componente definidas acima criaram funções de fábrica que podem ser usadas em uma definição de pipeline para criar etapas. Para configurar um pipeline, use o decorador @pipeline, dê um nome e uma descrição ao pipeline e forneça o caminho raiz em que os artefatos do pipeline serão gravados. Artefatos são quaisquer arquivos de saída gerados pelo seu pipeline. Este pipeline introdutório não gera nenhum artefato, mas o próximo sim.

No próximo bloco de código, definimos uma função intro_pipeline. Nela, vamos especificar as entradas das etapas iniciais do pipeline e como conectar umas às outras:

  • product_task recebe um nome de produto como entrada. Aqui, estamos transmitindo "Vertex Pipelines" mas você pode mudar isso para o que quiser.
  • A emoji_task usa o código de texto de um emoji como entrada. Também é possível alterar para o que você quiser. Por exemplo, "party_face" se refere ao emoji 🥳. Como esse componente e o product_task não têm etapas que fornecem entrada, especificamos manualmente a entrada para eles quando definimos nosso pipeline.
  • A última etapa do pipeline, consumer_task, tem três parâmetros de entrada:
    • Saída de product_task. Como essa etapa produz apenas uma saída, podemos referenciá-la usando product_task.output.
    • A saída emoji da etapa emoji_task. Consulte o componente emoji definido acima, em que nomeamos os parâmetros de saída.
    • Da mesma forma, a saída nomeada emoji_text do componente emoji. Caso o pipeline receba texto que não corresponda a um emoji, ele usará esse texto para criar uma sentença.
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Etapa 4: compilar e executar o pipeline

Com seu pipeline definido, está tudo pronto para compilá-lo. O código a seguir gera um arquivo JSON que você vai usar para executar o pipeline:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Em seguida, crie uma variável TIMESTAMP. Usaremos isto em nosso ID da tarefa:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Depois defina o job do pipeline:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Por fim, execute o job para criar uma nova execução de pipeline:

job.submit()

Depois de executar essa célula, você verá registros com um link para visualizar a execução do pipeline no console:

Registros de jobs de pipeline

Acesse esse link. Quando concluído, o pipeline ficará assim:

Pipeline de introdução concluído

Esse pipeline vai levar de 5 a 6 minutos para ser executado. Quando concluído, você pode clicar no componente build-sentence para conferir a saída final:

Saída do pipeline de introdução

Agora que você sabe como o SDK do KFP e o Vertex Pipelines funcionam, você já pode criar um pipeline que cria e implanta um modelo de ML usando outros serviços da Vertex AI. Vamos ao que interessa!

6. Crie um pipeline de ML completo

É hora de criar seu primeiro pipeline de ML. Neste pipeline, vamos usar o conjunto de dados de grãos secos de machine learning da UCI, de: KOKLU, M. e OZKAN, I.A., (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques."In Computers and Electronics in Agriculture, 174, 105507. DOI

Esse é um conjunto de dados tabular que deve ser usado no pipeline para treinar, avaliar e implantar um modelo do AutoML que classifica grãos em um dos sete tipos com base nas características deles.

Esse pipeline vai fazer o seguinte:

  • Crie um Conjunto de dados em
  • Treinar um modelo de classificação tabular com o AutoML
  • receber métricas de avaliação sobre o modelo;
  • com base nas métricas de avaliação, decidir se é preciso implantar o modelo usando a lógica condicional no Vertex Pipelines;
  • implantar o modelo em um endpoint usando a Previsão da Vertex

Cada uma das etapas descritas será um componente. A maioria das etapas do pipeline vai usar componentes pré-criados para serviços da Vertex AI usando a biblioteca google_cloud_pipeline_components que importamos anteriormente neste codelab. Nesta seção, vamos primeiro definir um componente personalizado e só depois definir as outras etapas do pipeline usando componentes pré-criados. Com componentes pré-criados, é mais fácil acessar serviços da Vertex AI como o treinamento de modelos e a implantação.

Etapa 1: um componente personalizado para avaliação de modelos

O componente personalizado que vamos definir será usado ao final do pipeline assim que o treinamento do modelo for concluído. Esse componente faz algumas coisas:

  • recebe métricas de avaliação do modelo de classificação treinado do AutoML;
  • analisa as métricas e as renderiza na IU do Vertex Pipelines;
  • compara as métricas com um limite para determinar se o modelo deve ser implantado.

Antes de definir o componente, vamos entender os parâmetros de entrada e saída dele. Como entrada, esse pipeline usa alguns metadados do nosso projeto do Cloud, o modelo treinado resultante (vamos definir esse componente mais tarde), as métricas de avaliação do modelo e um thresholds_dict_str. O thresholds_dict_str é algo que definiremos quando executarmos nosso pipeline. No caso deste modelo de classificação, ele será o valor da área sob a curva ROC em que o modelo será implantado. Por exemplo, se transmitirmos 0,95, isso significa que o pipeline só vai implantar o modelo se a métrica estiver acima de 95%.

Nosso componente de avaliação retorna uma string que indica se o modelo deve ser implantado ou não. Adicione o seguinte em uma célula do notebook para criar o componente personalizado:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Etapa 2: adicionar componentes pré-criados do Google Cloud

Nesta etapa, vamos definir o restante dos componentes do pipeline e ver como todos se encaixam. Primeiro, defina o nome de exibição da sua execução de pipeline usando um carimbo de data/hora:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Depois, copie o seguinte em uma nova célula do notebook:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

Confira o que acontece nesse código:

  • Primeiro, assim como no pipeline anterior, definimos os parâmetros de entrada que esse pipeline aceita. Precisamos defini-los manualmente, já que eles não dependem da saída de outras etapas do pipeline.
  • O restante do pipeline usa alguns componentes pré-criados para interagir com serviços da Vertex AI:
    • TabularDatasetCreateOp cria um conjunto de dados tabular na Vertex AI quando recebe uma origem de conjunto de dados no Cloud Storage ou no BigQuery. Neste pipeline, estamos transmitindo os dados por um URL de tabela do BigQuery
    • AutoMLTabularTrainingJobRunOp inicia um job de treinamento do AutoML para um conjunto de dados tabular. Passamos alguns parâmetros de configuração para esse componente, incluindo o tipo de modelo (neste caso, classificação), alguns dados nas colunas, por quanto tempo queremos executar o treinamento e um ponteiro para o conjunto de dados. Para transmitir o conjunto de dados para esse componente, estamos fornecendo a saída do componente anterior via dataset_create_op.outputs["dataset"].
    • EndpointCreateOp cria um endpoint na Vertex AI. O endpoint criado nesta etapa será transmitido como entrada para o próximo componente
    • ModelDeployOp implanta um determinado modelo em um endpoint da Vertex AI. Neste caso, estamos usando o endpoint criado na etapa anterior. Há outras opções de configuração disponíveis, mas aqui estamos fornecendo o tipo de máquina do endpoint e o modelo que queremos implantar. Estamos transmitindo o modelo acessando as saídas da etapa de treinamento do pipeline
  • Este pipeline também usa a lógica condicional, um recurso do Vertex Pipelines que permite definir uma condição, junto com diferentes ramificações baseadas no resultado dessa condição. Lembre-se de que, quando definimos nosso pipeline, transmitimos um parâmetro thresholds_dict_str. Esse é o limite de precisão que estamos usando para determinar se o modelo será implantado em um endpoint. Para implementar isso, usamos a classe Condition do SDK do KFP. A condição que transmitimos é a saída do componente de avaliação personalizado que definimos anteriormente neste codelab. Se essa condição for verdadeira, o pipeline continuará executando o componente deploy_op. Se a acurácia não atingir o limite predefinido, o pipeline para aqui e não implanta um modelo.

Etapa 3: compilar e executar o pipeline de ML completo

Com nosso pipeline completo definido, é hora de compilá-lo:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Em seguida, defina o job:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

Por fim, execute o job:

ml_pipeline_job.submit()

Para conferir o pipeline no console, navegue até o link mostrado nos registros depois de executar a célula acima. A execução desse pipeline vai levar pouco mais de uma hora. A maior parte do tempo é gasta na etapa de treinamento do AutoML. O pipeline concluído parecerá com este:

Pipeline concluído do AutoML

Se você acionar o botão "Expandir artefatos" na parte de cima da tela, poderá conferir detalhes dos diferentes artefatos criados usando seu pipeline. Por exemplo, se você clicar no artefato dataset, verá detalhes sobre o conjunto de dados da Vertex AI que foi criado. Você pode clicar no link mostrado aqui para acessar a página do conjunto de dados:

Conjunto de dados do pipeline

Da mesma forma, para ver as visualizações de métricas resultantes do nosso componente de avaliação personalizado, clique no artefato chamado metricsc. No lado direito do seu painel, você pode ver a matriz de confusão do modelo:

Visualização das métricas

Para conferir o modelo e o endpoint criados nessa execução do pipeline, acesse a seção de modelos e clique no modelo chamado automl-beans. Você vai conferir esse modelo implantado em um endpoint:

Model-endpoint

Também é possível acessar essa página clicando no artefato endpoint no gráfico do seu pipeline.

Além de conferir o gráfico do pipeline no console, é possível usar o Vertex Pipelines para fazer o Rastreamento de linhagem. O rastreamento de linhagem quer dizer rastrear os artefatos criados em todo o pipeline. Isso pode nos ajudar a entender onde os artefatos foram criados e como eles estão sendo usados em um fluxo de trabalho de ML. Por exemplo, para ver o rastreamento de linhagem para o conjunto de dados criado neste pipeline, clique no artefato de conjunto de dados e em Ver linhagem:

Ver linhagem

Isso nos mostra todos os lugares em que o artefato está sendo usado:

Detalhes da linhagem

Etapa 4: comparar métricas entre as execuções de pipeline

Se você executar o pipeline muitas vezes, talvez queira compartilhar as métricas entre execuções. É possível usar o método aiplatform.get_pipeline_df() para acessar os metadados de execução. Aqui, você vai receber metadados de todas as execuções desse pipeline e carregá-los em um DataFrame do Pandas:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

Com isso, você concluiu o laboratório.

Parabéns! 🎉

Você aprendeu a usar a Vertex AI para:

  • Usar o SDK do Kubeflow Pipelines para criar pipelines completos com componentes personalizados
  • Executar pipelines no Vertex Pipelines e iniciar execuções do pipeline com o SDK
  • Acessar e analisar o gráfico do Vertex Pipelines no console
  • Usar componentes de pipeline pré-criados para adicionar serviços da Vertex AI ao pipeline
  • Programar jobs recorrentes com pipelines

Para saber mais sobre as diferentes partes da Vertex, consulte a documentação.

7. Limpeza

Para não receber cobranças, recomendamos que você exclua os recursos criados neste laboratório.

Etapa 1: interromper ou excluir a instância de Notebooks

Se você quiser continuar usando o notebook que criou neste laboratório, é recomendado que você o desligue quando não estiver usando. A partir da interface de Notebooks no seu Console do Cloud, selecione o notebook e depois clique em Parar. Para excluir a instância completamente, selecione Excluir:

Interromper instância

Etapa 2: excluir o endpoint

Para excluir o endpoint implantado, acesse a seção Endpoints do console da Vertex AI e clique no ícone de exclusão:

Excluir endpoint

Depois, clique em Cancelar a implantação no seguinte prompt:

Cancelar a implantação do modelo

Por fim, navegue até a seção Modelos do seu console, encontre o modelo desejado e, no menu de três pontos à direita, clique em Excluir modelo:

Excluir modelo

Etapa 3: excluir o bucket do Cloud Storage

Para excluir o bucket do Storage, use o menu de navegação do console do Cloud, acesse o Storage, selecione o bucket e clique em "Excluir":

Excluir armazenamento